diff --git a/mlonmcu/cli/common.py b/mlonmcu/cli/common.py index 9e2808b29..6590817eb 100644 --- a/mlonmcu/cli/common.py +++ b/mlonmcu/cli/common.py @@ -201,23 +201,10 @@ def kickoff_runs(args, until, context): assert len(context.sessions) > 0 session = context.sessions[-1] # session.label = args.label - config = extract_config(args) - # TODO: move into context/session - per_stage = True - print_report = True - if "runs_per_stage" in config: - per_stage = bool(config["runs_per_stage"]) - elif "runs_per_stage" in context.environment.vars: - per_stage = bool(context.environment.vars["runs_per_stage"]) - if "print_report" in config: - print_report = bool(config["print_report"]) - elif "print_report" in context.environment.vars: - print_report = bool(context.environment.vars["print_report"]) success = session.process_runs( until=until, - per_stage=per_stage, - print_report=print_report, num_workers=args.parallel, + print_report=True, progress=args.progress, context=context, export=True, diff --git a/mlonmcu/context.py b/mlonmcu/context.py index 897a2ee54..05e1d4e4f 100644 --- a/mlonmcu/context.py +++ b/mlonmcu/context.py @@ -129,7 +129,7 @@ def get_environment_by_name(name: str) -> Environment: return None -def get_ids(directory: Path) -> List[int]: +def get_infos(directory: Path) -> List[tuple]: """Get a sorted list of ids for sessions/runs found in the given directory. Parameters @@ -139,13 +139,26 @@ Returns: list - List of integers representing the session numbers. Empty list if directory does not exist. + List of tuples containing the session/run numbers and their hidden attribute. + Empty list if directory does not exist. """ if not directory.is_dir(): return [] - ids = [int(o) for o in os.listdir(directory) if os.path.isdir(directory / o) and not os.path.islink(directory / o)] - return sorted(ids) # TODO: sort by session datetime? + def extract_idx(x): + hidden = False + if x[0] == ".": + hidden = True + x = x[1:] + if "_" in x: + idx = x.split("_") + idx = tuple(map(int, idx)) + else: + idx = int(x) + return idx, hidden + + infos = [extract_idx(o) for o in os.listdir(directory) if os.path.isdir(directory / o) and not os.path.islink(directory / o)] + return sorted(infos, key=lambda x: x[0] if isinstance(x[0], tuple) else (x[0], )) # TODO: sort by session datetime?
def load_recent_sessions(env: Environment, count: int = None) -> List[Session]: @@ -171,22 +184,23 @@ def load_recent_sessions(env: Environment, count: int = None) -> List[Session]: sessions_directory = env.paths["temp"].path / "sessions" # TODO: in the future also strs (custom or hash) should be allowed - session_ids = get_ids(sessions_directory) + session_infos = get_infos(sessions_directory) - for sid in session_ids: + for sid, _ in session_infos: session_directory = sessions_directory / str(sid) # session_file = sessions_directory / str(sid) / "session.txt" # if not session_file.is_file(): # continue runs_directory = session_directory / "runs" - run_ids = get_ids(runs_directory) + run_infos = get_infos(runs_directory) runs = [] - for rid in run_ids: + for rid, hidden in run_infos: run_directory = runs_directory / str(rid) # run_file = run_directory / "run.txt" # run = Run.from_file(run_file) # TODO: actually implement run restore run = Run() # TODO: fix run.archived = True + run.hidden = hidden run.dir = run_directory runs.append(run) session = Session(idx=sid, archived=True, dir=session_directory) diff --git a/mlonmcu/feature/feature.py b/mlonmcu/feature/feature.py index f27b19f4e..d16cb1290 100644 --- a/mlonmcu/feature/feature.py +++ b/mlonmcu/feature/feature.py @@ -55,7 +55,12 @@ def helper(key): return {helper(key): value for key, value in config.items() if f"{self.name}." in key} def __repr__(self): - return type(self).__name__ + f"({self.name})" + probs = [] + if self.name: + probs.append(self.name) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Feature(" + ",".join(probs) + ")" # @property # def types(self): diff --git a/mlonmcu/flow/backend.py b/mlonmcu/flow/backend.py index e6f185a1d..e7252a1c0 100644 --- a/mlonmcu/flow/backend.py +++ b/mlonmcu/flow/backend.py @@ -54,8 +54,14 @@ def __init__( self.tuner = None def __repr__(self): - name = type(self).name - return f"Backend({name})" + probs = [] + if self.name: + probs.append(type(self).name) + if self.features and len(self.features) > 0: + probs.append(str(self.features)) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Backend(" + ",".join(probs) + ")" def process_features(self, features): # Filter out non-backend features diff --git a/mlonmcu/flow/framework.py b/mlonmcu/flow/framework.py index f101c9b71..e0ce18811 100644 --- a/mlonmcu/flow/framework.py +++ b/mlonmcu/flow/framework.py @@ -38,6 +38,16 @@ def __init__(self, features=None, config=None, backends={}): self.features = self.process_features(features) self.config = filter_config(self.config, self.name, self.DEFAULTS, self.OPTIONAL, self.REQUIRED) + def __repr__(self): + probs = [] + if self.name: + probs.append(type(self).name) + if self.features and len(self.features) > 0: + probs.append(str(self.features)) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Framework(" + ",".join(probs) + ")" + def process_features(self, features): if features is None: return [] diff --git a/mlonmcu/models/model.py b/mlonmcu/models/model.py index 09202c71d..356559cc3 100644 --- a/mlonmcu/models/model.py +++ b/mlonmcu/models/model.py @@ -99,6 +99,9 @@ def outputs_path(self): return self.config["outputs_path"] def __repr__(self): - if self.alt: - return f"Model({self.name},alt={self.alt})" - return f"Model({self.name})" + probs = [] + if self.name: + probs.append(self.name) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Model(" + 
",".join(probs) + ")" diff --git a/mlonmcu/platform/platform.py b/mlonmcu/platform/platform.py index 52c74ec36..36b50c90e 100644 --- a/mlonmcu/platform/platform.py +++ b/mlonmcu/platform/platform.py @@ -67,6 +67,16 @@ def __init__(self, name, features=None, config=None): self.config = filter_config(self.config, self.name, self.DEFAULTS, self.OPTIONAL, self.REQUIRED) self.artifacts = [] + def __repr__(self): + probs = [] + if self.name: + probs.append(self.name) + if self.features and len(self.features) > 0: + probs.append(str(self.features)) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Platform(" + ",".join(probs) + ")" + def init_directory(self, path=None, context=None): raise NotImplementedError diff --git a/mlonmcu/session/group.py b/mlonmcu/session/group.py new file mode 100644 index 000000000..2ef12a01c --- /dev/null +++ b/mlonmcu/session/group.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""TODO""" diff --git a/mlonmcu/session/job.py b/mlonmcu/session/job.py new file mode 100644 index 000000000..2ef12a01c --- /dev/null +++ b/mlonmcu/session/job.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +"""TODO""" diff --git a/mlonmcu/session/postprocess/postprocess.py b/mlonmcu/session/postprocess/postprocess.py index 8ad50a9a3..9d42ef1b2 100644 --- a/mlonmcu/session/postprocess/postprocess.py +++ b/mlonmcu/session/postprocess/postprocess.py @@ -36,6 +36,16 @@ def __init__(self, name, config=None, features=None): self.features = self.process_features(features) self.config = filter_config(self.config, self.name, self.DEFAULTS, self.OPTIONAL, self.REQUIRED) + def __repr__(self): + probs = [] + if self.name: + probs.append(self.name) + if self.features and len(self.features) > 0: + probs.append(str(self.features)) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Postprocess(" + ",".join(probs) + ")" + def process_features(self, features): """Utility which handles postprocess_features.""" # Currently there is no support for postprocess features (FIXME) diff --git a/mlonmcu/session/progress.py b/mlonmcu/session/progress.py new file mode 100644 index 000000000..15afe5c0a --- /dev/null +++ b/mlonmcu/session/progress.py @@ -0,0 +1,51 @@ +# +# Copyright (c) 2022 TUM Department of Electrical and Computer Engineering. +# +# This file is part of MLonMCU. +# See https://github.com/tum-ei-eda/mlonmcu.git for further info. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""TODO""" + +from tqdm import tqdm + + +def init_progress(total, msg="Processing..."): + """Helper function to initialize a progress bar for the session.""" + return tqdm( + total=total, + desc=msg, + ncols=100, + bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", + leave=None, + mininterval=0.001, + # maxinterval=0, + ) + + +def update_progress(pbar, count=1): + """Helper function to update the progress bar for the session.""" + pbar.update(count) + + +def get_pbar_callback(pbar): + def callback(_): + update_progress(pbar) + return callback + + +def close_progress(pbar): + """Helper function to close the session progressbar, if available.""" + if pbar: + pbar.close() diff --git a/mlonmcu/session/run.py b/mlonmcu/session/run.py index 8b04348a6..7c9c6189d 100644 --- a/mlonmcu/session/run.py +++ b/mlonmcu/session/run.py @@ -90,6 +90,7 @@ def __init__( config=None, # TODO: All config combined or explicit run-config? postprocesses=None, archived=False, + hidden=False, session=None, comment="", ): @@ -98,9 +99,11 @@ def __init__( self.frontends = frontends if frontends is not None else [] self.framework = framework # ??? 
self.backend = backend + self.target = target self.platforms = platforms if platforms is not None else [] self.artifacts_per_stage = {} self.archived = archived + self.hidden = hidden self.session = session self.postprocesses = postprocesses if postprocesses else [] self.comment = comment @@ -108,7 +111,6 @@ self.completed = {stage: stage == RunStage.NOP for stage in RunStage} self.init_directory() - self.target = target self.cache_hints = [] self.config = config if config else {} self.features = features if features else [] @@ -250,7 +252,14 @@ def init_directory(self): self.dir = Path(self.tempdir.name) else: self.tempdir = None - self.dir = self.session.runs_dir / str(self.idx) + if isinstance(self.idx, int): + dirname = str(self.idx) + else: + assert isinstance(self.idx, tuple) + dirname = "_".join(list(map(str, self.idx))) + if self.hidden: + dirname = "." + dirname + self.dir = self.session.runs_dir / dirname if not self.dir.is_dir(): os.mkdir(self.dir) # This is not a good idea, but else we would need a mutex/lock on the shared build_dir @@ -495,6 +504,80 @@ def __repr__(self): probs.append(str(self.config)) return "Run(" + ",".join(probs) + ")" + def get_hash(self, until=RunStage.DONE): + """Return a string characterizing this run up to the given stage, used to group equivalent runs.""" + probs = {} + cfg = {} + + def has_prefix(key): + """Returns true if the configuration key does not have global scope.""" + return "." in key + + def config_helper(obj, prefix=None): + """Helper to access the configuration of a given component object.""" + if prefix: + name = prefix + else: + assert hasattr(obj, "name") + name = obj.name + ret = { + key if has_prefix(key) else f"{name}.{key}": value + for key, value in obj.config.items() + } + return ret + + ret = {} + ret.update( + {key: value for key, value in self.config.items() if not has_prefix(key)} + ) # Only config without a prefix!
+ ret.update(self.run_config) + if until >= RunStage.LOAD: + probs["model"] = str(self.model) + # TODO: MODEL CFG + probs["frontends"] = str(self.frontends) + for frontend in self.frontends: + cfg.update(config_helper(frontend)) + if until >= RunStage.TUNE: + if self.target_to_backend and self.target: + probs["target"] = str(self.target) + cfg.update(config_helper(self.target)) + probs["backend"] = str(self.backend) + cfg.update(config_helper(self.backend)) + probs["framework"] = str(self.framework) + cfg.update(config_helper(self.framework)) + if self.tune_platform: + probs["tune_platform"] = str(self.tune_platform) + cfg.update(config_helper(self.tune_platform)) + if until >= RunStage.BUILD: + if self.target_to_backend and self.target: + probs["target"] = str(self.target) + probs["backend"] = str(self.backend) + cfg.update(config_helper(self.backend)) + probs["framework"] = str(self.framework) + cfg.update(config_helper(self.framework)) + if self.build_platform: + probs["build_platform"] = str(self.build_platform) + cfg.update(config_helper(self.build_platform)) + if until >= RunStage.COMPILE: + probs["target"] = str(self.target) + if self.compile_platform: + probs["compile_platform"] = str(self.compile_platform) + cfg.update(config_helper(self.compile_platform)) + if until >= RunStage.RUN: + probs["target"] = str(self.target) + # if self.run_platform: + # probs["run_platform"] = str(self.run_platform) + # cfg.update(config_helper(self.run_platform)) + if until >= RunStage.POSTPROCESS: + probs["postprocesses"] = self.postprocesses + for postprocess in self.postprocesses: + cfg.update(config_helper(postprocess)) + probs["features"] = str(self.features) + for feature in self.features: + cfg.update(config_helper(feature)) + # probs["config"] = str(cfg) + probs["config"] = str(self.run_config) + return "Run(" + str([f"{key}={value}" for key, value in probs.items()]) + ")" + @property def frontend(self): assert len(self.frontends) > 0, "Not frontend is available for this run." diff --git a/mlonmcu/session/session.py b/mlonmcu/session/session.py index 7f32d0ed3..b98888ff5 100644 --- a/mlonmcu/session/session.py +++ b/mlonmcu/session/session.py @@ -20,21 +20,25 @@ import os import shutil import tempfile +import itertools import multiprocessing from datetime import datetime from enum import Enum +from copy import deepcopy from pathlib import Path -import concurrent.futures +from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED +import time -from tqdm import tqdm +from enum import IntEnum from mlonmcu.session.run import Run from mlonmcu.logging import get_logger from mlonmcu.report import Report -from mlonmcu.config import filter_config +from mlonmcu.config import filter_config, str2bool from .postprocess.postprocess import SessionPostprocess from .run import RunStage +from .progress import init_progress, update_progress, close_progress, get_pbar_callback logger = get_logger() # TODO: rename to get_mlonmcu_logger @@ -48,11 +52,333 @@ class SessionStatus(Enum): # TODO: remove? 
ERROR = 3 +def get_cpu_count(): + return multiprocessing.cpu_count() + + +def get_used_stages(runs, until): + """Determines the stages which are used by at least one run.""" + if not isinstance(runs, list): + runs = [runs] + used = [] + for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: + stage = RunStage(stage_index) + if any(run.has_stage(stage) for run in runs): + used.append(stage) + return used + + +class Group: + """Group of equivalent jobs which can be merged into a single job.""" + + def __init__(self, jobs, parent=None): + self.jobs = jobs + self.parent = parent + + def __len__(self): + return len(self.jobs) + + def merge(self, runs): + # TODO: check if valid + # Run + new_run = deepcopy(runs[self.jobs[0].run]) + new_run.idx = tuple([job.run for job in self.jobs]) + new_run.hidden = True + + # Job + new_job = deepcopy(self.jobs[0]) + new_job.idx = tuple([job.idx for job in self.jobs]) + new_job._needs = [self.parent] if self.parent is not None else [] + new_job.run = new_run.idx + new_job.reset() + + return new_job, new_run + + +class JobError(Exception): + pass + + +class JobStatus(IntEnum): + + DONE = 0 + FAILING = 1 + RUNNING = 2 + BLOCKED = 3 + READY = 4 + + +class Job: + """Unit of work handled by the session: one run, optionally restricted to a single stage.""" + + def __init__(self, idx, run, stage=None, needs=None): + self.idx = idx + self.run = run # idx only + self.stage = stage + self._needs = needs if needs is not None else [] + self.needs = self._needs.copy() + self.result = None + self.in_progress = False + self.exception = None + self.failed_stage = None + + def __repr__(self): + tmp = str(self.run) if self.stage is None else f"{self.run}.{self.stage}" + # return f"Job({self.idx}, {tmp})" + return f"Job({self.idx}, {tmp}, {self.needs})" + + @property + def ready(self): + return not self.result and not self.exception and not self.in_progress and len(self.needs) == 0 + + @property + def status(self): + if self.result: + return JobStatus.DONE + elif self.exception or self.failed_stage: + return JobStatus.FAILING + elif self.in_progress: + return JobStatus.RUNNING + elif self.ready: + return JobStatus.READY + else: + return JobStatus.BLOCKED + + def reset(self): + self.result = None + self.in_progress = False + self.exception = None + self.failed_stage = None + self.needs = self._needs.copy() + + def process(self, run, until, skip, export): + assert not self.in_progress # TODO: just reset instead?
+ # print("process", self.idx, self.result, self.exception) + assert not self.result + assert not self.exception + assert self.ready + self.in_progress = True + if self.stage: + # print("self.stage", self.stage, type(self.stage)) + # print("until", until, type(until)) + assert self.stage == until + try: + input(f"> {self}") + # print(f"> {self}") + result = run.process(until=until, skip=skip, export=export) + # print(f"< {self}") + if run.failing: + if self.stage: + self.failed_stage = self.stage.name + else: + self.failed_stage = RunStage(run.next_stage).name + self.fail(JobError(f"Failed at stage {self.failed_stage}")) + self.done(result) + except Exception as e: + logger.exception(e) + logger.error("An exception was thrown by a worker during simulation") + self.fail(e) + # self.in_progress = False # See done + return self + + def update(self, done_list, failed_list): + # print("update", self.idx, done_list, failed_list, self.needs) + new = [] + for need in self.needs: + if need not in done_list: + new.append(need) + break + elif need in failed_list: + self.fail(JobError("Failed due to failing dependencies")) + break + self.needs = new + + def fail(self, exception): + self.exception = exception + if not self.failed_stage: + self.failed_stage = self.stage if self.stage is not None else "UNKNOWN" + self.in_progress = False + + def done(self, result): + self.result = result + self.in_progress = False + + +def collect_dependencies(jobs): + deps = [] + for job in jobs: + for need in job.needs: + deps.append((need, job.idx)) + return deps + + +def enumerate_jobs(jobs): + for i, job in enumerate(jobs): + job.idx = i + + +def split_run_to_jobs(run: Run, start=0) -> list: + jobs = [] + i = start + prev = None + for stage in get_used_stages(run, RunStage.DONE): + needs = [prev] if prev is not None else [] + jobs.append(Job(i, run.idx, stage=stage, needs=needs)) + prev = i + i = i + 1 + return jobs + + +def create_groups(jobs, runs, merge=False): + assert merge + run_stage_jobs = {} + for job in jobs: + run_stage_jobs[(job.run, int(job.stage))] = job.idx + # print("run_stage_jobs", run_stage_jobs) + used = {} + prevs = {} + prev_stage = {} + for until in list(range(RunStage.LOAD, RunStage.DONE)): + mapping = {} + for run in runs: + idx = run.idx + hash_str = str(run.get_hash(until=until)) + if hash_str in mapping: + mapping[hash_str].append(idx) + else: + mapping[hash_str] = [idx] + # print("mapping", until, mapping) + # names = ["_".join(list(map(str, value))) for key, value in mapping.items()] + # keys = tuple(itertools.chain(*mapping.values())) + mapping = {key: tuple(value) for key, value in mapping.items()} + for value in mapping.values(): + if value in used: + used[value].append(until) + else: + used[value] = [until] + # print("name", name) + # input(">") + # print("used", used) + # input(">") + for key, value in used.items(): + prevs[key] = None + prev_stage[key] = None + idxs = key if isinstance(key, tuple) else (key,) + # print("idxs", idxs) + for key_, value_ in used.items(): + if key == key_: + continue + idxs_ = key_ if isinstance(key_, tuple) else (key_,) + # print("idxs_", idxs_) + valid = True + for idx in idxs: + if idx not in idxs_: + valid = False + break + # print("valid", valid, key, key_) + if valid: + prevs[key] = key_ + prev_stage[key] = max(value_) + # prevs = {key: max(value, key=lambda x:x[1]) if len(value) else [] for key, value in prevs.items()} + # print("prevs", prevs) + # print("prev_stages", prev_stage) + groups_per_stage = {stage: [] for stage in 
range(RunStage.LOAD, RunStage.DONE)} + new_jobs = [] + hidden_runs = [] + for keys, values in used.items(): + for stage in values: + jobs_ = [run_stage_jobs.get((run, stage), None) for run in keys] + jobs_ = [job for job in jobs_ if job is not None] + jobs__ = [jobs[job_] for job_ in jobs_] + parent = prevs.get(keys, None) + parent_stage = prev_stage.get(keys, None) + if parent is not None: + assert parent_stage is not None + assert parent_stage < stage + parent = [run_stage_jobs.get((run, parent_stage), None) for run in parent] + parent = tuple([x for x in parent if x is not None]) + if len(jobs__) == 1: + new_job = deepcopy(jobs__[0]) + if parent: + new_job._needs = [parent] + new_job.reset() + new_jobs.append(new_job) + elif len(jobs__) > 1: + # print("keys", keys) + # print("stage", stage) + # print("parent", parent) + group = Group(jobs__, parent=parent) + job, run = group.merge(runs) + # print("job", job) + groups_per_stage[stage].append(group) + new_jobs.append(job) + hidden_runs.append(run) + # print("new_jobs", new_jobs, len(new_jobs)) + # print("groups_per_stage", groups_per_stage) + return new_jobs, hidden_runs + + +class JobGraph: + """Graph of jobs and the dependencies between them, used for scheduling.""" + + def __init__(self, jobs): + self.jobs = jobs + for job in self.jobs: + job.reset() + self.nodes = [job.idx for job in self.jobs] + self.dependencies = collect_dependencies(jobs) + + def plot(self): + raise NotImplementedError + + @property + def completed(self): + # print("completed", len(self.jobs) == len(self.get_done_jobs() + self.get_failing_jobs())) + return len(self.jobs) == len(self.get_done_jobs() + self.get_failing_jobs()) + + def get_jobs(self, status=None, stage=None): + # print("get_jobs", status, stage) + if status is None and stage is None: + return self.jobs + ret = [] + for job in self.jobs: + # print("job", job, job.stage, job.status) + if status is None: + if job.stage == stage: + ret.append(job) + elif stage is None: + if job.status == status: + ret.append(job) + elif job.status == status and job.stage == stage: + ret.append(job) + return ret + + def get_running_jobs(self): + return self.get_jobs(status=JobStatus.RUNNING) + + def get_ready_jobs(self): + return self.get_jobs(status=JobStatus.READY) + + def get_failing_jobs(self): + return self.get_jobs(status=JobStatus.FAILING) + + def get_done_jobs(self): + return self.get_jobs(status=JobStatus.DONE) + + def get_blocked_jobs(self): + return self.get_jobs(status=JobStatus.BLOCKED) + + class Session: """A session which wraps around multiple runs in a context.""" DEFAULTS = { "report_fmt": "csv", + "runs_per_stage": True, + "interleave_stages": False, + "group_runs": False, + # "print_report": True, } def __init__(self, label="", idx=None, archived=False, dir=None, config=None): @@ -85,6 +411,12 @@ def __init__(self, label="", idx=None, archived=False, dir=None, config=None): if not self.archived: self.open() + def get_run_from_idx(self, idx): + for run in self.runs: + if run.idx == idx: + return run + return None + @property def prefix(self): """get prefix property.""" @@ -95,6 +427,30 @@ def report_fmt(self): """get report_fmt property.""" return str(self.config["report_fmt"]) + # @property + # def print_report(self): + # """get print_report property.""" + # value = self.config["print_report"] + # return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def runs_per_stage(self): + """get runs_per_stage property.""" + value = self.config["runs_per_stage"] + return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def
interleave_stages(self): + """get interleave_stages property.""" + value = self.config["interleave_stages"] + return str2bool(value) if not isinstance(value, (bool, int)) else value + + @property + def group_runs(self): + """get group_runs property.""" + value = self.config["group_runs"] + return str2bool(value) if not isinstance(value, (bool, int)) else value + def create_run(self, *args, **kwargs): """Factory method to create a run and add it to this session.""" idx = len(self.runs) @@ -132,10 +488,16 @@ def enumerate_runs(self): for run in self.runs: if not run.archived: run.idx = run_idx - run.init_directory() run_idx += 1 self.next_run_idx = run_idx + def init_directories(self): + for run in self.runs: + if not run.archived: + run.init_directory() + + + def request_run_idx(self): """Return next free run index.""" ret = self.next_run_idx @@ -146,7 +508,6 @@ def process_runs( self, until=RunStage.DONE, - per_stage=False, print_report=False, num_workers=1, progress=False, @@ -155,9 +516,32 @@ ): """Process a runs in this session until a given stage.""" - # TODO: Add configurable callbacks for stage/run complete + # TODO: Add configurable callbacks for stage/run complete + + + used_stages = get_used_stages(self.runs, until) + skipped_stages = [stage for stage in RunStage if stage not in used_stages] self.enumerate_runs() + # TODO: jobs = split_tasks(...) + per_stage = self.runs_per_stage + interleave = self.interleave_stages + # print("interleave", interleave) + if per_stage: + jobs = [] + for run in self.runs: + new = split_run_to_jobs(run, start=len(jobs)) + jobs.extend(new) + else: + jobs = [Job(i, run.idx) for i, run in enumerate(self.runs)] + + if self.group_runs: + jobs, hidden_runs = create_groups(jobs, self.runs, merge=True) + # print("groups", groups) + self.runs.extend(hidden_runs) + # print("!jobs", jobs) + graph = JobGraph(jobs) + self.init_directories() self.report = None assert num_workers > 0, "num_workers can not be < 1" workers = [] @@ -168,32 +552,8 @@ num_runs = len(self.runs) num_failures = 0 stage_failures = {} - worker_run_idx = [] - - def _init_progress(total, msg="Processing..."): - """Helper function to initialize a progress bar for the session.""" - return tqdm( - total=total, - desc=msg, - ncols=100, - bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} [{elapsed}s]", - leave=None, - ) - - def _update_progress(pbar, count=1): - """Helper function to update the progress bar for the session.""" - pbar.update(count) - - def _close_progress(pbar): - """Helper function to close the session progressbar, if available.""" - if pbar: - pbar.close() - - def _process(pbar, run, until, skip): - """Helper function to invoke the run.""" - run.process(until=until, skip=skip, export=export) - if progress: - _update_progress(pbar) + done_jobs = [] + failed_jobs = [] def _join_workers(workers): """Helper function to collect all worker threads.""" @@ -201,99 +561,112 @@ results = [] for i, w in enumerate(workers): try: - results.append(w.result()) - except Exception as e: - logger.exception(e) + job = w.result() + if job.status == JobStatus.DONE: + done_jobs.append(job.idx) + elif job.status == JobStatus.FAILING: + failed_jobs.append(job.idx) + failed_stage = job.failed_stage + assert failed_stage is not None + if failed_stage in stage_failures: + stage_failures[failed_stage].append(job.run) + else: + stage_failures[failed_stage] = [job.run] + num_failures += 1 +
except Exception as exe: + logger.exception(exe) logger.error("An exception was thrown by a worker during simulation") - run_index = worker_run_idx[i] - run = self.runs[run_index] - if run.failing: - num_failures += 1 - failed_stage = RunStage(run.next_stage).name - if failed_stage in stage_failures: - stage_failures[failed_stage].append(run_index) - else: - stage_failures[failed_stage] = [run_index] - if progress: - _close_progress(pbar) return results - def _used_stages(runs, until): - """Determines the stages which are used by at least one run.""" - used = [] - for stage_index in list(range(RunStage.LOAD, until + 1)) + [RunStage.POSTPROCESS]: - stage = RunStage(stage_index) - if any(run.has_stage(stage) for run in runs): - used.append(stage) - return used + def _process(job, until, skip, export): + return job.process(self.get_run_from_idx(job.run), until, skip, export) - used_stages = _used_stages(self.runs, until) - skipped_stages = [stage for stage in RunStage if stage not in used_stages] - - with concurrent.futures.ThreadPoolExecutor(num_workers) as executor: + with ThreadPoolExecutor(num_workers) as executor: if per_stage: - if progress: - pbar2 = _init_progress(len(used_stages), msg="Processing stages") - for stage in used_stages: - run_stage = RunStage(stage).name + if interleave: + if progress: + pbar = init_progress(len(jobs), msg="Processing jobs") + + submitted = [] + while not graph.completed: + time.sleep(1) + ready_jobs = graph.get_ready_jobs() + # print("ready_jobs", ready_jobs) + # print("submitted", submitted) + for job in ready_jobs: + # print("job.idx", job.idx) + if job.idx in submitted: + continue + submitted.append(job.idx) + future = executor.submit(_process, job, job.stage, skipped_stages, export) + # future.add_done_callback(get_pbar_callback(pbar)) + workers.append(future) + assert len(workers) > 0 + done_list, x = wait(workers, return_when=FIRST_COMPLETED) + # print("done_list", done_list) + # print("x", x) + _join_workers(done_list) + # print("done_jobs", done_jobs) + # print("failed_jobs", failed_jobs) + for job in jobs: + job.update(done_jobs, failed_jobs) + # print("workers_before", len(workers)) + for future in done_list: + if progress: + update_progress(pbar) + # print("updated") + workers.remove(future) + # print("workers_after", len(workers)) + # TODO: detect deadlock (check for circular deps etc.) + if progress: - pbar = _init_progress(len(self.runs), msg=f"Processing stage {run_stage}") - else: - logger.info("%s Processing stage %s", self.prefix, run_stage) - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if (stage == RunStage.COMPILE) and run.compile_platform: - total_threads *= run.compile_platform.num_threads - if total_threads > 2 * cpu_count: - if pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d threads being" - + " processed which heavily exceeds the available CPU resources (%d)."
- + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - if run.failing: - logger.warning("Skiping stage '%s' for failed run", run_stage) + close_progress(pbar) + else: + if progress: + pbar2 = init_progress(len(used_stages), msg="Processing stages") + for stage in used_stages: + run_stage = RunStage(stage).name + # jobs_ = jobs_per_stage[stage] + jobs_ = graph.get_jobs(stage=stage) + # print("stage", stage, "jobs_", jobs_, "len(jobs_)", len(jobs)) + if progress: + pbar = init_progress(len(jobs_), msg=f"Processing stage {run_stage}") else: - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=stage, skip=skipped_stages)) - _join_workers(workers) - workers = [] - worker_run_idx = [] + logger.info("%s Processing stage %s", self.prefix, run_stage) + for i, job in enumerate(jobs_): + if job.failed_stage: + logger.warning("Skiping stage '%s' for failed run", run_stage) + else: + # future = executor.submit(job.process, self.runs[job.run], until=stage, skip=skipped_stages) + future = executor.submit(_process, job, stage, skipped_stages, export) + # future.add_done_callback(job.callback) + future.add_done_callback(get_pbar_callback(pbar)) + + workers.append(future) + _join_workers(workers) + if progress: + close_progress(pbar) + # TODO: function + for job in jobs: + job.update(done_jobs, failed_jobs) + workers = [] + if progress: + update_progress(pbar2) if progress: - _update_progress(pbar2) - if progress: - _close_progress(pbar2) + close_progress(pbar2) else: + assert not interleave, "session.interleave_stages requires session.runs_per_stage" if progress: - pbar = _init_progress(len(self.runs), msg="Processing all runs") + pbar = init_progress(len(self.runs), msg="Processing all runs") else: logger.info(self.prefix + "Processing all stages") - for i, run in enumerate(self.runs): - if i == 0: - total_threads = min(len(self.runs), num_workers) - cpu_count = multiprocessing.cpu_count() - if (until >= RunStage.COMPILE) and run.compile_platform.name == "mlif": - total_threads *= ( - run.compile_platform.num_threads - ) # TODO: This should also be used for non-mlif platforms - if total_threads > 2 * cpu_count: - if pbar2: - print() - logger.warning( - "The chosen configuration leads to a maximum of %d being processed which" - + " heavily exceeds the available CPU resources (%d)." 
- + " It is recommended to lower the value of 'mlif.num_threads'!", - total_threads, - cpu_count, - ) - worker_run_idx.append(i) - workers.append(executor.submit(_process, pbar, run, until=until, skip=skipped_stages)) + for i, job in enumerate(jobs): + # workers.append(executor.submit(job.process, self.runs[job.run], until=until, skip=skipped_stages, export=export)) + future = executor.submit(_process, job, until, skipped_stages, export) + future.add_done_callback(get_pbar_callback(pbar)) + workers.append(future) _join_workers(workers) + if progress: + close_progress(pbar) if num_failures == 0: logger.info("All runs completed successfuly!") elif num_failures == num_runs: diff --git a/mlonmcu/session/stage.py b/mlonmcu/session/stage.py new file mode 100644 index 000000000..e69de29bb diff --git a/mlonmcu/target/target.py b/mlonmcu/target/target.py index a23447ebd..86d046fe8 100644 --- a/mlonmcu/target/target.py +++ b/mlonmcu/target/target.py @@ -93,7 +93,14 @@ def repeat(self): return self.config["repeat"] def __repr__(self): - return f"Target({self.name})" + probs = [] + if self.name: + probs.append(self.name) + if self.features and len(self.features) > 0: + probs.append(str(self.features)) + if self.config and len(self.config) > 0: + probs.append(str(self.config)) + return "Target(" + ",".join(probs) + ")" def process_features(self, features): if features is None: