diff --git a/scripts/custom_scripts/check_flowy_data_records.py b/scripts/custom_scripts/check_flowy_data_records.py new file mode 100644 index 0000000..394ffba --- /dev/null +++ b/scripts/custom_scripts/check_flowy_data_records.py @@ -0,0 +1,210 @@ +#!/usr/bin/env python3 +""" +Scan synth_out/res_* folders, read flowy_data_record.parquet, and report: +- number of unique run_identifier values (per file + histogram) +- maximum step value (per file + global max) +- number of res_* folders found + +Usage: + python scan_flowy_data_record.py + python scan_flowy_data_record.py --base /path/to/run +""" + +from __future__ import annotations + +import argparse +import glob +import os +from pathlib import Path +from collections import Counter + +import pandas as pd + +# get env var +DATA_DIR = os.getenv("DATA_DIR") + +DEFAULT_BASE = ( + f"{DATA_DIR}/output/" + "multiplier_4bi_8bo_permuti_flowy/flowy_trans_run_12chains_3000steps_gen_iter0" +) + + +def text_hist(counter: Counter[int], *, title: str, bar_width: int = 40) -> str: + if not counter: + return f"{title}\n (empty)\n" + + items = sorted(counter.items(), key=lambda kv: kv[0]) + max_count = max(counter.values()) + + lines = [title] + for k, c in items: + bar_len = int(round((c / max_count) * bar_width)) if max_count > 0 else 0 + bar = "#" * bar_len + bar = '' + lines.append(f" {k:>6}: {c:>6} occurrences {bar}") + return "\n".join(lines) + "\n" + + +def _to_num(s: pd.Series) -> pd.Series: + return pd.to_numeric(s, errors="coerce") + +def main() -> int: + ap = argparse.ArgumentParser() + ap.add_argument( + "--base", + type=str, + default=DEFAULT_BASE, + help="Base folder containing synth_out/ (default is your GENIAL run path).", + ) + ap.add_argument( + "--pattern", + type=str, + default="synth_out/res_*", + help="Glob pattern under base to find result folders.", + ) + ap.add_argument( + "--parquet-name", + type=str, + default="flowy_data_record.parquet", + help="Parquet filename to read inside each res_* folder.", + ) + ap.add_argument( + "--quiet", + action="store_true", + help="Only print aggregate statistics (no per-folder lines).", + ) + args = ap.parse_args() + + base = Path(args.base).expanduser() + res_glob = str(base / args.pattern) + res_dirs = [Path(p) for p in sorted(glob.glob(res_glob)) if Path(p).is_dir()] + + # Always print number of folders found (as requested) + print(f"Found {len(res_dirs)} res_* folders matching: {res_glob}") + + if not res_dirs: + print(f"[ERROR] No directories matched: {res_glob}") + return 2 + + uniq_hist: Counter[int] = Counter() + maxstep_hist: Counter[int] = Counter() + rowcount_hist: Counter[int] = Counter() + global_max_step: int | None = None + + missing_files = 0 + bad_files = 0 + processed = 0 + + if not args.quiet: + print(f"Base: {base}") + print("-" * 80) + + # Aggregate accumulators across all rows / all files + all_aig = [] + all_tr = [] + + for d in res_dirs: + pq = d / args.parquet_name + if not pq.exists(): + missing_files += 1 + if not args.quiet: + print(f"{d.name}: MISSING {args.parquet_name}") + continue + + try: + # read only needed columns (fast) + # read only needed columns (fast) + df = pd.read_parquet( + pq, + columns=["run_identifier", "step", "aig_count", "nb_transistors"], + ) + except Exception as e: + bad_files += 1 + if not args.quiet: + print(f"{d.name}: ERROR reading parquet: {e}") + continue + + if "run_identifier" not in df.columns or "step" not in df.columns or "aig_count" not in df.columns or "nb_transistors" not in df.columns: + bad_files += 1 + if not args.quiet: + print(f"{d.name}: ERROR missing required columns in parquet") + continue + + rowcount = int(len(df)) + uniq = int(df["run_identifier"].nunique(dropna=True)) + + step_series = pd.to_numeric(df["step"], errors="coerce") + if step_series.notna().any(): + max_step = int(step_series.max()) + else: + max_step = None + + uniq_hist[uniq] += 1 + rowcount_hist[rowcount] += 1 + if max_step is not None: + maxstep_hist[max_step] += 1 + global_max_step = max_step if global_max_step is None else max(global_max_step, max_step) + + aig = _to_num(df["aig_count"]) + tr = _to_num(df["nb_transistors"]) + + # accumulate for global stats (ignore NaNs) + + all_aig.extend(aig.dropna().tolist()) + all_tr.extend(tr.dropna().tolist()) + + processed += 1 + + if not args.quiet: + ms = "NA" if max_step is None else str(max_step) + print(f"{d.name}: unique(run_identifier)={uniq:4d} | max(step)={ms}") + + if not args.quiet: + print("-" * 80) + + print(f"Processed: {processed}/{len(res_dirs)}") + if missing_files: + print(f"Missing parquet files: {missing_files}") + if bad_files: + print(f"Unreadable/invalid parquet files: {bad_files}") + + print() + print(text_hist(uniq_hist, title="Unique run_identifier per flowy_data_record.parquet")) + + # if global_max_step is None: + # print("Global max(step): NA (no valid step values found)") + # else: + # print(f"Global max(step): {global_max_step}") + + print() + print(text_hist(rowcount_hist, title="Row count per flowy_data_record.parquet")) + + print() + print(text_hist(maxstep_hist, title="Max step per flowy_data_record.parquet")) + + # Global aig/transistor summary + def safe_mean_min(vals: list[float]): + if not vals: + return None, None + s = pd.Series(vals, dtype="float64") + return float(s.mean()), float(s.min()) + + aig_g_mean, aig_g_min = safe_mean_min(all_aig) + tr_g_mean, tr_g_min = safe_mean_min(all_tr) + + print("Aggregate over ALL rows in ALL files:") + if aig_g_mean is None: + print(" aig_count : mean=NA, min=NA") + else: + print(f" aig_count : mean={aig_g_mean:.3f}, min={aig_g_min:.3f}") + + if tr_g_mean is None: + print(" nb_transistors : mean=NA, min=NA") + else: + print(f" nb_transistors : mean={tr_g_mean:.3f}, min={tr_g_min:.3f}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh b/scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh index 8cc2670..302b6a2 100644 --- a/scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh +++ b/scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh @@ -3,7 +3,7 @@ #SBATCH --output="/home/%u/slurm_logs/genial/sbatch_info/genial_flowy_%j_%N_$timestamp.log" #SBATCH --error="/home/%u/slurm_logs/genial/sbatch_error/genial_flowy_%j_%N_$timestamp.log" # Move to working directory -cd $$HOME/proj/genial +cd $$HOME/proj/GENIAL # Activate Python Environment # set -a diff --git a/src/genial/ext_plugins/flowy/flowy_launcher_helper.py b/src/genial/ext_plugins/flowy/flowy_launcher_helper.py index 514e9f7..85dae75 100644 --- a/src/genial/ext_plugins/flowy/flowy_launcher_helper.py +++ b/src/genial/ext_plugins/flowy/flowy_launcher_helper.py @@ -266,6 +266,46 @@ def flowy_synthesis(self): else: logger.warning(f"Could not find {path} in {best_data_path}") + def collect_output_txts(temp_dir: str | Path, out_file: str | Path) -> None: + """ + Collect all `output.txt` files from: + temp_dir/output/db/genial/data_collection/*/output.txt + and write them into `out_file` as: + + : + + + : + + ... + """ + temp_dir = Path(temp_dir) + out_file = Path(out_file) + + pattern = temp_dir / "output" / "db" / "genial" / "data_collection" / "*" / "output.txt" + files = sorted(pattern.parent.parent.glob("*/output.txt")) # one-level directories only + + # Alternatively, simpler: + # files = sorted((temp_dir / "output/db/genial/data_collection").glob("*/output.txt")) + + out_file.parent.mkdir(parents=True, exist_ok=True) + + with out_file.open("w", encoding="utf-8") as out: + for f in files: + # Use the one-level directory name as "filename" label (more informative than just "output.txt") + label = "output.txt content of run " + f.parent.name + + try: + content = f.read_text(encoding="utf-8", errors="replace") + except OSError as e: + content = f"[ERROR reading file: {e}]" + + out.write(f"{label}:\n") + out.write(content.rstrip("\n")) + out.write("\n\n") # blank line between entries + + collect_output_txts(flowy_tmp_dir.name, self.design_output_dir_path / "all_outputs.txt") + # Restoring environment (in case this was not launched from a subprocess) os.environ["SRC_DIR"] = original_src_dir if original_data_dir is not None: diff --git a/src/genial/loop/full_run_v2.py b/src/genial/loop/full_run_v2.py index 510820f..8a0a95e 100644 --- a/src/genial/loop/full_run_v2.py +++ b/src/genial/loop/full_run_v2.py @@ -46,52 +46,22 @@ class SlurmDispatcher: "clean", ] - __valid_nodes__ = [ - # "aisrv01", - # "aisrv02", - "aisrv03", - # "epyc01", - # "epyc02", - ] - __valid_work_dirpath__ = [ # "/netscratch/aisrv01", - # "/netscratch/aisrv02", - "/netscratch/aisrv03", + "/netscratch/aisrv02", + # "/netscratch/aisrv03", # "/netscratch/epyc01", # "/netscratch/epyc02", ] - __partition__ = { - "generate": "AI-CPU,Zen3", - "launch": "AI-CPU,Zen3", - "analyze": "AI-CPU,Zen3", - "train": "AI-CPU", - "recommend": "AI-CPU", - "merge": "AI-CPU,Zen3", - "clean": "AI-CPU,Zen3", - } - - __mem_per_cpu__ = { - "AI-CPU,Zen3": "3G", - "VNL_GPU": "2G", - } - - __nodelist__ = { - "generate": "aisrv01,aisrv03", - "launch": "aisrv01,aisrv03", - "analyze": "aisrv01,aisrv03", - "train": "aime01,aime02,aime03", - "recommend": "aime01,aime02,aime03", - "merge": "aisrv01,aisrv03", - } - - __can_multi_node__ = { - "generate": False, - "launch": True, - "analyze": False, - "train": False, - "recommend": False, + __task_resources__ = { + "generate": {"partition": "AI-CPU,Zen3", "node_list": ["aisrv02"]}, + "launch": {"partition": "AI-CPU,Zen3", "node_list": ["aisrv02"]}, + "analyze": {"partition": "AI-CPU,Zen3", "node_list": ["aisrv02"]}, + "train": {"partition": "AI-CPU", "node_list": ["aime01", "aime02", "aime03"]}, + "recommend": {"partition": "AI-CPU", "node_list": ["aime01", "aime02", "aime03"]}, + "merge": {"partition": "AI-CPU,Zen3", "node_list": ["aisrv02"]}, + "clean": {"partition": "AI-CPU,Zen3", "node_list": ["aisrv02"]}, } @staticmethod @@ -138,7 +108,12 @@ def generate_sbatch_args(task: str, nb_workers: int): device_ids = None time = "3:00:00" - print(f"=========================== {task}") + task_resources = SlurmDispatcher.__task_resources__.get(task) + if task_resources is None: + raise ValueError(f"Missing slurm resources for task '{task}'.") + partition = task_resources["partition"] + node_list = SlurmDispatcher._normalize_node_list(task_resources.get("node_list")) + if task == "train" or task == "recommend": _nb_workers = 16 # checker = SlurmGPUFinder(partition="VNL_GPU") # Change 'gpu' if needed @@ -157,37 +132,18 @@ def generate_sbatch_args(task: str, nb_workers: int): f"--time={time}", # "--ntasks=1", f"--mem-per-cpu=2G", - "--partition=" + SlurmDispatcher.__partition__[task], + f"--partition={partition}", f"--cpus-per-task={cpus_per_task}", - # "--nodelist=" + SlurmDispatcher.__nodelist__[task], ] - if task in ["analyze", "merge", "train", "recommend"]: - cmd_prefix_list += ["--nodelist=aisrv03"] - elif task == "clean": - # For cleaning, submit one job per available node to speed things up. - # Filter out nodes that are not available to avoid infinite waits. - available_nodes = SlurmDispatcher.get_available_nodes(SlurmDispatcher.__valid_nodes__) - if not available_nodes: - logger.warning("No usable nodes detected for clean task; skipping clean submissions.") + if task == "clean": + # For cleaning, optionally submit one job per node to speed things up. + if node_list: + cmd_prefix_list = [cmd_prefix_list + [f"--nodelist={node}"] for node in node_list] else: - if set(available_nodes) != set(SlurmDispatcher.__valid_nodes__): - skipped = list(set(SlurmDispatcher.__valid_nodes__) - set(available_nodes)) - logger.warning(f"Skipping unavailable nodes for clean task: {sorted(skipped)}") - cmd_prefix_list = [cmd_prefix_list + [f"--nodelist={node}"] for node in available_nodes] - - # if task == "launch": - # cmd_prefix_list += (f"--cpus-per-task={str(int(24 * 1.5))}",) # Use a margin of 1.5 - just to make sure. - # else: - # cmd_prefix_list += ( - # f"--cpus-per-task={}", - # ) # Use a margin of 1.5 - jst to make sure. - - # if SlurmDispatcher.__partition__[task] == "VNL_GPU": - # cmd_prefix_list += ["--exclude=aime02,aime01"] - if SlurmDispatcher.__partition__[task] == "AI-CPU,Zen3" and not task == "clean": - cmd_prefix_list += ["--exclude=epyc01,epyc02,aisrv01,aisrv02"] - # pass + cmd_prefix_list = [cmd_prefix_list] + elif node_list: + cmd_prefix_list += [f"--nodelist={','.join(node_list)}"] # if node is not None: # cmd_prefix_list.append(f"--nodelist={node}") @@ -197,52 +153,14 @@ def generate_sbatch_args(task: str, nb_workers: int): return cmd_prefix_list, device_ids @staticmethod - def get_available_nodes(candidate_nodes: list[str]) -> list[str]: - """Return nodes from candidate_nodes that appear available in Slurm. - - A node is considered available if its state starts with one of: - 'idle', 'mix', 'alloc', 'comp' (completing). Nodes in 'down', 'drain', - 'maint', 'fail', 'unkn' states are excluded. - If Slurm commands are unavailable, fall back to the candidates as-is. - """ - try: - # Query node states once and parse - result = subprocess.run( - ["sinfo", "-h", "-N", "-o", "%N|%t"], - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - check=True, - text=True, - ) - states = {} - for line in result.stdout.strip().splitlines(): - if not line: - continue - try: - name, state = line.split("|", 1) - except ValueError: - continue - states[name.strip()] = state.strip().lower() - - ok_prefixes = ("idle", "mix", "alloc", "comp") - bad_prefixes = ("down", "drain", "drng", "maint", "fail", "unkn") - - usable = [] - for node in candidate_nodes: - st = states.get(node) - if st is None: - # If node is unknown to sinfo, better to skip than to block - logger.warning(f"Node '{node}' missing from sinfo; skipping for clean task.") - continue - if st.startswith(bad_prefixes): - continue - if st.startswith(ok_prefixes): - usable.append(node) - return usable - except Exception as e: - # On any error (e.g., not on a Slurm login node), return provided list - logger.warning(f"Unable to query node availability via sinfo ({e}); using candidate nodes as-is.") - return list(candidate_nodes) + def _normalize_node_list(node_list) -> list[str]: + if node_list is None: + return [] + if isinstance(node_list, str): + return [node.strip() for node in node_list.split(",") if node.strip()] + if isinstance(node_list, (list, tuple, set)): + return [str(node).strip() for node in node_list if str(node).strip()] + raise TypeError(f"Unsupported node_list type: {type(node_list)}") @staticmethod def generate_slurm_launch_script(template_filepath: str) -> str: @@ -287,24 +205,18 @@ def run_cmd_slurm(cmd: str | list[str], task: str, nb_workers: int = 1, is_dry_r if task == "clean": # One clean command per sbatch args (per available node) _cmd_list = [cmd] * len(sbatch_args) - assert len(sbatch_args) == len(_cmd_list), ( - f"Number of commands and sbatch args must match for clean task. {len(sbatch_args)} != {len(_cmd_list)}" - ) + assert len(sbatch_args) == len(_cmd_list), f"Number of commands and sbatch args must match for clean task. {len(sbatch_args)} != {len(_cmd_list)}" template = None else: # Generate the SLURM script from template - slurm_template_path = ( - Path(os.environ.get("SRC_DIR")) / "scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh" - ) + slurm_template_path = Path(os.environ.get("SRC_DIR")) / "scripts/slurm_scripts/sbatch_dispatch_slurm_temp.sh" template = SlurmDispatcher.generate_slurm_launch_script(slurm_template_path) # Generate a script for all commands if template is not None: script_pathlist = dict() for _cmd in _cmd_list: - script_object = tempfile.NamedTemporaryFile( - mode="w", suffix=".slurm", delete=False, dir=os.environ.get("HOME") + "/tmp_genial_slurm_scripts" - ) + script_object = tempfile.NamedTemporaryFile(mode="w", suffix=".slurm", delete=False, dir=os.environ.get("HOME") + "/tmp_genial_slurm_scripts") os.chmod(script_object.name, 0o777) if device_ids is not None: @@ -315,6 +227,11 @@ def run_cmd_slurm(cmd: str | list[str], task: str, nb_workers: int = 1, is_dry_r _cmd = _cmd.replace( f"--device {device_nb}", f"--device {device_ids[-1]}" ) # TODO: make this compatible with multi cmd in parallel ... (useless for now) + # write std output into a temp file with > + + # make sure logs directory exists + Path(os.environ.get("DATA_DIR")).joinpath("logs").mkdir(parents=True, exist_ok=True) + _cmd += f" &> {Path(os.environ.get("DATA_DIR")) / "logs" / (Path(script_object.name).name + ".out")}" script_content = template.substitute( { @@ -480,17 +397,12 @@ def dispatch_all_jobs( for job_index in list(pending): # If not yet submitted or needs retry if jobs[job_index]["job_id"] is None or ( - jobs[job_index]["status"] in ["FAILED", "CANCELLED", "TIMEOUT"] - and jobs[job_index]["tries"] < max_retries + jobs[job_index]["status"] in ["FAILED", "CANCELLED", "TIMEOUT"] and jobs[job_index]["tries"] < max_retries ): - jobs[job_index]["job_id"] = SlurmDispatcher.submit_job( - jobs[job_index]["cmd"], jobs[job_index]["sbatch_args"], is_dry_run - ) + jobs[job_index]["job_id"] = SlurmDispatcher.submit_job(jobs[job_index]["cmd"], jobs[job_index]["sbatch_args"], is_dry_run) jobs[job_index]["tries"] += 1 jobs[job_index]["status"] = "SUBMITTED" - logger.info( - f"Submitted job index {job_index} as job {jobs[job_index]['job_id']} (try {jobs[job_index]['tries']})" - ) + logger.info(f"Submitted job index {job_index} as job {jobs[job_index]['job_id']} (try {jobs[job_index]['tries']})") sleep(1.0) # Check all job status @@ -508,12 +420,12 @@ def dispatch_all_jobs( if sq_state == "PENDING" and sq_reason is not None: reason_lc = sq_reason.lower() if "reqnodenotavail" in reason_lc or "unavailablenodes" in reason_lc: - logger.warning( - f"Job {job_id} appears unschedulable (reason: {sq_reason}). Cancelling to avoid hang." - ) - SlurmDispatcher.cancel_job(job_id) - jobs[job_index]["status"] = "CANCELLED" + logger.warning(f"Job {job_id} appears unschedulable (reason: {sq_reason}). Let's wait.") + #logger.warning(f"Job {job_id} appears unschedulable (reason: {sq_reason}). Cancelling to avoid hang.") + #SlurmDispatcher.cancel_job(job_id) + #jobs[job_index]["status"] = "CANCELLED" # Let the normal failure/cancel flow handle retries/termination + sleep(5.0) continue if status and status not in ["PENDING", "RUNNING"]: jobs[job_index]["status"] = status @@ -521,9 +433,7 @@ def dispatch_all_jobs( if status in ["COMPLETED"]: pending.remove(job_index) elif status in ["FAILED", "CANCELLED", "TIMEOUT"] and jobs[job_index]["tries"] >= max_retries: - logger.info( - f"Job {job_id} ({job_index}) failed after {jobs[job_index]['tries']} tries. Giving up." - ) + logger.info(f"Job {job_id} ({job_index}) failed after {jobs[job_index]['tries']} tries. Giving up.") pending.remove(job_index) any_errors.append(job_index) @@ -547,9 +457,7 @@ def dispatch_all_scripts( while pending: for script in list(pending): # If not yet submitted or needs retry - if jobs[script]["job_id"] is None or ( - jobs[script]["status"] in ["FAILED", "CANCELLED", "TIMEOUT"] and jobs[script]["tries"] < max_retries - ): + if jobs[script]["job_id"] is None or (jobs[script]["status"] in ["FAILED", "CANCELLED", "TIMEOUT"] and jobs[script]["tries"] < max_retries): jobs[script]["job_id"] = SlurmDispatcher.submit_script(script, sbatch_args, is_dry_run) jobs[script]["tries"] += 1 jobs[script]["status"] = "SUBMITTED" @@ -574,11 +482,11 @@ def dispatch_all_scripts( if sq_state == "PENDING" and sq_reason is not None: reason_lc = sq_reason.lower() if "reqnodenotavail" in reason_lc or "unavailablenodes" in reason_lc: - logger.warning( - f"Job {job_id} appears unschedulable (reason: {sq_reason}). Cancelling to avoid hang." - ) - SlurmDispatcher.cancel_job(job_id) - jobs[script]["status"] = "CANCELLED" + logger.warning(f"Job {job_id} appears unschedulable (reason: {sq_reason}). Not cancelling") + #logger.warning(f"Job {job_id} appears unschedulable (reason: {sq_reason}). Cancelling to avoid hang.") + #SlurmDispatcher.cancel_job(job_id) + #jobs[script]["status"] = "CANCELLED" + sleep(5.0) continue if status and status not in ["PENDING", "RUNNING"]: jobs[script]["status"] = status @@ -586,9 +494,7 @@ def dispatch_all_scripts( if status in ["COMPLETED"]: pending.remove(script) elif status in ["FAILED", "CANCELLED", "TIMEOUT"] and jobs[script]["tries"] >= max_retries: - logger.info( - f"Job {job_id} ({script}) failed after {jobs[script]['tries']} tries. Giving up." - ) + logger.info(f"Job {job_id} ({script}) failed after {jobs[script]['tries']} tries. Giving up.") pending.remove(script) if "--job-name=launch_genial_flowy" not in sbatch_args: any_errors.append(script) @@ -597,82 +503,170 @@ def dispatch_all_scripts( logger.info("All jobs done!") return any_errors - + @staticmethod def update_design_number_list(script_path: Path): - """Update the design number list in the script with the current design number list""" + """Update the --design_number_list in a generated SLURM script. + + This version is robust to shell redirections appended to the command, e.g. + ... --design_number_list 1 2 3 > out.log 2> err.log + It only replaces the *argument list* of --design_number_list (not every + occurrence of the design numbers in the whole script). + """ global_vars["keep_not_valid"] = True script_path = Path(script_path) if not script_path.exists(): return - with open(script_path, "r") as f: - script = f.read() + script = script_path.read_text() - skipped_steps = set() - if "--design_number_list" in script: - for line in script.splitlines(): - if "--design_number_list" in line: - design_number_list = line.split("--design_number_list")[1].strip().split() + flag = "--design_number_list" + if flag not in script: + return -1 - if "--skip_" in line: - step_name = line.split("--skip_")[1].strip().split()[0] - skipped_steps.add(step_name) + def _split_line_ending(line: str) -> tuple[str, str]: + if line.endswith("\r\n"): + return line[:-2], "\r\n" + if line.endswith("\n"): + return line[:-1], "\n" + if line.endswith("\r"): + return line[:-1], "\r" + return line, "" + + def _extract_dn_span(after_flag: str) -> tuple[list[str], int, int]: + """Return (design_numbers, start_idx, end_idx) within `after_flag`.""" + # Tokenize by contiguous non-whitespace, keeping spans + matches = list(re.finditer(r"\S+", after_flag)) + dn_spans: list[tuple[int, int]] = [] + + # Tokens that definitely terminate the design list + terminators = { + ">", ">>", "1>", "1>>", "2>", "2>>", "&>", "2>&1", "1>&2", + "|", "||", "&&", ";", + } - if "--experiment_name" in line: - experiment_name = line.split("--experiment_name")[1].strip().split()[0] + for m in matches: + tok = m.group(0) - if "--output_dir_name" in line: - output_dir_name = line.split("--output_dir_name")[1].strip().split()[0] + # Stop when we hit another CLI flag, a redirection/pipe operator, etc. + if tok.startswith("-") or tok in terminators or tok.startswith(">") or tok.endswith(">"): + break - if "--bulk_flow_dirname" in line: - bulk_flow_dirname = line.split("--bulk_flow_dirname")[1].strip().split()[0] + # Accept only pure numeric design numbers (keeps leading zeros) + if re.fullmatch(r"\d+", tok): + dn_spans.append(m.span()) else: - if "flowy" in line: - bulk_flow_dirname = "synth_out" - else: - bulk_flow_dirname = None - - root_dirpath = ConfigDir.get_root_dirpath(experiment_name, output_dir_name) - dir_config = SimpleNamespace( - root_output_dir=root_dirpath, - bulk_flow_dirname=bulk_flow_dirname, - ) + # First non-numeric token ends the list (e.g., a path, or '>' if glued) + break - valid_design_numbers = {} - for step in Analyzer.__existing_steps__: - if step != "gener": - if step not in skipped_steps: - valid_design_numbers[step] = _get_list_of_valid_designs( - dir_config, - step, - return_types="numbers", - filter_design_numbers=design_number_list, - filter_mode="include", - ) + if not dn_spans: + # No numeric tokens: insert position is before the first non-space token (or end) + m_first = re.search(r"\S", after_flag) + pos = m_first.start() if m_first else len(after_flag) + return [], pos, pos - all_valid_design_numbers = set(design_number_list) - for step in valid_design_numbers: - all_valid_design_numbers = all_valid_design_numbers.intersection(set(valid_design_numbers[step])) + start = dn_spans[0][0] + end = dn_spans[-1][1] + dn_list = after_flag[start:end].split() + return dn_list, start, end - todo_design_number_list = list(set(design_number_list) - all_valid_design_numbers) + # Parse required metadata + locate the exact line/segment to replace + skipped_steps = set() + experiment_name = None + output_dir_name = None + bulk_flow_dirname = None + + lines = script.splitlines(keepends=True) + + dn_line_idx = None + dn_line_prefix = None + dn_line_after = None + dn_span = None # (start,end) within dn_line_after + design_number_list = None + dn_line_ending = "" + + for idx, raw_line in enumerate(lines): + line, ending = _split_line_ending(raw_line) + + if flag in line and dn_line_idx is None: + prefix, after = line.split(flag, 1) + dn_list, start, end = _extract_dn_span(after) + dn_line_idx = idx + dn_line_prefix = prefix + dn_line_after = after + dn_span = (start, end) + design_number_list = dn_list + dn_line_ending = ending + + if "--skip_" in line: + step_name = line.split("--skip_", 1)[1].strip().split()[0] + skipped_steps.add(step_name) + + if experiment_name is None and "--experiment_name" in line: + experiment_name = line.split("--experiment_name", 1)[1].strip().split()[0] + + if output_dir_name is None and "--output_dir_name" in line: + output_dir_name = line.split("--output_dir_name", 1)[1].strip().split()[0] + + if bulk_flow_dirname is None and "--bulk_flow_dirname" in line: + bulk_flow_dirname = line.split("--bulk_flow_dirname", 1)[1].strip().split()[0] + + # Best-effort fallback (keeps intent but avoids per-line overwrites) + if bulk_flow_dirname is None and "flowy" in script: + bulk_flow_dirname = "synth_out" + + if dn_line_idx is None or dn_line_prefix is None or dn_line_after is None or dn_span is None or design_number_list is None: + return -1 + if experiment_name is None or output_dir_name is None: + logger.warning(f"Could not parse --experiment_name/--output_dir_name in {script_path}; skipping update.") + return -1 - # new_script_l, new_script_r = script.split("--design_number_list") - for dn in design_number_list: - script = script.replace(f"{dn}", "") + # Compute which design numbers are already valid across all non-skipped steps + root_dirpath = ConfigDir.get_root_dirpath(experiment_name, output_dir_name) + dir_config = SimpleNamespace( + root_output_dir=root_dirpath, + bulk_flow_dirname=bulk_flow_dirname, + ) - _todo_design_number_list = " ".join(todo_design_number_list) - script = script.replace("--design_number_list", f"--design_number_list {_todo_design_number_list}") + valid_design_numbers = {} + for step in Analyzer.__existing_steps__: + if step == "gener": + continue + if step in skipped_steps: + continue + valid_design_numbers[step] = _get_list_of_valid_designs( + dir_config, + step, + return_types="numbers", + filter_design_numbers=design_number_list, + filter_mode="include", + ) - # Write back updated script - with open(script_path, "w") as f: - f.write(script) - logger.info(f"Script {script_path} updated with new design number list: {todo_design_number_list}") + all_valid = set(design_number_list) + for nums in valid_design_numbers.values(): + all_valid &= set(nums) - return len(todo_design_number_list) + # Keep stable order (and avoid duplicates like the old set()-based logic) + todo_design_number_list = [] + seen = set() + for dn in design_number_list: + if dn in all_valid or dn in seen: + continue + todo_design_number_list.append(dn) + seen.add(dn) + + # Replace only the numeric span after --design_number_list, keep suffix (e.g., redirections) intact + start, end = dn_span + new_list_str = " ".join(todo_design_number_list) + new_after = dn_line_after[:start] + new_list_str + dn_line_after[end:] + new_line = dn_line_prefix + flag + new_after + dn_line_ending + + lines[dn_line_idx] = new_line + script_path.write_text("".join(lines)) + + logger.info(f"Script {script_path} updated with new design number list: {todo_design_number_list}") + return len(todo_design_number_list) - else: - return -1 def run_cmd_subprocess(cmd: str, is_dry_run: bool = False, task: str = None) -> CompletedProcess: @@ -718,6 +712,7 @@ def run_cmd_subprocess(cmd: str, is_dry_run: bool = False, task: str = None) -> def run_cmd(cmd: list[str] | str, task: str, nb_workers: int = 1, is_dry_run: bool = False): + logger.info(f"Starting task: {task}") if global_vars["is_slurm"]: process = SlurmDispatcher.run_cmd_slurm(cmd=cmd, task=task, nb_workers=nb_workers, is_dry_run=is_dry_run) else: @@ -786,9 +781,7 @@ def launch_task(**kwargs) -> CompletedProcess: # We expect a flowy run here cmd += ["--skip_swact", "--skip_power"] else: - raise NotImplementedError( - f"bulk_flow_dirname {kwargs['bulk_flow_dirname']} is not implemented for the launher" - ) + raise NotImplementedError(f"bulk_flow_dirname {kwargs['bulk_flow_dirname']} is not implemented for the launher") # Enforce some arguments cmd += ["--ignore_user_prompts"] @@ -811,9 +804,7 @@ def launch_task(**kwargs) -> CompletedProcess: ) list_of_valid_synth_design_numbers = get_list_of_synth_designs_number(dir_config) list_of_valid_design_numbers = list( - get_list_of_gener_designs_number( - dir_config, filter_design_numbers=list_of_valid_synth_design_numbers, filter_mode="exclude" - ) + get_list_of_gener_designs_number(dir_config, filter_design_numbers=list_of_valid_synth_design_numbers, filter_mode="exclude") ) cmd = " ".join(cmd) @@ -1070,18 +1061,14 @@ def delete_directory(todelete_dir_name: str, **kwargs): todelete_dir_config = ConfigDir(**todelete_kwargs) # Save model checkpoints before deleting - save_ckpts_dirpath = ( - todelete_dir_config.root_output_dir.parent / kwargs["save_ckpt_dir_name"] / f"iter{kwargs['iteration']}" - ) + save_ckpts_dirpath = todelete_dir_config.root_output_dir.parent / kwargs["save_ckpt_dir_name"] / f"iter{kwargs['iteration']}" if not save_ckpts_dirpath.exists(): save_ckpts_dirpath.mkdir(exist_ok=True, parents=True) if not kwargs["is_dry_run"]: if todelete_dir_config.trainer_out_dir_ver.exists(): shutil.copytree(todelete_dir_config.trainer_out_dir_ver, save_ckpts_dirpath, dirs_exist_ok=True) else: - logger.info( - f"Saving model checkpoints from {todelete_dir_config.trainer_out_dir_ver} to {save_ckpts_dirpath} skipped for dryrun" - ) + logger.info(f"Saving model checkpoints from {todelete_dir_config.trainer_out_dir_ver} to {save_ckpts_dirpath} skipped for dryrun") # Delete the previous merge directory if todelete_dir_config.root_output_dir.exists(): @@ -1325,9 +1312,7 @@ def main(): elif "delete_merge_dirs" in run_config: # If user explicitly provided this (via YAML or CLI), keep it, but warn if True since it's default if run_config.get("delete_merge_dirs"): - logger.warning( - "--delete_merge_dirs is deprecated; deletion is the default. Use --KEEP_merge_dirs to preserve instead." - ) + logger.warning("--delete_merge_dirs is deprecated; deletion is the default. Use --KEEP_merge_dirs to preserve instead.") else: # Not specified anywhere -> default to delete run_config["delete_merge_dirs"] = True @@ -1343,15 +1328,11 @@ def main(): ) # Check if the initial dataset exists - initial_dataset_dir_path = ( - Path(os.environ.get("WORK_DIR")) / "output" / run_config["experiment_name"] / run_config["initial_dataset"] - ) + initial_dataset_dir_path = Path(os.environ.get("WORK_DIR")) / "output" / run_config["experiment_name"] / run_config["initial_dataset"] print(initial_dataset_dir_path) if initial_dataset_dir_path.exists(): run_config["skip_init_gener"] = True - logger.warning( - f"Initial dataset {run_config['initial_dataset']} already exists. Skipping initial generation." - ) + logger.warning(f"Initial dataset {run_config['initial_dataset']} already exists. Skipping initial generation.") else: run_config["skip_init_gener"] = False # If the initial dataset is not given, we assume we want to do the initial generation and launch @@ -1415,9 +1396,7 @@ def main(): # We have generated designs but synthesis is not complete -> ensure we run initial launch do_init_launch = True skip_restart_launch = False - logger.info( - "Detected partial initial synthesis; will resume initial launch on existing generated designs." - ) + logger.info("Detected partial initial synthesis; will resume initial launch on existing generated designs.") elif len(gen_nums) > 0 and len(synth_nums) >= len(gen_nums): # Everything synthesized -> skip initial launch by default unless explicitly requested if skip_restart_launch is None: @@ -1500,17 +1479,13 @@ def main(): intended_proto_dir = exp_dirpath / get_dir_name(run_config["dir_name"], "proto", restart_iteration) token_path = intended_proto_dir / "proto_generation_done.token" if not token_path.exists(): - logger.warning( - f"Proto resume requested at {intended_proto_dir}, but token not found; will re-run prototype generation (skip training)." - ) + logger.warning(f"Proto resume requested at {intended_proto_dir}, but token not found; will re-run prototype generation (skip training).") # Run proto generation again, but do not re-train skip_training_on_resume = True restart_step = None if restart_step is not None: - logger.info( - f"Resuming at iter={restart_iteration} after step={restart_step}; base dataset set to: {gen_dir_name}" - ) + logger.info(f"Resuming at iter={restart_iteration} after step={restart_step}; base dataset set to: {gen_dir_name}") # Check whether lauch task should be operated on restart if skip_restart_launch is None: @@ -1534,9 +1509,7 @@ def main(): **run_config, ) end = time() - logger.info( - f"Completed initial generation | Generated directory: {run_config['experiment_name']}{gen_dir_name} | Time: {end - start:.2f} seconds" - ) + logger.info(f"Completed initial generation | Generated directory: {run_config['experiment_name']}{gen_dir_name} | Time: {end - start:.2f} seconds") logger.info(f"|| ======================================================================================") else: logger.info("") @@ -1570,9 +1543,7 @@ def main(): restart_step = None skip_training_on_resume = False end = time() - logger.info( - f"Completed iteration {i} | Generated directory: {run_config['experiment_name']}{gen_dir_name} | Time: {end - start:.2f} seconds" - ) + logger.info(f"Completed iteration {i} | Generated directory: {run_config['experiment_name']}{gen_dir_name} | Time: {end - start:.2f} seconds") logger.info(f"|| ======================================================================================") except Exception as e: