|
1 | 1 | # Copyright (c) 2024 Airbyte, Inc., all rights reserved.
|
2 | 2 |
|
3 |
| -from typing import Any, Iterable, Mapping, Optional |
| 3 | +from typing import Any, Hashable, Iterable, Mapping, Optional |
4 | 4 |
|
5 | 5 | from airbyte_cdk.sources.declarative.retrievers import Retriever
|
6 | 6 | from airbyte_cdk.sources.message import MessageRepository
|
@@ -89,5 +89,21 @@ def __init__(
|
89 | 89 | self._stream_slicer = stream_slicer
|
90 | 90 |
|
91 | 91 | def generate(self) -> Iterable[Partition]:
|
| 92 | + # Yield partitions for unique stream slices, avoiding duplicates |
| 93 | + seen_slices: set[Hashable] = set() |
92 | 94 | for stream_slice in self._stream_slicer.stream_slices():
|
| 95 | + slice_key = self._make_hashable(stream_slice) |
| 96 | + if slice_key in seen_slices: |
| 97 | + continue |
| 98 | + seen_slices.add(slice_key) |
93 | 99 | yield self._partition_factory.create(stream_slice)
|
| 100 | + |
| 101 | + @staticmethod |
| 102 | + def _make_hashable(obj: Any) -> Any: |
| 103 | + if isinstance(obj, dict): |
| 104 | + return frozenset( |
| 105 | + (k, StreamSlicerPartitionGenerator._make_hashable(v)) for k, v in obj.items() |
| 106 | + ) |
| 107 | + if isinstance(obj, list): |
| 108 | + return tuple(StreamSlicerPartitionGenerator._make_hashable(i) for i in obj) |
| 109 | + return obj |
0 commit comments