Skip to content

fix: yield partitions for unique stream slices in StreamSlicerPartitionGenerator #508

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 26, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.

from typing import Any, Iterable, Mapping, Optional
from typing import Any, Hashable, Iterable, Mapping, Optional

from airbyte_cdk.sources.declarative.retrievers import Retriever
from airbyte_cdk.sources.message import MessageRepository
Expand Down Expand Up @@ -89,5 +89,21 @@ def __init__(
self._stream_slicer = stream_slicer

def generate(self) -> Iterable[Partition]:
# Yield partitions for unique stream slices, avoiding duplicates
seen_slices: set[Hashable] = set()
for stream_slice in self._stream_slicer.stream_slices():
slice_key = self._make_hashable(stream_slice)
if slice_key in seen_slices:
continue
seen_slices.add(slice_key)
yield self._partition_factory.create(stream_slice)

@staticmethod
def _make_hashable(obj: Any) -> Any:
if isinstance(obj, dict):
return frozenset(
(k, StreamSlicerPartitionGenerator._make_hashable(v)) for k, v in obj.items()
)
if isinstance(obj, list):
return tuple(StreamSlicerPartitionGenerator._make_hashable(i) for i in obj)
return obj
Loading