Allow scan for interrupted batches
jeremyh committed Oct 14, 2024
1 parent 4ea556d commit cd1ee8a
20 changes: 18 additions & 2 deletions scene_select/merge_bulk_runs.py
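For orientation, the glob patterns in the diff below imply a bulk-run directory layout along these lines (only the literal file patterns come from the code; the bracketed names are illustrative placeholders):

    <bulk_run_dir>/
        batchid-0001/
            level-1-final_state-done.txt       # marker written when a batch finishes
            <job>-datasets-to-index.txt        # report: one dataset metadata path per line
        pkg/
            <product>/
                .../<dataset>.odc-metadata.yaml  # scanned as a fallback for interrupted batches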
@@ -612,19 +612,35 @@ def cli(
     log.info("scan.directory.end")
 
 
-def iter_output_datasets(bulk_run_dir: Path, log) -> Iterator[Path]:
+def iter_output_datasets(
+    bulk_run_dir: Path, log, scan_interrupted_batches=True
+) -> Iterator[Path]:
     """Get all reported output datasets for batches that have finished for this bulk-run.
 
     Yields the metadata path for each dataset.
     """
+    unique_datasets = set()
+    has_interrupted_batches = False
     for batch_dir in bulk_run_dir.glob("batchid-*"):
         if not (batch_dir / "level-1-final_state-done.txt").exists():
             log.info("skipping_unfinished_batch", batch_dir=batch_dir)
+            has_interrupted_batches = True
             continue
         for index_file in batch_dir.glob("*-datasets-to-index.txt"):
             with open(index_file) as f:
                 for line in f:
-                    yield Path(line.strip())
+                    unique_datasets.add(Path(line.strip()))
+
+    # If a batch was interrupted, it didn't write out a report, so we scan it here.
+    # TODO: this won't catch datasets if they were written to another area.
+    if has_interrupted_batches and scan_interrupted_batches:
+        pkg_dir = bulk_run_dir / "pkg"
+        for product_dir in pkg_dir.iterdir():
+            if product_dir.is_dir() and not product_dir.name.startswith("."):
+                for md in product_dir.rglob("*.odc-metadata.yaml"):
+                    unique_datasets.add(md.absolute())
+
+    yield from unique_datasets
 
 
 if __name__ == "__main__":
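As a usage illustration, a minimal sketch of how the updated iterator might be driven (a hypothetical caller: the keyword-argument logging style in the diff suggests structlog, but the real CLI wiring in merge_bulk_runs.py is not shown in this commit):

    from pathlib import Path

    import structlog

    log = structlog.get_logger()

    # Assumed path: any directory containing batchid-*/ subdirectories and a pkg/ tree.
    bulk_run_dir = Path("/data/bulk-runs/example-run")

    # With the new default (scan_interrupted_batches=True), datasets from batches
    # that never wrote a report are still picked up via the pkg/ scan.
    for metadata_path in iter_output_datasets(bulk_run_dir, log):
        log.info("dataset.to_index", path=str(metadata_path))

    # Pass scan_interrupted_batches=False to restore the report-only behaviour.

Note that results are now accumulated in a set before being yielded, so duplicates across report files are removed, but the iteration order is no longer the order of the report files.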
