Add error and processing limits
jeremyh committed Oct 9, 2024
1 parent 96401af commit f9aa286
Showing 1 changed file with 45 additions and 8 deletions.
scene_select/merge_bulk_runs.py (53 changes: 45 additions & 8 deletions)
@@ -15,6 +15,8 @@

import csv
import shutil
import sys
import time
from datetime import datetime

from pathlib import Path
@@ -121,28 +123,32 @@ def process_dataset(
archive_list: UUIDsForPath,
dataset_filter: DatasetFilter = DatasetFilter(),
dry_run: bool = False,
) -> None:
) -> bool:
"""
Returns true if the dataset was processed, false if it was skipped.
"""
log = _LOG.bind(newly_arrived_dataset=metadata_file)
log.info("dataset.processing.start")

if not metadata_file.name.endswith(".odc-metadata.yaml"):
log.error("dataset.error", error="Expected dataset path to be a metadata path")
return
raise UnexpectedDataset(
"Expected metadata file to be a .odc-metadata.yaml file"
)

dataset = load_dataset(index, metadata_file)
source_level1_path = find_source_level1_path(metadata_file)

if index.datasets.has(dataset.id):
log.info("dataset.skip.already_indexed", dataset_id=str(dataset.id))
# TODO: mark this somehow? Remove the file?
return
return False

processing_base, metadata_offset = split_dataset_base_path(metadata_file)
collection_product = get_product(dataset.product.name)
dest_metadata_path = collection_product.base_package_directory / metadata_offset

if not dataset_filter.should_process_dataset(dataset, collection_product, log):
return
return False

# We are processing!

@@ -167,6 +173,7 @@ def process_dataset(
dataset_id=str(dataset.id),
target_path=str(dest_metadata_path),
)
return True


def consolidate_filesystem(
@@ -467,6 +474,18 @@ def index_dataset(
nargs=2,
type=click.DateTime(),
)
@click.option(
"--max-count",
type=int,
default=sys.maxsize,
help="Maximum number of datasets to process",
)
@click.option(
"--max-consecutive-failures",
type=int,
default=5,
help="Stop if we reach this many consecutive failures",
)
@click.option(
"--only-same-filesystem/--allow-different-filesystems",
is_flag=True,
@@ -478,6 +497,8 @@ def cli(
index: Index,
bulk_run_dirs: List[Path],
dry_run: bool,
max_count: int,
max_consecutive_failures: int,
only_products: Optional[Tuple[str, ...]] = None,
only_region_codes: Optional[Tuple[str, ...]] = None,
only_region_code_file: Optional[Path] = None,
@@ -500,7 +521,8 @@ def cli(
only_time_range=only_time_range,
only_same_filesystem=only_same_filesystem,
)

count = 0
consecutive_failures = 0
_LOG.info("command.start", bulk_run_dir_count=len(bulk_run_dirs), dry_run=dry_run)
with Datacube(index=index) as dc:
for bulk_run_dir in bulk_run_dirs:
@@ -519,15 +541,30 @@
continue

try:
process_dataset(
was_processed = process_dataset(
dc.index,
metadata_file=metadata_path,
archive_list=archive_list,
dataset_filter=dataset_filter,
dry_run=dry_run,
)
except Exception:
consecutive_failures = 0
if was_processed:
count += 1
if count >= max_count:
log.info("scan.max_count_reached")
return

except Exception as e:
log.exception("run.error", dataset_path=str(metadata_path))
consecutive_failures += 1

# Wait a bit before retrying
time.sleep(min((consecutive_failures**3) * 5, 500))
if consecutive_failures > max_consecutive_failures:
raise RuntimeError(
f"Reached maximum consecutive failures ({max_consecutive_failures})"
) from e

log.info("scan.directory.end")

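The overall pattern the commit introduces — count successful datasets toward --max-count, reset the failure counter after any successful call, and abort once --max-consecutive-failures is exceeded — reduces to a loop like the minimal sketch below. Names such as items and handle are placeholders for illustration, not code from merge_bulk_runs.py; as in the real script, --max-count defaults to sys.maxsize, i.e. effectively unlimited.

    import sys
    import time

    def run_all(items, handle, max_count=sys.maxsize, max_consecutive_failures=5):
        """Minimal sketch of the processing/failure limits added in this commit."""
        count = 0
        consecutive_failures = 0
        for item in items:
            try:
                was_processed = handle(item)
                consecutive_failures = 0  # any successful call resets the failure streak
                if was_processed:
                    count += 1
                    if count >= max_count:
                        return  # processing limit reached
            except Exception as e:
                consecutive_failures += 1
                # Back off before moving to the next item (cubic growth, capped at 500s)
                time.sleep(min((consecutive_failures**3) * 5, 500))
                if consecutive_failures > max_consecutive_failures:
                    raise RuntimeError(
                        f"Reached maximum consecutive failures ({max_consecutive_failures})"
                    ) from e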
