Handle cross-filesystem merges
jeremyh committed Oct 9, 2024
1 parent dd68526 commit 96401af
Showing 1 changed file with 62 additions and 15 deletions: scene_select/merge_bulk_runs.py
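Context for the diff below: it leans on a `same_filesystem` helper that lives elsewhere in the module and isn't shown in this commit. A minimal sketch of what such a helper presumably does (an assumption, not the repository's actual code) is to compare the device IDs reported by `stat`:

```python
import os
from pathlib import Path

def same_filesystem(a: Path, b: Path) -> bool:
    # Sketch only: st_dev identifies the device/filesystem a path lives on.
    # An atomic rename() is only possible when the two device IDs match.
    return os.stat(a).st_dev == os.stat(b).st_dev
```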
@@ -14,7 +14,9 @@
 """
 
 import csv
+import shutil
 from datetime import datetime
+
 from pathlib import Path
 from typing import Dict, Optional, List, Tuple, Iterator, Set, NamedTuple
 from uuid import UUID
@@ -37,6 +39,10 @@
 _LOG = structlog.get_logger()
 
 UUIDsForPath = Dict[Path, List[UUID]]
+RUN_TIMESTAMP = datetime.now()
+
+
+class UnexpectedDataset(ValueError): ...
 
 
 def load_archive_list(csv_path: Path) -> UUIDsForPath:
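A side note on `RUN_TIMESTAMP` (my illustration, not part of the commit): capturing the time once at module load means every dataset handled by a single run lands under the same staging directory, rather than getting a new directory per `datetime.now()` call. For a run started at 2024-10-09 14:30:05, the paths it produces would look like this (the `/dest` base is hypothetical):

```python
from datetime import datetime
from pathlib import Path

RUN_TIMESTAMP = datetime(2024, 10, 9, 14, 30, 5)  # hypothetical run start
dest_base = Path("/dest")                          # hypothetical destination base

inbox = dest_base / ".inbox" / RUN_TIMESTAMP.strftime("%Y%m%d-%H%M%S")
trash = dest_base / ".trash" / RUN_TIMESTAMP.strftime("%Y%m%d")
print(inbox)  # /dest/.inbox/20241009-143005
print(trash)  # /dest/.trash/20241009
```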
@@ -140,12 +146,17 @@ def process_dataset(
 
     # We are processing!
 
+    # Move to same filesystem if needed.
+    metadata_file = consolidate_filesystem(
+        metadata_file, dest_metadata_path, dry_run, log
+    )
+
     # 1. Archive and trash the older datasets for the same L1.
     for old_ard_uuid in archive_list.get(source_level1_path, []):
         archive_old_dataset(index, old_ard_uuid, dry_run, log)
 
     # 2. Move the dataset into place
-    move_dataset(metadata_file, dest_metadata_path, dry_run, log)
+    rename_dataset(metadata_file, dest_metadata_path, dry_run, log)
 
     # 3. Index it.
     dataset.uris = [dest_metadata_path.as_uri()]
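The new staging step enables a two-phase publish: the slow, failure-prone copy happens under a hidden `.inbox` directory on the destination filesystem, and only the final same-filesystem rename makes the dataset visible at its real path. A runnable sketch of the pattern (a hypothetical helper, not the module's code):

```python
import shutil
from pathlib import Path

def publish(src_dir: Path, inbox: Path, final_path: Path) -> None:
    """Two-phase publish: stage on the destination filesystem, then rename atomically."""
    staged = inbox / src_dir.name
    shutil.copytree(src_dir, staged)  # slow and interruptible, but hidden under .inbox
    staged.rename(final_path)         # atomic: readers never see a half-copied dataset
```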
@@ -158,6 +169,43 @@ def process_dataset(
     )
 
 
+def consolidate_filesystem(
+    metadata_file: Path, eventual_destination_path: Path, dry_run, log
+) -> Path:
+    destination_base, _ = split_dataset_base_path(eventual_destination_path)
+    if not same_filesystem(metadata_file, destination_base):
+        base_path, metadata_offset = split_dataset_base_path(metadata_file)
+
+        # Place it on the same filesystem as the eventual destination.
+        inbox_metadata_path = (
+            destination_base
+            / ".inbox"
+            / RUN_TIMESTAMP.strftime("%Y%m%d-%H%M%S")
+            / metadata_offset
+        )
+        if inbox_metadata_path.exists():
+            log.warning("dataset.skip.already_in_inbox", path=inbox_metadata_path)
+            raise UnexpectedDataset(
+                f"Dataset {metadata_file} is already in inbox: {inbox_metadata_path}"
+            )
+        source_dataset_dir = metadata_file.parent
+        destination_dataset_dir = inbox_metadata_path.parent
+
+        log = log.bind(
+            source=source_dataset_dir,
+            destination=destination_dataset_dir,
+            mkdir=destination_dataset_dir.parent,
+        )
+        log.info("do.dest_filesystem_copy")
+        if not dry_run:
+            destination_dataset_dir.parent.mkdir(parents=True, exist_ok=True)
+            shutil.copytree(source_dataset_dir, destination_dataset_dir)
+        log.info("do.dest_filesystem_copy.done")
+        metadata_file = inbox_metadata_path
+
+    return metadata_file
+
+
 def load_dataset(index: Index, metadata_file: Path, with_lineage=True) -> Dataset:
     """
     Load a dataset from a metadata file.
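Why not simply `rename()` straight across? On POSIX systems a rename cannot cross filesystems: it fails with `EXDEV` rather than falling back to a copy, which is what makes the explicit `copytree` staging above necessary. A small demonstration of that failure mode (the mount paths are hypothetical):

```python
import errno
import os

def rename_or_explain(src: str, dst: str) -> None:
    """Shows why staging is needed: rename() refuses to cross filesystems."""
    try:
        os.rename(src, dst)
    except OSError as e:
        if e.errno == errno.EXDEV:
            # "Invalid cross-device link": the kernel won't copy for us.
            print(f"{src} and {dst} are on different filesystems; copy first, then rename")
        else:
            raise
```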
@@ -246,10 +294,7 @@ def move_to_trash(
 
     base_path, metadata_offset = split_dataset_base_path(source_path)
     trash_path = (
-        base_path
-        / ".trash"
-        / datetime.now().strftime("%Y%m%d")
-        / metadata_offset.parent
+        base_path / ".trash" / RUN_TIMESTAMP.strftime("%Y%m%d") / metadata_offset.parent
     )
 
     log.info(
@@ -347,22 +392,24 @@ def split_dataset_base_path(metadata_file: Path) -> Tuple[Path, Path]:
     return source_base_folder, metadata_file.relative_to(source_base_folder)
 
 
-def move_dataset(
-    souce_md_path: Path, dest_md_path: Path, dry_run: bool, log: structlog.BoundLogger
+def rename_dataset(
+    source_md_path: Path, dest_md_path: Path, dry_run: bool, log: structlog.BoundLogger
 ) -> None:
     """Move a dataset from source to destination."""
-    source_dataset_dir = souce_md_path.parent
+    source_dataset_dir = source_md_path.parent
     dest_dataset_dir = dest_md_path.parent
 
+    # We can only do an atomic rename on the same drive.
+    # The destination dataset doesn't exist yet, so we check its base directory's drive.
+    destination_base, _ = split_dataset_base_path(dest_md_path)
+    if not same_filesystem(source_dataset_dir, destination_base):
+        raise ValueError(
+            f"Source and destination are not on the same filesystem. "
+            f"(for now this script has been altered to not do moves, only renames): "
+            f"{source_dataset_dir} != {destination_base}"
+        )
+
     if not dry_run:
-        destination_base, _ = split_dataset_base_path(dest_md_path)
-        if not same_filesystem(source_dataset_dir, destination_base):
-            raise ValueError(
-                f"Source and destination are not on the same filesystem. "
-                f"(for now this script has been altered to not do moves, only renames): "
-                f"{source_dataset_dir} != {destination_base}"
-            )
-
         log.info(
             "do.rename_dataset",
             source=source_dataset_dir,
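One behavioural consequence of the last hunk: the same-filesystem guard now runs before the `dry_run` branch, so a dry run surfaces cross-filesystem problems instead of passing silently and failing on the real run. Sketched with simplified arguments (not the module's actual signature):

```python
from pathlib import Path

def rename_dataset_sketch(source_dir: Path, dest_dir: Path, dry_run: bool) -> None:
    # Guard first: evaluated on dry runs too, so misconfiguration surfaces early.
    if source_dir.stat().st_dev != dest_dir.parent.stat().st_dev:
        raise ValueError("source and destination are not on the same filesystem")
    if not dry_run:
        source_dir.rename(dest_dir)  # atomic, since the guard guarantees one filesystem
```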
