diff --git a/scene_select/merge_bulk_runs.py b/scene_select/merge_bulk_runs.py index 486f516b..d31b33e4 100644 --- a/scene_select/merge_bulk_runs.py +++ b/scene_select/merge_bulk_runs.py @@ -15,6 +15,8 @@ import csv import shutil +import sys +import time from datetime import datetime from pathlib import Path @@ -121,13 +123,17 @@ def process_dataset( archive_list: UUIDsForPath, dataset_filter: DatasetFilter = DatasetFilter(), dry_run: bool = False, -) -> None: +) -> bool: + """ + Returns true if the dataset was processed, false if it was skipped. + """ log = _LOG.bind(newly_arrived_dataset=metadata_file) log.info("dataset.processing.start") if not metadata_file.name.endswith(".odc-metadata.yaml"): - log.error("dataset.error", error="Expected dataset path to be a metadata path") - return + raise UnexpectedDataset( + "Expected metadata file to be a .odc-metadata.yaml file" + ) dataset = load_dataset(index, metadata_file) source_level1_path = find_source_level1_path(metadata_file) @@ -135,14 +141,14 @@ def process_dataset( if index.datasets.has(dataset.id): log.info("dataset.skip.already_indexed", dataset_id=str(dataset.id)) # TODO: mark this somehow? Remove the file? - return + return False processing_base, metadata_offset = split_dataset_base_path(metadata_file) collection_product = get_product(dataset.product.name) dest_metadata_path = collection_product.base_package_directory / metadata_offset if not dataset_filter.should_process_dataset(dataset, collection_product, log): - return + return False # We are processing! @@ -167,6 +173,7 @@ def process_dataset( dataset_id=str(dataset.id), target_path=str(dest_metadata_path), ) + return True def consolidate_filesystem( @@ -467,6 +474,18 @@ def index_dataset( nargs=2, type=click.DateTime(), ) +@click.option( + "--max-count", + type=int, + default=sys.maxsize, + help="Maximum number of datasets to process", +) +@click.option( + "--max-consecutive-failures", + type=int, + default=5, + help="Stop if we reach this many consecutive failures", +) @click.option( "--only-same-filesystem/--allow-different-filesystems", is_flag=True, @@ -478,6 +497,8 @@ def cli( index: Index, bulk_run_dirs: List[Path], dry_run: bool, + max_count: int, + max_consecutive_failures: int, only_products: Optional[Tuple[str, ...]] = None, only_region_codes: Optional[Tuple[str, ...]] = None, only_region_code_file: Optional[Path] = None, @@ -500,7 +521,8 @@ def cli( only_time_range=only_time_range, only_same_filesystem=only_same_filesystem, ) - + count = 0 + consecutive_failures = 0 _LOG.info("command.start", bulk_run_dir_count=len(bulk_run_dirs), dry_run=dry_run) with Datacube(index=index) as dc: for bulk_run_dir in bulk_run_dirs: @@ -519,15 +541,30 @@ def cli( continue try: - process_dataset( + was_processed = process_dataset( dc.index, metadata_file=metadata_path, archive_list=archive_list, dataset_filter=dataset_filter, dry_run=dry_run, ) - except Exception: + consecutive_failures = 0 + if was_processed: + count += 1 + if count >= max_count: + log.info("scan.max_count_reached") + return + + except Exception as e: log.exception("run.error", dataset_path=str(metadata_path)) + consecutive_failures += 1 + + # Wait a bit before retrying + time.sleep(min((consecutive_failures**3) * 5, 500)) + if consecutive_failures > max_consecutive_failures: + raise RuntimeError( + f"Reached maximum consecutive failures ({max_consecutive_failures})" + ) from e log.info("scan.directory.end")