Commit

Handle duplicate processing of a dataset
When our list of archived datasets isn't enough to remove
the destination directory
jeremyh committed Oct 12, 2024
1 parent 996bb23 commit 4ea556d
Showing 1 changed file with 33 additions and 8 deletions.
scene_select/merge_bulk_runs.py: 33 additions & 8 deletions
@@ -155,14 +155,41 @@ def process_dataset(
    # We are processing!
    original_metadata_file = metadata_file

-    # Move to same filesystem if needed.
+    # Find all our recorded old datasets, to make sure we will be clearing out
+    # the destination before we move there.
+    old_datasets = []
+    is_removing_destination_path = False
+    for old_ard_uuid in archive_list.get(source_level1_path, []):
+        old_dataset = index.datasets.get(old_ard_uuid)
+        if old_dataset is None:
+            log.warning("dataset.archive.not_found", old_ard_uuid=str(old_ard_uuid))
+            continue
+        old_datasets.append(old_dataset)
+
+        # Is this old dataset currently at our destination path?
+        if normal_path(old_dataset.local_path) == normal_path(dest_metadata_path):
+            is_removing_destination_path = True
+
+    # If there's a dataset already at the destination, but it's not one of the datasets
+    # we're about to archive...
+    if (not is_removing_destination_path) and dest_metadata_path.exists():
+        # Perhaps this has been newly processed already?
+        log.error(
+            "dataset.skip.existing_dataset_at_path", destination_path=dest_metadata_path
+        )
+        return False
+
+    # Move our incoming data to same filesystem if needed.
    # (this takes the longest out of every step, so we do it before we start merging/replacing anything.)
    metadata_file = consolidate_filesystem(
        metadata_file, dest_metadata_path, dry_run, log
    )

    # Now we're good to proceed with merge.

    # 1. Archive and trash the older datasets for the same L1.
-    for old_ard_uuid in archive_list.get(source_level1_path, []):
-        archive_old_dataset(index, old_ard_uuid, dry_run, log)
+    for old_dataset in old_datasets:
+        archive_old_dataset(index, old_dataset, dry_run, log)

    # 2. Move the dataset into place
    rename_dataset(metadata_file, dest_metadata_path, dry_run, log)
@@ -244,14 +271,12 @@ def load_dataset(index: Index, metadata_file: Path, with_lineage=True) -> Dataset:

def archive_old_dataset(
    index: Index,
-    old_ard_uuid: UUID,
+    old_dataset: Dataset,
    dry_run: bool,
    log: structlog.BoundLogger,
) -> None:
    """Archive an old ARD dataset and move its files to trash."""
-    log = log.bind(old_ard_uuid=str(old_ard_uuid))
-
-    old_dataset = index.datasets.get(old_ard_uuid)
+    log = log.bind(old_ard_uuid=str(old_dataset.id))
    if old_dataset is None:
        log.warning("dataset.archive.not_found")
        return
@@ -262,7 +287,7 @@ def archive_old_dataset(
    else:
        log.info("do.archive_in_index")
        if not dry_run:
-            index.datasets.archive([old_ard_uuid])
+            index.datasets.archive([old_dataset.id])

    move_to_trash(
        old_dataset.local_path,

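Taken together, the change pre-resolves the datasets recorded for archival and only skips processing when the destination is already occupied by a dataset that is not about to be removed. Below is a minimal, self-contained sketch of that guard logic, assuming a plain dict lookup in place of the datacube index and a simplified FakeDataset record; the names should_process, lookup and FakeDataset are illustrative stand-ins, and this shows the decision flow rather than the exact merge_bulk_runs.py implementation.

# Hypothetical, simplified sketch of the destination guard added in this commit.
from dataclasses import dataclass
from pathlib import Path
from uuid import UUID


@dataclass
class FakeDataset:
    """Stand-in for an ODC Dataset: just an id and a local metadata path."""

    id: UUID
    local_path: Path


def should_process(
    old_ard_uuids: list[UUID],
    lookup: dict[UUID, FakeDataset],  # stand-in for index.datasets.get()
    dest_metadata_path: Path,
) -> tuple[bool, list[FakeDataset]]:
    """Return (ok_to_proceed, datasets_to_archive)."""
    old_datasets: list[FakeDataset] = []
    is_removing_destination_path = False

    for old_ard_uuid in old_ard_uuids:
        old_dataset = lookup.get(old_ard_uuid)
        if old_dataset is None:
            # Recorded for archival but no longer in the index: nothing to remove.
            continue
        old_datasets.append(old_dataset)

        # Will archiving this old dataset clear our destination path for us?
        if old_dataset.local_path.resolve() == dest_metadata_path.resolve():
            is_removing_destination_path = True

    # Something already sits at the destination and we are not the ones removing
    # it (perhaps a duplicate run has already processed this scene): skip.
    if (not is_removing_destination_path) and dest_metadata_path.exists():
        return False, old_datasets

    return True, old_datasets

In the real code, normal_path() plays the role of Path.resolve() here, and the archive step then iterates over the already-resolved old_datasets, passing each Dataset to archive_old_dataset rather than re-fetching every UUID from the index.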