From cd1ee8a4677954f694c08573e1cb4ab16a1ad898 Mon Sep 17 00:00:00 2001 From: Jeremy Hooke Date: Mon, 14 Oct 2024 16:50:32 +1100 Subject: [PATCH] Allow scan for interrupted batches --- scene_select/merge_bulk_runs.py | 20 ++++++++++++++++++-- 1 file changed, 18 insertions(+), 2 deletions(-) diff --git a/scene_select/merge_bulk_runs.py b/scene_select/merge_bulk_runs.py index 99efb50b..f6ebf406 100644 --- a/scene_select/merge_bulk_runs.py +++ b/scene_select/merge_bulk_runs.py @@ -612,19 +612,35 @@ def cli( log.info("scan.directory.end") -def iter_output_datasets(bulk_run_dir: Path, log) -> Iterator[Path]: +def iter_output_datasets( + bulk_run_dir: Path, log, scan_interrupted_batches=True +) -> Iterator[Path]: """Get all reported output datasets for batches that have finished for this bulk-run. It will return the metadata path to each dataset. """ + unique_datasets = set() + has_interrupted_batches = False for batch_dir in bulk_run_dir.glob("batchid-*"): if not (batch_dir / "level-1-final_state-done.txt"): log.info("skipping_unfinished_batch", batch_dir=batch_dir) + has_interrupted_batches = True continue for index_file in batch_dir.glob("*-datasets-to-index.txt"): with open(index_file) as f: for line in f: - yield Path(line.strip()) + unique_datasets.add(Path(line.strip())) + + # If a batch was interrupted, it didn't write out a report, so we will scan it here. + # TODO: this wont catch datasets if they were written to another area. + if has_interrupted_batches and scan_interrupted_batches: + pkg_dir = bulk_run_dir / "pkg" + for product_dir in pkg_dir.iterdir(): + if product_dir.is_dir() and not product_dir.name.startswith("."): + for md in product_dir.rglob("*.odc-metadata.yaml"): + unique_datasets.add(md.absolute()) + + yield from unique_datasets if __name__ == "__main__":