Add error and processing limits
jeremyh committed Oct 9, 2024
1 parent 96401af commit 10d1a87
Showing 1 changed file with 53 additions and 10 deletions.
scene_select/merge_bulk_runs.py
@@ -15,6 +15,8 @@

import csv
import shutil
import sys
import time
from datetime import datetime

from pathlib import Path
@@ -121,34 +123,38 @@ def process_dataset(
archive_list: UUIDsForPath,
dataset_filter: DatasetFilter = DatasetFilter(),
dry_run: bool = False,
) -> None:
) -> bool:
"""
Returns true if the dataset was processed, false if it was skipped.
"""
log = _LOG.bind(newly_arrived_dataset=metadata_file)
log.info("dataset.processing.start")

if not metadata_file.name.endswith(".odc-metadata.yaml"):
log.error("dataset.error", error="Expected dataset path to be a metadata path")
return
raise UnexpectedDataset(
"Expected metadata file to be a .odc-metadata.yaml file"
)

dataset = load_dataset(index, metadata_file)
source_level1_path = find_source_level1_path(metadata_file)

if index.datasets.has(dataset.id):
log.info("dataset.skip.already_indexed", dataset_id=str(dataset.id))
# TODO: mark this somehow? Remove the file?
return
return False

processing_base, metadata_offset = split_dataset_base_path(metadata_file)
collection_product = get_product(dataset.product.name)
dest_metadata_path = collection_product.base_package_directory / metadata_offset

if not dataset_filter.should_process_dataset(dataset, collection_product, log):
return
return False

# We are processing!

# Move to same filesystem if needed.
metadata_file = consolidate_filesystem(
dest_metadata_path, metadata_file, dry_run, log
metadata_file, dest_metadata_path, dry_run, log
)

# 1. Archive and trash the older datasets for the same L1.
@@ -167,6 +173,7 @@ def process_dataset(
dataset_id=str(dataset.id),
target_path=str(dest_metadata_path),
)
return True


def consolidate_filesystem(
@@ -200,6 +207,12 @@ def process_dataset(
if not dry_run:
destination_dataset_dir.parent.mkdir(parents=True, exist_ok=True)
shutil.copytree(source_dataset_dir, destination_dataset_dir)

# Sanity check
if not inbox_metadata_path.exists():
raise UnexpectedDataset(
f"Dataset is not in the expected inbox? {inbox_metadata_path}"
)
log.info("do.dest_filesystem_copy.done")
metadata_file = inbox_metadata_path

@@ -467,17 +480,31 @@ def index_dataset(
nargs=2,
type=click.DateTime(),
)
@click.option(
"--max-count",
type=int,
default=sys.maxsize,
help="Maximum number of datasets to process",
)
@click.option(
"--max-consecutive-failures",
type=int,
default=5,
help="Stop if we reach this many consecutive failures",
)
@click.option(
"--only-same-filesystem/--allow-different-filesystems",
is_flag=True,
default=True,
default=False,
help="Only process datasets that don't require copying between filesystems",
)
@ui.pass_index(app_name="ard-dataset-merger")
def cli(
index: Index,
bulk_run_dirs: List[Path],
dry_run: bool,
max_count: int,
max_consecutive_failures: int,
only_products: Optional[Tuple[str, ...]] = None,
only_region_codes: Optional[Tuple[str, ...]] = None,
only_region_code_file: Optional[Path] = None,
@@ -500,7 +527,8 @@ def cli(
only_time_range=only_time_range,
only_same_filesystem=only_same_filesystem,
)

count = 0
consecutive_failures = 0
_LOG.info("command.start", bulk_run_dir_count=len(bulk_run_dirs), dry_run=dry_run)
with Datacube(index=index) as dc:
for bulk_run_dir in bulk_run_dirs:
@@ -519,15 +547,30 @@ def cli(
continue

try:
process_dataset(
was_processed = process_dataset(
dc.index,
metadata_file=metadata_path,
archive_list=archive_list,
dataset_filter=dataset_filter,
dry_run=dry_run,
)
except Exception:
consecutive_failures = 0
if was_processed:
count += 1
if count >= max_count:
log.info("scan.max_count_reached")
return

except Exception as e:
log.exception("run.error", dataset_path=str(metadata_path))
consecutive_failures += 1

# Back off before moving on to the next dataset
time.sleep(min((consecutive_failures**3) * 5, 500))
if consecutive_failures > max_consecutive_failures:
raise RuntimeError(
f"Reached maximum consecutive failures ({max_consecutive_failures})"
) from e

log.info("scan.directory.end")
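
For reference, the back-off added in the except branch above grows cubically with the consecutive-failure count and is capped at 500 seconds. A standalone sketch (not part of the commit) that prints the resulting schedule:

for failures in range(1, 7):
    delay = min((failures ** 3) * 5, 500)
    print(f"after {failures} consecutive failure(s): sleep {delay}s")

This prints 5, 40, 135, 320, 500 and 500 seconds, so the 500-second cap takes effect from the fifth consecutive failure onwards.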

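A hypothetical way to exercise the new limits end to end is Click's test runner. This sketch assumes the scene_select package and a configured ODC index are available; the bulk-run directory path and option values are placeholders, and --dry-run is assumed to be the command's existing dry-run flag:

from click.testing import CliRunner

from scene_select.merge_bulk_runs import cli

# Dry-run over one bulk-run directory, stopping after 10 processed datasets
# and aborting once consecutive failures exceed 2 (with the back-off sleeps
# shown above in between).
result = CliRunner().invoke(
    cli,
    [
        "--dry-run",
        "--max-count", "10",
        "--max-consecutive-failures", "2",
        "/data/bulk-runs/2024-10",
    ],
)
print(result.exit_code, result.output)

The same flags apply to a normal command-line invocation of merge_bulk_runs.py.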