diff --git a/docs/templates/template.pipeline.py b/docs/templates/template.pipeline.py index a1591d1..23c38f6 100644 --- a/docs/templates/template.pipeline.py +++ b/docs/templates/template.pipeline.py @@ -8,8 +8,11 @@ from pathlib import Path from typing import Any +from pydantic import BaseModel + from marimba.core.pipeline import BasePipeline from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.schemas.ifdo import iFDOMetadata @@ -111,7 +114,10 @@ def _package( data_dir: Path, config: dict[str, Any], **kwargs: dict[str, Any], - ) -> dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]]: + ) -> tuple[ + dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ]: """ Package data from data_dir for distribution. @@ -123,5 +129,11 @@ def _package( Returns: - Dictionary mapping source paths to tuples of (destination path, BaseMetadata list, metadata). + Tuple of the data mapping and the metadata headers keyed by metadata type. """ - data_mapping: dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]] = {} + data_mapping: tuple[ + dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ] = ( + {}, + {}, + ) return data_mapping diff --git a/marimba/core/pipeline.py b/marimba/core/pipeline.py index 330910d..bde9dd8 100644 --- a/marimba/core/pipeline.py +++ b/marimba/core/pipeline.py @@ -15,19 +15,33 @@ Classes: - BasePipeline: Abstract base class for Marimba pipelines. + - PackageEntry: Package metadata for a single file. """ from abc import ABC, abstractmethod from pathlib import Path -from typing import Any +from typing import Any, NamedTuple + +from pydantic import BaseModel from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.log import LogMixin from marimba.core.utils.paths import format_path_for_logging from marimba.core.utils.rich import format_command, format_entity +class PackageEntry(NamedTuple): + """ + Package metadata for a single file. + """ + + path: Path + metadata: list[BaseMetadata] | None = None + extra: dict[str, Any] | None = None + + class BasePipeline(ABC, LogMixin): """ Marimba pipeline abstract base class. All pipelines should inherit from this class. @@ -184,7 +198,10 @@ def run_package( data_dir: Path, config: dict[str, Any], **kwargs: dict[str, Any], - ) -> dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]]: + ) -> tuple[ + dict[Path, PackageEntry], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ]: """ Package a dataset from the given data directories and their corresponding collection configurations.
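For orientation, here is a rough sketch of what a pipeline's `_package` override could return under the new contract; the file name, destination path and header values are invented for illustration, and returning only the bare data mapping (the previous behaviour) remains valid.

```python
import uuid
from pathlib import Path
from typing import Any

from ifdo import ImageSetHeader
from pydantic import BaseModel

from marimba.core.schemas.base import BaseMetadata
from marimba.core.schemas.header.base import MetadataHeader
from marimba.core.schemas.ifdo import iFDOMetadata


def _package(  # sketch of a method on a hypothetical BasePipeline subclass
    self,
    data_dir: Path,
    config: dict[str, Any],
    **kwargs: dict[str, Any],
) -> tuple[
    dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]],
    dict[type[BaseMetadata], MetadataHeader[BaseModel]],
]:
    # Map each source file to (relative destination path, metadata items, ancillary data).
    data_mapping: dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]] = {
        data_dir / "images" / "0001.jpg": (Path("images/0001.jpg"), None, None),
    }
    # Optionally attach one dataset-level header per metadata type.
    headers: dict[type[BaseMetadata], MetadataHeader[BaseModel]] = {
        iFDOMetadata: MetadataHeader(
            ImageSetHeader(
                image_set_name="example-survey",
                image_set_uuid=str(uuid.uuid4()),
                image_set_handle="",
            ),
        ),
    }
    return data_mapping, headers
```

`run_package` then wraps each positional tuple of the data mapping into a `PackageEntry` and substitutes an empty header mapping when a pipeline still returns only the dictionary.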
@@ -204,13 +221,20 @@ def run_package( f"data_dir={format_path_for_logging(data_dir, Path(self._root_path).parents[2])}, {config=}, {kwargs=}", ) - data_mapping = self._package(data_dir, config, **kwargs) + result = self._package(data_dir, config, **kwargs) + + metadata_header: dict[type[BaseMetadata], MetadataHeader[BaseModel]] + if isinstance(result, tuple): + data_mapping, metadata_header = result + else: + data_mapping = result + metadata_header = {} self.logger.info( f"Completed {format_command('package')} command for pipeline {format_entity(self.class_name)}", ) - return data_mapping + return {key: PackageEntry(*entry) for key, entry in data_mapping.items()}, metadata_header def run_post_package( self, @@ -273,7 +297,13 @@ def _package( data_dir: Path, config: dict[str, Any], **kwargs: dict[str, Any], - ) -> dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]]: + ) -> ( + dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]] + | tuple[ + dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ] + ): """ `run_package` implementation; override this. """ diff --git a/marimba/core/schemas/base.py b/marimba/core/schemas/base.py index 44461a4..3ef5240 100644 --- a/marimba/core/schemas/base.py +++ b/marimba/core/schemas/base.py @@ -12,6 +12,10 @@ from pathlib import Path from typing import Any +from pydantic import BaseModel + +from marimba.core.schemas.header.base import MetadataHeader + class BaseMetadata(ABC): """ @@ -79,6 +83,7 @@ def create_dataset_metadata( dataset_name: str, root_dir: Path, items: dict[str, list["BaseMetadata"]], + metadata_header: MetadataHeader[BaseModel] | None = None, metadata_name: str | None = None, *, dry_run: bool = False, @@ -91,7 +96,14 @@ def create_dataset_metadata( @abstractmethod def process_files( cls, - dataset_mapping: dict[Path, tuple[list["BaseMetadata"], dict[str, Any] | None]], + dataset_mapping: dict[ + Path, + tuple[ + list["BaseMetadata"], + dict[str, Any] | None, + MetadataHeader[BaseModel] | None, + ], + ], max_workers: int | None = None, logger: logging.Logger | None = None, *, diff --git a/marimba/core/schemas/generic.py b/marimba/core/schemas/generic.py index ffa1c1c..2e45c9c 100644 --- a/marimba/core/schemas/generic.py +++ b/marimba/core/schemas/generic.py @@ -12,7 +12,10 @@ from pathlib import Path from typing import Any, Union, cast +from pydantic import BaseModel + from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.metadata import yaml_saver @@ -177,6 +180,7 @@ def create_dataset_metadata( dataset_name: str, root_dir: Path, items: dict[str, list["BaseMetadata"]], + _metadata_header: MetadataHeader[BaseModel] | None = None, metadata_name: str | None = None, *, dry_run: bool = False, @@ -212,7 +216,14 @@ def create_dataset_metadata( @classmethod def process_files( cls, - dataset_mapping: dict[Path, tuple[list["BaseMetadata"], dict[str, Any] | None]], + dataset_mapping: dict[ + Path, + tuple[ + list["BaseMetadata"], + dict[str, Any] | None, + MetadataHeader[BaseModel] | None, + ], + ], max_workers: int | None = None, logger: logging.Logger | None = None, *, diff --git a/marimba/core/schemas/header/__init__.py b/marimba/core/schemas/header/__init__.py new file mode 100644 index 0000000..008f67a --- /dev/null +++ b/marimba/core/schemas/header/__init__.py @@ -0,0 +1,6 @@ +from marimba.core.schemas.header.base import ( + 
HeaderMergeConflictError, + MetadataHeader, +) + +__all__ = ["MetadataHeader", "HeaderMergeConflictError"] diff --git a/marimba/core/schemas/header/base.py b/marimba/core/schemas/header/base.py new file mode 100644 index 0000000..6ca5a6d --- /dev/null +++ b/marimba/core/schemas/header/base.py @@ -0,0 +1,101 @@ +""" +Module containing the implementation of the metadata header. + +Classes: + HeaderMergeConflictError: Error raised when two headers cannot be merged. + MetadataHeader: Metadata header class wrapping mergeable header data. +""" + +from __future__ import annotations + +import inspect +from copy import copy +from typing import Any, Generic, TypeVar + +from pydantic import BaseModel + +T = TypeVar("T", bound=BaseModel) + + +class HeaderMergeConflictError(Exception): + """ + Error raised when two headers cannot be merged. + """ + +    def __init__(self, conflict_attr: str, *args: object) -> None: + """ + Initializes a HeaderMergeConflictError instance. + + Args: + conflict_attr: The name of the attribute responsible for the merge conflict. + *args: Additional positional arguments passed to the Exception base class. + """ + super().__init__(*args) + self._conflict_attr = conflict_attr + + def __str__(self) -> str: + return f"Conflicting header information in field: {self._conflict_attr}" + + +class MetadataHeader(Generic[T]): + """ + Metadata header class wrapping mergeable header data. + + The wrapped header data must be convertible into a Python dictionary. + """ + + def __init__(self, header: T) -> None: + """ + Initializes a MetadataHeader instance. + + Args: + header: Header data. + """ + self._header = header + + @property + def header(self) -> T: + """ + Returns the inner header data. + """ + return self._header + + def __add__(self, other: MetadataHeader[T]) -> MetadataHeader[T]: + result_data = self.header.model_dump(mode="python") + other_data = other.header.model_dump(mode="python") + + for attr_name, own_value in result_data.items(): + other_value = other_data.get(attr_name, None) + if own_value == other_value: + continue + + if other_value is None: + continue + + if own_value is not None: + raise HeaderMergeConflictError(attr_name) + + result_data[attr_name] = other_value + + return MetadataHeader(type(self.header).model_validate(result_data)) + + def merge(self, other: MetadataHeader[T] | None) -> MetadataHeader[T]: + """ + Merge a metadata header with this header. + + Args: + other: Other metadata header. + + Returns: + Merged header of this and the other header.
+ """ + if other is None: + return copy(self) + return self + other + + @staticmethod + def _get_attributes(value: object) -> list[tuple[str, Any]]: + members = inspect.getmembers(value) + return [ + (name, value) for name, value in members if (not name.startswith("_")) and (not inspect.ismethod(value)) + ] diff --git a/marimba/core/schemas/ifdo.py b/marimba/core/schemas/ifdo.py index 2acb77e..b793a39 100644 --- a/marimba/core/schemas/ifdo.py +++ b/marimba/core/schemas/ifdo.py @@ -33,9 +33,11 @@ import piexif from PIL import Image +from pydantic import BaseModel from rich.progress import Progress, SpinnerColumn, TaskID from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.log import get_logger from marimba.core.utils.metadata import yaml_saver from marimba.core.utils.rich import get_default_columns @@ -234,7 +236,10 @@ def _process_image_metadata( """Process image metadata items into a single ImageData.""" # Take the first iFDO metadata item item = ifdo_items[0] - image_data = cast(ImageData, item.image_data[0] if item.is_video else item.image_data) + image_data = cast( + ImageData, + item.image_data[0] if item.is_video else item.image_data, + ) # Set image-set-local-path for subdirectory files if path.parent != Path(): @@ -248,6 +253,7 @@ def create_dataset_metadata( dataset_name: str, root_dir: Path, items: dict[str, list["BaseMetadata"]], + metadata_header: MetadataHeader[BaseModel] | None = None, metadata_name: str | None = None, *, dry_run: bool = False, @@ -278,12 +284,20 @@ def create_dataset_metadata( image_data = cls._process_image_metadata(ifdo_items, path) image_set_items[filename] = image_data - ifdo = iFDO( - image_set_header=ImageSetHeader( + if metadata_header is not None and isinstance( + metadata_header.header, + ImageSetHeader, + ): + image_set_header = metadata_header.header + else: + image_set_header = ImageSetHeader( image_set_name=dataset_name, image_set_uuid=str(uuid.uuid4()), image_set_handle="", # TODO @: Populate from distribution target URL - ), + ) + + ifdo = iFDO( + image_set_header=image_set_header, image_set_items=image_set_items, ) @@ -295,12 +309,23 @@ def create_dataset_metadata( output_name = metadata_name if metadata_name.endswith(".ifdo") else f"{metadata_name}.ifdo" if not dry_run: - saver(root_dir, output_name, ifdo.model_dump(mode="json", by_alias=True, exclude_none=True)) + saver( + root_dir, + output_name, + ifdo.model_dump(mode="json", by_alias=True, exclude_none=True), + ) @classmethod def process_files( cls, - dataset_mapping: dict[Path, tuple[list[BaseMetadata], dict[str, Any] | None]], + dataset_mapping: dict[ + Path, + tuple[ + list[BaseMetadata], + dict[str, Any] | None, + MetadataHeader[BaseModel] | None, + ], + ], max_workers: int | None = None, logger: logging.Logger | None = None, *, @@ -317,12 +342,19 @@ def process_files( def process_file( cls: type[iFDOMetadata], thread_num: str, - item: tuple[Path, tuple[list[BaseMetadata], dict[str, Any] | None]], + item: tuple[ + Path, + tuple[ + list[BaseMetadata], + dict[str, Any] | None, + MetadataHeader[BaseModel] | None, + ], + ], logger: logging.Logger, progress: Progress | None = None, task: TaskID | None = None, ) -> None: - file_path, (metadata_items, ancillary_data) = item + file_path, (metadata_items, ancillary_data, header) = item # Formats with reliable EXIF support exif_supported_extensions = { @@ -397,8 +429,17 @@ def process_file( progress.advance(task) with Progress(SpinnerColumn(), 
*get_default_columns()) as progress: - task = progress.add_task("[green]Processing files with metadata (4/12)", total=len(dataset_mapping)) - process_file(cls, items=dataset_mapping.items(), progress=progress, task=task, logger=log) # type: ignore[call-arg] + task = progress.add_task( + "[green]Processing files with metadata (4/12)", + total=len(dataset_mapping), + ) + process_file( + cls, + items=dataset_mapping.items(), + progress=progress, + task=task, + logger=log, + ) # type: ignore[call-arg] @staticmethod def _inject_datetime(image_data: ImageData, exif_dict: dict[str, Any]) -> None: @@ -552,7 +593,11 @@ def _embed_exif_metadata( ancillary_data: Any ancillary data to include in the user comment. exif_dict: The EXIF metadata dictionary. """ - image_data_dict = image_data.model_dump(mode="json", by_alias=True, exclude_none=True) + image_data_dict = image_data.model_dump( + mode="json", + by_alias=True, + exclude_none=True, + ) user_comment_data = { "metadata": {"ifdo": image_data_dict, "ancillary": ancillary_data}, } diff --git a/marimba/core/utils/dataset.py b/marimba/core/utils/dataset.py index 80463fc..25d3938 100644 --- a/marimba/core/utils/dataset.py +++ b/marimba/core/utils/dataset.py @@ -9,35 +9,67 @@ from collections.abc import Callable from functools import reduce from pathlib import Path -from typing import Any, TypeAlias, TypeVar +from typing import TypeAlias, TypeVar +from pydantic import BaseModel + +from marimba.core.pipeline import PackageEntry from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.constants import MetadataGenerationLevelOptions +PIPELINE_METADATA_HEADER_TYPE: TypeAlias = dict[ + type[BaseMetadata], + MetadataHeader[BaseModel], +] + PIPELINE_DATASET_MAPPING_TYPE: TypeAlias = dict[ str, - dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], + tuple[dict[Path, PackageEntry], PIPELINE_METADATA_HEADER_TYPE], ] DATASET_MAPPING_TYPE: TypeAlias = dict[str, PIPELINE_DATASET_MAPPING_TYPE] -PIPELINE_MAPPED_DATASET_ITEMS = dict[str, dict[str, list[BaseMetadata]]] +PIPELINE_MAPPED_DATASET_ITEMS = dict[ + str, + tuple[dict[str, list[BaseMetadata]], PIPELINE_METADATA_HEADER_TYPE], +] MAPPED_DATASET_ITEMS = dict[str, PIPELINE_MAPPED_DATASET_ITEMS] -PIPLINE_MAPPED_GROUPED_ITEMS = dict[str, dict[type[BaseMetadata], dict[str, list[BaseMetadata]]]] +PIPLINE_MAPPED_GROUPED_ITEMS = dict[ + str, + dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], +] MAPPED_GROUPED_ITEMS = dict[str, PIPLINE_MAPPED_GROUPED_ITEMS] MAPPING_PROCESSOR_TYPE: TypeAlias = Callable[ - [dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], str | None], + [ + dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], + str | None, + ], + None, +] +DECORATOR_TYPE: TypeAlias = Callable[ + [MAPPING_PROCESSOR_TYPE, MAPPED_GROUPED_ITEMS], None, ] -DECORATOR_TYPE: TypeAlias = Callable[[MAPPING_PROCESSOR_TYPE, MAPPED_GROUPED_ITEMS], None] T = TypeVar("T") S = TypeVar("S") R = TypeVar("R") -def flatten_middle_mapping(mapping: dict[str, dict[str, dict[T, S]]]) -> dict[str, dict[T, S]]: +def flatten_middle_mapping( + mapping: dict[ + str, + dict[str, tuple[dict[T, S], dict[type[R], MetadataHeader[BaseModel]]]], + ], +) -> dict[str, dict[T, S]]: """ Flattens the middle level of a mapping structure. 
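The merging behaviour these utilities rely on comes from the `MetadataHeader` class added in `marimba/core/schemas/header/base.py` above. A minimal sketch of its semantics, using a made-up pydantic model:

```python
from pydantic import BaseModel

from marimba.core.schemas.header import HeaderMergeConflictError, MetadataHeader


class ExampleHeader(BaseModel):
    # Toy model for illustration; in practice this could be e.g. an iFDO ImageSetHeader.
    site: str | None = None
    vessel: str | None = None


first = MetadataHeader(ExampleHeader(site="Site-A"))
second = MetadataHeader(ExampleHeader(vessel="RV Example"))

# Fields populated on only one side are combined into the result.
merged = first + second
assert merged.header == ExampleHeader(site="Site-A", vessel="RV Example")

# Two headers that populate the same field with different values cannot be merged.
try:
    first + MetadataHeader(ExampleHeader(site="Site-B"))
except HeaderMergeConflictError as error:
    print(error)  # Conflicting header information in field: site
```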
@@ -47,7 +79,48 @@ def flatten_middle_mapping(mapping: dict[str, dict[str, dict[T, S]]]) -> dict[st Returns: flattened mapping structure. """ - return {pipeline_name: flatten_mapping(pipeline_data) for pipeline_name, pipeline_data in mapping.items()} + return {pipeline_name: flatten_composite_mapping(pipeline_data) for pipeline_name, pipeline_data in mapping.items()} + + +def flatten_composite_mapping( + mapping: dict[str, tuple[dict[T, S], dict[type[R], MetadataHeader[BaseModel]]]], +) -> dict[T, S]: + """ + Flattens the collection level of a composite mapping, dropping its metadata headers. + + Args: + mapping: Mapping to flatten. + + Returns: + flattened mapping structure. + """ + return flatten_mapping( + {key: value for key, (value, _) in mapping.items()}, + ) + + +def flatten_header_mapping( + mapping: dict[str, dict[type[R], MetadataHeader[BaseModel]]], +) -> dict[type[R], MetadataHeader[BaseModel]]: + """ + Merges the per-collection metadata headers into one header per metadata type. + + Args: + mapping: Mapping of collection names to their metadata headers. + + Returns: + Merged metadata headers keyed by metadata type. + """ + headers = list(mapping.values()) + types = {key for entry in headers for key in entry} + + return { + header_type: reduce( + MetadataHeader.__add__, + [entry[header_type] for entry in headers if entry.get(header_type, None) is not None], + ) + for header_type in types + } def flatten_mapping(mapping: dict[str, dict[T, S]]) -> dict[T, S]: @@ -63,7 +136,12 @@ def flatten_mapping(mapping: dict[str, dict[T, S]]) -> dict[T, S]: return reduce(lambda x, y: x | y, mapping.values(), {}) -def flatten_middle_list_mapping(mapping: dict[str, dict[str, dict[T, dict[S, R]]]]) -> dict[str, dict[T, dict[S, R]]]: +def flatten_middle_list_mapping( + mapping: dict[ + str, + dict[str, dict[T, tuple[dict[S, R], MetadataHeader[BaseModel] | None]]], + ], +) -> dict[str, dict[T, tuple[dict[S, R], MetadataHeader[BaseModel] | None]]]: """ Flattens the middle level of a mapping structure. @@ -76,7 +154,12 @@ def flatten_middle_list_mapping(mapping: dict[str, dict[str, dict[T, dict[S, R]] return {pipeline_name: flatten_list_mapping(pipeline_data) for pipeline_name, pipeline_data in mapping.items()} -def flatten_list_mapping(mapping: dict[str, dict[T, dict[S, R]]]) -> dict[T, dict[S, R]]: +def flatten_list_mapping( + mapping: dict[ + str, + dict[T, tuple[dict[S, R], MetadataHeader[BaseModel] | None]], + ], +) -> dict[T, tuple[dict[S, R], MetadataHeader[BaseModel] | None]]: """ Flattens the middle level of a mapping structure. @@ -86,15 +169,19 @@ def flatten_list_mapping(mapping: dict[str, dict[T, dic Returns: flattened mapping structure. """ - output: defaultdict[T, dict[S, R]] = defaultdict(dict) + output: defaultdict[T, tuple[dict[S, R], MetadataHeader[BaseModel] | None]] = defaultdict(lambda: ({}, None)) for dictionary in mapping.values(): - for key, value in dictionary.items(): - output[key].update(value) - + for key, (entries, header) in dictionary.items(): + output[key][0].update(entries) + if header is not None: + output[key] = (output[key][0], header.merge(output[key][1])) return dict(output) -def execute_on_mapping(mapping: dict[str, dict[str, S]], executor: Callable[[S], T]) -> dict[str, dict[str, T]]: +def execute_on_mapping( + mapping: dict[str, dict[str, S]], + executor: Callable[[S], T], +) -> dict[str, dict[str, T]]: """ Executes a function on a mapping structure.
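As an illustration of `flatten_header_mapping`, headers collected from different collections are merged field by field per metadata type; the collection names, the model and its values here are made up:

```python
from pydantic import BaseModel

from marimba.core.schemas.header.base import MetadataHeader
from marimba.core.schemas.ifdo import iFDOMetadata
from marimba.core.utils.dataset import flatten_header_mapping


class ExampleHeader(BaseModel):
    site: str | None = None
    vessel: str | None = None


per_collection = {
    "collection-01": {iFDOMetadata: MetadataHeader(ExampleHeader(site="Site-A"))},
    "collection-02": {iFDOMetadata: MetadataHeader(ExampleHeader(vessel="RV Example"))},
}

merged = flatten_header_mapping(per_collection)
assert merged[iFDOMetadata].header == ExampleHeader(site="Site-A", vessel="RV Example")
```

When two collections populate the same field with different values, the merge raises a `HeaderMergeConflictError`, signalling that the collections disagree on dataset-level metadata.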
diff --git a/marimba/core/wrappers/dataset.py b/marimba/core/wrappers/dataset.py index f37c17e..559af0d 100644 --- a/marimba/core/wrappers/dataset.py +++ b/marimba/core/wrappers/dataset.py @@ -44,14 +44,18 @@ from shutil import copy2, copytree, ignore_patterns from typing import Any +from pydantic import BaseModel from rich.progress import Progress, SpinnerColumn, TaskID +from marimba.core.pipeline import PackageEntry from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.constants import Operation from marimba.core.utils.dataset import ( DATASET_MAPPING_TYPE, DECORATOR_TYPE, MAPPED_DATASET_ITEMS, + MAPPED_GROUPED_ITEMS, execute_on_mapping, flatten_mapping, flatten_middle_mapping, @@ -422,13 +426,23 @@ def populate( self.logger.info( f'Started packaging dataset "{dataset_name}" containing {len(dataset_mapping)} {pipeline_label}', ) - reduced_dataset_mapping = flatten_middle_mapping(dataset_mapping) self.check_dataset_mapping(reduced_dataset_mapping, max_workers) - mapped_dataset_items = self._populate_files(dataset_mapping, operation, max_workers) - self._process_files_with_metadata(reduced_dataset_mapping, max_workers) - self.generate_metadata(dataset_name, mapped_dataset_items, mapping_processor_decorator, max_workers) - dataset_items = flatten_mapping(flatten_middle_mapping(mapped_dataset_items)) + mapped_dataset_items = self._populate_files( + dataset_mapping, + operation, + max_workers, + ) + self._process_files_with_metadata(dataset_mapping, max_workers) + self.generate_metadata( + dataset_name, + mapped_dataset_items, + mapping_processor_decorator, + max_workers, + ) + dataset_items = flatten_mapping( + flatten_middle_mapping(mapped_dataset_items), + ) self.generate_dataset_summary(dataset_items) # TODO @: Generate summary method currently does not use multithreading @@ -447,7 +461,16 @@ def _populate_files( # noqa: C901 dataset_mapping: DATASET_MAPPING_TYPE, operation: Operation, max_workers: int | None = None, - ) -> dict[str, dict[str, dict[str, list[BaseMetadata]]]]: + ) -> dict[ + str, + dict[ + str, + tuple[ + dict[str, list[BaseMetadata]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], + ], + ]: """ Copy or move files from the dataset mapping to the destination directory. 
@@ -465,7 +488,7 @@ def process_file( self: DatasetWrapper, item: tuple[ Path, - tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None], + PackageEntry, ], thread_num: str, pipeline_name: str, @@ -514,7 +537,16 @@ def process_file( if progress and tasks_by_pipeline_name: progress.advance(tasks_by_pipeline_name[pipeline_name]) - dataset_items: dict[str, dict[str, dict[str, list[BaseMetadata]]]] = defaultdict(lambda: defaultdict(dict)) + dataset_items: dict[ + str, + dict[ + str, + tuple[ + dict[str, list[BaseMetadata]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], + ], + ] = defaultdict(lambda: defaultdict(lambda: ({}, {}))) with Progress(SpinnerColumn(), *get_default_columns()) as progress: tasks_by_pipeline_name = { pipeline_name: progress.add_task( @@ -526,20 +558,27 @@ def process_file( } for pipeline_name, pipeline_data_mapping in dataset_mapping.items(): - for collection_name, collection_data_mapping in pipeline_data_mapping.items(): + for ( + collection_name, + collection_data_mapping, + ) in pipeline_data_mapping.items(): self.logger.info( f'Started populating data for pipeline "{pipeline_name}"', ) process_file( self, - items=list(collection_data_mapping.items()), + items=list(collection_data_mapping[0].items()), pipeline_name=pipeline_name, operation=operation, - dataset_items=dataset_items[pipeline_name][collection_name], + dataset_items=dataset_items[pipeline_name][collection_name][0], logger=self.logger, progress=progress, tasks_by_pipeline_name=tasks_by_pipeline_name, ) # type: ignore[call-arg] + + for key, value in collection_data_mapping[1].items(): + dataset_items[pipeline_name][collection_name][1][key] = value + self.logger.info( f'Completed populating data for pipeline "{pipeline_name}"', ) @@ -548,10 +587,7 @@ def process_file( def _process_files_with_metadata( self, - dataset_mapping: dict[ - str, - dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], - ], + dataset_mapping: DATASET_MAPPING_TYPE, max_workers: int | None = None, ) -> None: """ @@ -564,28 +600,39 @@ def _process_files_with_metadata( # Group files by metadata type files_by_type: dict[ type, - dict[Path, tuple[list[BaseMetadata], dict[str, Any] | None]], + dict[ + Path, + tuple[ + list[BaseMetadata], + dict[str, Any] | None, + MetadataHeader[BaseModel] | None, + ], + ], ] = {} for pipeline_name, pipeline_data_mapping in dataset_mapping.items(): - for ( - relative_dst, - metadata_items, - ancillary_data, - ) in pipeline_data_mapping.values(): - if not metadata_items: - continue - - dst = self.get_pipeline_data_dir(pipeline_name) / relative_dst - - # Group by the type of the first metadata item - metadata_type = type(metadata_items[0]) - if metadata_type not in files_by_type: - files_by_type[metadata_type] = {} - - files_by_type[metadata_type][dst] = (metadata_items, ancillary_data) + for collection_data, collection_header in pipeline_data_mapping.values(): + for ( + relative_dst, + metadata_items, + ancillary_data, + ) in collection_data.values(): + if not metadata_items: + continue + + dst = self.get_pipeline_data_dir(pipeline_name) / relative_dst + + # Group by the type of the first metadata item + metadata_type = type(metadata_items[0]) + if metadata_type not in files_by_type: + files_by_type[metadata_type] = {} + + files_by_type[metadata_type][dst] = ( + metadata_items, + ancillary_data, + collection_header.get(metadata_type, None), + ) - # Process files for each metadata type for metadata_type, files in files_by_type.items(): metadata_type.process_files( 
dataset_mapping=files, @@ -615,11 +662,17 @@ def _update_metadata_hashes( def _process_items( self, - dataset_items: dict[str, list[BaseMetadata]], + dataset_items: tuple[ + dict[str, list[BaseMetadata]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], progress: Progress | None = None, task: TaskID | None = None, max_workers: int | None = None, - ) -> dict[str, list[BaseMetadata]]: + ) -> tuple[ + dict[str, list[BaseMetadata]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ]: """Process all items and return them sorted by path.""" @multithreaded(max_workers=max_workers) @@ -636,7 +689,7 @@ def process_items_with_hashes( self._update_metadata_hashes(file_path, metadata_items, progress, task) items = [ - (Path(self.root_dir) / file_path, metadata_items) for file_path, metadata_items in dataset_items.items() + (Path(self.root_dir) / file_path, metadata_items) for file_path, metadata_items in dataset_items[0].items() ] process_items_with_hashes( self, @@ -645,38 +698,59 @@ def process_items_with_hashes( progress=progress, task=task, ) # type: ignore[call-arg] - return OrderedDict(sorted(dataset_items.items(), key=lambda item: item[0])) + return ( + OrderedDict( + sorted(dataset_items[0].items(), key=lambda item: item[0]), + ), + dataset_items[1], + ) def _group_by_metadata_type( self, - items: dict[str, list[BaseMetadata]], - ) -> dict[type[BaseMetadata], dict[str, list[BaseMetadata]]]: + items: tuple[ + dict[str, list[BaseMetadata]], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], + ) -> dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ]: """Group dataset items by their metadata type.""" - grouped_items: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]] = {} + grouped_items: dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ] = {} - for path, metadata_items in items.items(): + for path, metadata_items in items[0].items(): for metadata_item in metadata_items: metadata_type = type(metadata_item) if metadata_type not in grouped_items: - grouped_items[metadata_type] = {} - if path not in grouped_items[metadata_type]: - grouped_items[metadata_type][path] = [] - grouped_items[metadata_type][path].append(metadata_item) + grouped_items[metadata_type] = ( + {}, + items[1].get(metadata_type, None), + ) + if path not in grouped_items[metadata_type][0]: + grouped_items[metadata_type][0][path] = [] + grouped_items[metadata_type][0][path].append(metadata_item) return grouped_items def _create_metadata_files( self, dataset_name: str, - grouped_items: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], + grouped_items: dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], collection_name: str | None = None, ) -> None: """Create metadata files for each type.""" - for metadata_type, type_items in grouped_items.items(): + for metadata_type, (type_items, type_header) in grouped_items.items(): metadata_type.create_dataset_metadata( dataset_name=dataset_name, root_dir=self.root_dir, items=type_items, + metadata_header=type_header, metadata_name=collection_name, dry_run=self.dry_run, saver_overwrite=self._metadata_saver_overwrite, @@ -684,12 +758,23 @@ def _create_metadata_files( def _log_metadata_summary( self, - grouped_items: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], + grouped_items: MAPPED_GROUPED_ITEMS, ) -> None: """Log a summary of the metadata generation.""" - type_counts = 
[f"{len(items)} {metadata_type.__name__}" for metadata_type, items in grouped_items.items()] + metadata_counts = [ + {metadata_type: len(items)} + for pipeline_group in grouped_items.values() + for collection_group in pipeline_group.values() + for metadata_type, (items, _) in collection_group.items() + ] + metadata_types = {keys for item in metadata_counts for keys in item} + type_counts = { + metadata_type: sum([x.get(metadata_type, 0) for x in metadata_counts]) for metadata_type in metadata_types + } + + type_counts_messages = [f"{counts} {metadata_type.__name__}" for metadata_type, counts in type_counts.items()] self.logger.info( - f"Generated metadata file containing {', '.join(type_counts)} items", + f"Generated metadata file containing {', '.join(type_counts_messages)} items", ) def generate_metadata( @@ -721,7 +806,14 @@ def generate_metadata( """ if progress: with Progress(SpinnerColumn(), *get_default_columns()) as progress_bar: - total_tasks = len(flatten_mapping(flatten_middle_mapping(dataset_items))) + 1 + total_tasks = ( + len( + flatten_mapping( + flatten_middle_mapping(dataset_items), + ), + ) + + 1 + ) task = progress_bar.add_task( "[green]Generating dataset metadata (5/12)", total=total_tasks, @@ -736,22 +828,43 @@ def generate_metadata( max_workers, ), ) - grouped_items = execute_on_mapping(processed_items, self._group_by_metadata_type) + grouped_items = execute_on_mapping( + processed_items, + self._group_by_metadata_type, + ) - progress_bar.update(task, description="[green]Writing dataset metadata (5/12)") + progress_bar.update( + task, + description="[green]Writing dataset metadata (5/12)", + ) for decorator in mapping_processor_decorator: - decorator(lambda x, y: self._create_metadata_files(dataset_name, x, y), grouped_items) + decorator( + lambda x, y: self._create_metadata_files(dataset_name, x, y), + grouped_items, + ) progress_bar.advance(task) else: - processed_items = execute_on_mapping(dataset_items, lambda x: self._process_items(x)) - grouped_items = execute_on_mapping(processed_items, self._group_by_metadata_type) + processed_items = execute_on_mapping( + dataset_items, + lambda x: self._process_items(x), + ) + grouped_items = execute_on_mapping( + processed_items, + self._group_by_metadata_type, + ) for decorator in mapping_processor_decorator: - decorator(lambda x, y: self._create_metadata_files(dataset_name, x, y), grouped_items) + decorator( + lambda x, y: self._create_metadata_files(dataset_name, x, y), + grouped_items, + ) - self._log_metadata_summary(flatten_mapping(flatten_middle_mapping(grouped_items))) + self._log_metadata_summary(grouped_items) - def _run_post_package_processors(self, post_package_processors: list[Callable[[Path], set[Path]]]) -> set[Path]: + def _run_post_package_processors( + self, + post_package_processors: list[Callable[[Path], set[Path]]], + ) -> set[Path]: changed_files = set() with Progress(SpinnerColumn(), *get_default_columns()) as progress_bar: task = progress_bar.add_task( @@ -764,7 +877,11 @@ def _run_post_package_processors(self, post_package_processors: list[Callable[[P return changed_files - def _update_manifest(self, changed_files: set[Path], max_worker: int | None = None) -> None: + def _update_manifest( + self, + changed_files: set[Path], + max_worker: int | None = None, + ) -> None: manifest = Manifest.load(self.manifest_path) manifest.update( changed_files, @@ -988,7 +1105,7 @@ def check_dataset_mapping( self, dataset_mapping: dict[ str, - dict[Path, tuple[Path, list[Any] | None, dict[str, Any] | None]], + 
dict[Path, PackageEntry], ], max_workers: int | None = None, ) -> None: @@ -1044,7 +1161,7 @@ def _verify_source_paths_exist( self, pipeline_data_mapping: dict[ Path, - tuple[Path, list[Any] | None, dict[str, Any] | None], + PackageEntry, ], progress: Progress, task: TaskID, @@ -1078,7 +1195,7 @@ def _verify_unique_source_resolutions( self, pipeline_data_mapping: dict[ Path, - tuple[Path, list[Any] | None, dict[str, Any] | None], + PackageEntry, ], progress: Progress, task: TaskID, @@ -1118,7 +1235,7 @@ def _verify_relative_destination_paths( self, pipeline_data_mapping: dict[ Path, - tuple[Path, list[Any] | None, dict[str, Any] | None], + PackageEntry, ], progress: Progress, task: TaskID, @@ -1154,7 +1271,7 @@ def _verify_no_destination_collisions( self, pipeline_data_mapping: dict[ Path, - tuple[Path, list[Any] | None, dict[str, Any] | None], + PackageEntry, ], progress: Progress, task: TaskID, diff --git a/marimba/core/wrappers/project.py b/marimba/core/wrappers/project.py index d8cb63f..44610dc 100644 --- a/marimba/core/wrappers/project.py +++ b/marimba/core/wrappers/project.py @@ -43,14 +43,16 @@ from pathlib import Path from typing import Any +from pydantic import BaseModel from rich.progress import Progress, SpinnerColumn from marimba.core.installer.pipeline_installer import PipelineInstaller from marimba.core.parallel.pipeline_loader import load_pipeline_instance -from marimba.core.pipeline import BasePipeline +from marimba.core.pipeline import BasePipeline, PackageEntry from marimba.core.schemas.base import BaseMetadata +from marimba.core.schemas.header.base import MetadataHeader from marimba.core.utils.constants import Operation -from marimba.core.utils.dataset import DECORATOR_TYPE +from marimba.core.utils.dataset import DATASET_MAPPING_TYPE, DECORATOR_TYPE from marimba.core.utils.log import LogMixin, get_file_handler from marimba.core.utils.paths import format_path_for_logging, remove_directory_tree from marimba.core.utils.prompt import prompt_schema @@ -247,7 +249,13 @@ def execute_packaging( dry_run: bool, log_string_prefix: str, merged_kwargs: dict[str, Any], -) -> tuple[dict[Path, tuple[Path, list[Any] | None, dict[str, Any] | None]], str]: +) -> tuple[ + tuple[ + dict[Path, PackageEntry], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], + str, +]: """ Package a pipeline's data for a given collection directory and configuration. @@ -1056,13 +1064,7 @@ def compose( extra_args: list[str] | None = None, max_workers: int | None = None, **kwargs: dict[str, Any], - ) -> dict[ - str, - dict[ - str, - dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], - ], - ]: + ) -> DATASET_MAPPING_TYPE: """ Compose a dataset for given collections across multiple pipelines. 
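To make the reshaped `compose` output concrete, here is a hypothetical `DATASET_MAPPING_TYPE` value with one pipeline and one collection; all names and paths are invented:

```python
from pathlib import Path

from marimba.core.pipeline import PackageEntry
from marimba.core.utils.dataset import DATASET_MAPPING_TYPE

# pipeline name -> collection name -> (data mapping, headers keyed by metadata type)
dataset_mapping: DATASET_MAPPING_TYPE = {
    "example-pipeline": {
        "collection-01": (
            {Path("/work/collection-01/images/0001.jpg"): PackageEntry(Path("images/0001.jpg"))},
            {},  # this collection supplies no dataset-level headers
        ),
    },
}
```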
@@ -1114,13 +1116,7 @@ def compose( f"{collection_label} {pretty_collections}{self._format_kwargs_message(merged_kwargs)}", ) - dataset_mapping: dict[ - str, - dict[ - str, - dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], - ], - ] = defaultdict(lambda: defaultdict(dict)) + dataset_mapping: DATASET_MAPPING_TYPE = defaultdict(dict) with Progress(SpinnerColumn(), *get_default_columns()) as progress: total_task_length = len(self.pipeline_wrappers) * len(collection_wrappers) @@ -1154,7 +1150,8 @@ def compose( try: (pipeline_data_mapping, message) = future.result() self.logger.info(f"{log_string_prefix}{message}") - dataset_mapping[pipeline_name][collection_name].update(pipeline_data_mapping) + dataset_mapping[pipeline_name][collection_name] = pipeline_data_mapping + except Exception as e: raise ProjectWrapper.CompositionError( f"{log_string_prefix}" @@ -1174,13 +1171,7 @@ def compose( def create_dataset( self, dataset_name: str, - dataset_mapping: dict[ - str, - dict[ - str, - dict[Path, tuple[Path, list[BaseMetadata] | None, dict[str, Any] | None]], - ], - ], + dataset_mapping: DATASET_MAPPING_TYPE, metadata_mapping_processor_decorator: list[DECORATOR_TYPE], post_package_processors: list[Callable[[Path], set[Path]]], operation: Operation = Operation.copy, @@ -1261,7 +1252,10 @@ def create_dataset( self._dataset_wrappers[dataset_name] = dataset_wrapper return dataset_wrapper - def get_pipeline_post_processors(self, pipeline_names: list[str]) -> list[Callable[[Path], set[Path]]]: + def get_pipeline_post_processors( + self, + pipeline_names: list[str], + ) -> list[Callable[[Path], set[Path]]]: """ Gets the post processor methods for all given pipeline names. diff --git a/pyproject.toml b/pyproject.toml index 6e5a811..9d6452d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ authors = [ { name = "Chris Jackett", email = "chris.jackett@csiro.au" }, { name = "Kevin Barnard", email = "kbarnard@mbari.org" }, ] -requires-python = "~=3.10" +requires-python = "~=3.10.0" readme = "README.md" license = {file = "LICENSE"} dependencies = [ @@ -27,6 +27,7 @@ dependencies = [ "botocore>=1.38.2", "requests>=2.32.3", "uv>=0.7.8", + "pydantic>=2.11.3", ] classifiers = [ "Development Status :: 5 - Production/Stable", @@ -100,3 +101,6 @@ pillow = ["PIL"] PyYAML = ["yaml"] gitpython = ["git"] opencv-python-headless = ["cv2"] + +[tool.deptry.per_rule_ignores] +DEP002 = ["uv"] diff --git a/tests/core/schemas/header/__init__.py b/tests/core/schemas/header/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/core/schemas/header/test_base.py b/tests/core/schemas/header/test_base.py new file mode 100644 index 0000000..f37c0fa --- /dev/null +++ b/tests/core/schemas/header/test_base.py @@ -0,0 +1,42 @@ +from functools import reduce +from ifdo import ImageContext, ImageSetHeader +from pydantic import BaseModel +import pytest +from dataclasses import dataclass +from marimba.core.schemas.header import MetadataHeader, HeaderMergeConflictError + + +class TestHeader(BaseModel): + a: str | None + b: int | None + + +def test_valid_merge() -> None: + first_header = MetadataHeader(TestHeader(a=None, b=0)) + second_header = MetadataHeader(TestHeader(a="test", b=0)) + + result = reduce(MetadataHeader.__add__, [first_header, second_header]) + + assert result.header.a == "test" + assert result.header.b == 0 + + +def test_invalid_merge() -> None: + first_header = MetadataHeader(TestHeader(a="other", b=0)) + second_header = MetadataHeader(TestHeader(a="test", b=0)) + + with 
pytest.raises(HeaderMergeConflictError) as e: + first_header + second_header + + assert e.value._conflict_attr == "a" + assert str(e.value) == "Conflicting header information in field: a" + + +def test_ifdo_header() -> None: + first_header = MetadataHeader(ImageSetHeader(image_set_name="", image_set_uuid="", image_set_handle="")) + first_header.header.image_context = ImageContext(name="Test") + second_header = MetadataHeader(ImageSetHeader(image_set_name="", image_set_uuid="", image_set_handle="")) + + result = first_header + second_header + assert result.header.image_context is not None + assert result.header.image_context.name == "Test" diff --git a/tests/core/utils/test_dataset.py b/tests/core/utils/test_dataset.py index 27922f8..fbee8c6 100644 --- a/tests/core/utils/test_dataset.py +++ b/tests/core/utils/test_dataset.py @@ -1,18 +1,23 @@ from pathlib import Path +from pydantic import BaseModel import pytest from marimba.core.schemas.base import BaseMetadata from marimba.core.schemas.generic import GenericMetadata +from marimba.core.schemas.header.base import MetadataHeader +from marimba.core.schemas.ifdo import iFDOMetadata from marimba.core.utils.constants import MetadataGenerationLevelOptions from marimba.core.utils.dataset import ( DATASET_MAPPING_TYPE, _run_mapping_processor, _run_mapping_processor_per_pipeline, _run_mapping_processor_per_pipline_and_collection, + flatten_composite_mapping, get_mapping_processor_decorator, PIPELINE_DATASET_MAPPING_TYPE, flatten_middle_mapping, + flatten_header_mapping, MAPPED_DATASET_ITEMS, MAPPED_GROUPED_ITEMS, flatten_mapping, @@ -33,8 +38,25 @@ def test_get_mapping_processor_decorator(): get_mapping_processor_decorator("bla") # type: ignore +class TestModel(BaseModel): ... + + def test_flatten_middle_mapping(): - mapping = {"a": {"b": {"c": 1}, "d": {"e": 1}}} + mapping: dict[ + str, + dict[ + str, + tuple[ + dict[str, int], + dict[type[BaseMetadata], MetadataHeader[BaseModel]], + ], + ], + ] = { + "a": { + "b": ({"c": 1}, {iFDOMetadata: MetadataHeader(TestModel())}), + "d": ({"e": 1}, {}), + } + } assert flatten_middle_mapping(mapping) == {"a": {"c": 1, "e": 1}} @@ -43,37 +65,63 @@ def test_flatten_mapping(): assert flatten_mapping(mapping) == {"b": 1, "d": 1} +def test_flatten_header_mapping(): + mapping: dict[str, dict[type[BaseMetadata], MetadataHeader[BaseModel]]] = { + "a": {iFDOMetadata: MetadataHeader(TestModel())} + } + + assert flatten_header_mapping(mapping) == mapping["a"] + + def test_run_mapping_processor(): def dataset_mapping_processor( - dataset_mapping: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], _: str | None + dataset_mapping: dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], + _: str | None, ) -> None: - assert dataset_mapping == {GenericMetadata: {"a": [], "b": []}} + assert dataset_mapping == {GenericMetadata: ({"a": [], "b": []}, None)} dataset_mapping: MAPPED_GROUPED_ITEMS = { - "pipeline": {"collection": {GenericMetadata: {"a": []}}, "another": {GenericMetadata: {"b": []}}} + "pipeline": { + "collection": {GenericMetadata: ({"a": []}, None)}, + "another": {GenericMetadata: ({"b": []}, None)}, + } } _run_mapping_processor(dataset_mapping_processor, dataset_mapping) def test_run_mapping_processor_per_pipeline(): def dataset_mapping_processor( - dataset_mapping: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], collection_name: str | None + dataset_mapping: dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], + collection_name: str
| None, ) -> None: - assert dataset_mapping == {GenericMetadata: {"a": [], "b": []}} + assert dataset_mapping == {GenericMetadata: ({"a": [], "b": []}, None)} assert collection_name == "pipeline" dataset_mapping: MAPPED_GROUPED_ITEMS = { - "pipeline": {"collection": {GenericMetadata: {"a": []}}, "another": {GenericMetadata: {"b": []}}} + "pipeline": { + "collection": {GenericMetadata: ({"a": []}, None)}, + "another": {GenericMetadata: ({"b": []}, None)}, + } } _run_mapping_processor_per_pipeline(dataset_mapping_processor, dataset_mapping) def test_run_mapping_processor_per_pipline_and_collection(): def dataset_mapping_processor( - dataset_mapping: dict[type[BaseMetadata], dict[str, list[BaseMetadata]]], collection_name: str | None + dataset_mapping: dict[ + type[BaseMetadata], + tuple[dict[str, list[BaseMetadata]], MetadataHeader[BaseModel] | None], + ], + collection_name: str | None, ) -> None: - assert dataset_mapping == {GenericMetadata: {"a": []}} + assert dataset_mapping == {GenericMetadata: ({"a": []}, None)} assert collection_name == "collection.pipeline" - dataset_mapping: MAPPED_GROUPED_ITEMS = {"pipeline": {"collection": {GenericMetadata: {"a": []}}}} + dataset_mapping: MAPPED_GROUPED_ITEMS = {"pipeline": {"collection": {GenericMetadata: ({"a": []}, None)}}} _run_mapping_processor_per_pipline_and_collection(dataset_mapping_processor, dataset_mapping)