diff --git a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
index 94ee03a56..25ff4c19c 100644
--- a/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
+++ b/airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
@@ -1,6 +1,6 @@
 # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
 
-from typing import Any, Iterable, Mapping, Optional
+from typing import Any, Hashable, Iterable, Mapping, Optional
 
 from airbyte_cdk.sources.declarative.retrievers import Retriever
 from airbyte_cdk.sources.message import MessageRepository
@@ -89,5 +89,34 @@ def __init__(
         self._stream_slicer = stream_slicer
 
     def generate(self) -> Iterable[Partition]:
+        """Yield one partition per distinct stream slice.
+
+        Each slice is reduced to a hashable key with ``_make_hashable``. A slice
+        whose key was already seen during this call is skipped, so the first
+        occurrence is kept and the slicer's ordering is preserved.
+        """
+        seen_slices: set[Hashable] = set()
         for stream_slice in self._stream_slicer.stream_slices():
+            slice_key = self._make_hashable(stream_slice)
+            if slice_key in seen_slices:
+                continue
+            seen_slices.add(slice_key)
             yield self._partition_factory.create(stream_slice)
+
+    @staticmethod
+    def _make_hashable(obj: Any) -> Hashable:
+        """Recursively convert ``obj`` into a hashable key that compares by value.
+
+        Dicts become frozensets of ``(key, converted value)`` pairs, so key order
+        does not matter. Lists and tuples become tuples; sets become frozensets.
+        Any other object is returned unchanged and must already be hashable.
+        """
+        make = StreamSlicerPartitionGenerator._make_hashable
+        if isinstance(obj, dict):
+            return frozenset((k, make(v)) for k, v in obj.items())
+        if isinstance(obj, (list, tuple)):
+            # Tuples are walked too: a tuple holding a list or dict is unhashable.
+            return tuple(make(i) for i in obj)
+        if isinstance(obj, (set, frozenset)):
+            return frozenset(make(i) for i in obj)
+        return obj