Skip to content

Commit 533b70a

Browse files
artem1205 and brianjlai authored
feat: add IncrementingCountCursor (#346)
Signed-off-by: Artem Inzhyyants <[email protected]> Co-authored-by: brianjlai <[email protected]>
1 parent 406542d commit 533b70a

8 files changed

+398
-11
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

+28-7
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3232
DatetimeBasedCursor as DatetimeBasedCursorModel,
3333
)
34+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
35+
IncrementingCountCursor as IncrementingCountCursorModel,
36+
)
3437
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
3538
ModelToComponentFactory,
3639
)
@@ -222,7 +225,7 @@ def _group_streams(
222225
and not incremental_sync_component_definition
223226
)
224227

225-
if self._is_datetime_incremental_without_partition_routing(
228+
if self._is_concurrent_cursor_incremental_without_partition_routing(
226229
declarative_stream, incremental_sync_component_definition
227230
):
228231
stream_state = self._connector_state_manager.get_stream_state(
@@ -254,15 +257,26 @@ def _group_streams(
254257
stream_slicer=declarative_stream.retriever.stream_slicer,
255258
)
256259
else:
257-
cursor = (
258-
self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
260+
if (
261+
incremental_sync_component_definition
262+
and incremental_sync_component_definition.get("type")
263+
== IncrementingCountCursorModel.__name__
264+
):
265+
cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor(
266+
model_type=IncrementingCountCursorModel,
267+
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
268+
stream_name=declarative_stream.name,
269+
stream_namespace=declarative_stream.namespace,
270+
config=config or {},
271+
)
272+
else:
273+
cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor(
259274
model_type=DatetimeBasedCursorModel,
260275
component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above
261276
stream_name=declarative_stream.name,
262277
stream_namespace=declarative_stream.namespace,
263278
config=config or {},
264279
)
265-
)
266280
partition_generator = StreamSlicerPartitionGenerator(
267281
partition_factory=DeclarativePartitionFactory(
268282
declarative_stream.name,
@@ -389,19 +403,26 @@ def _group_streams(
389403

390404
return concurrent_streams, synchronous_streams
391405

392-
def _is_datetime_incremental_without_partition_routing(
406+
def _is_concurrent_cursor_incremental_without_partition_routing(
    self,
    declarative_stream: DeclarativeStream,
    incremental_sync_component_definition: Mapping[str, Any] | None,
) -> bool:
    """Tell whether a stream's incremental sync can be backed by a concurrent cursor.

    Returns True only when all of the following hold:
      * an incremental sync definition is present and non-empty,
      * its ``type`` is ``DatetimeBasedCursor`` or ``IncrementingCountCursor``,
      * the stream's retriever exposes a ``stream_slicer`` that is either a
        ``DatetimeBasedCursor`` or an ``AsyncJobPartitionRouter``.
    """
    # A missing (None) or empty definition means there is no incremental sync at all.
    if not incremental_sync_component_definition:
        return False

    supported_cursor_types = (
        DatetimeBasedCursorModel.__name__,
        IncrementingCountCursorModel.__name__,
    )
    if incremental_sync_component_definition.get("type", "") not in supported_cursor_types:
        return False

    retriever = declarative_stream.retriever
    if not hasattr(retriever, "stream_slicer"):
        return False

    # IncrementingCountCursorModel is hardcoded to be built as a DatetimeBasedCursor, so no
    # dedicated isinstance check is needed for it here. Add one if a declarative
    # IncrementingCountCursor component is ever created.
    return isinstance(retriever.stream_slicer, (DatetimeBasedCursor, AsyncJobPartitionRouter))

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+39
Original file line numberDiff line numberDiff line change
@@ -777,6 +777,44 @@ definitions:
777777
type:
778778
type: string
779779
enum: [LegacyToPerPartitionStateMigration]
780+
IncrementingCountCursor:
781+
title: Incrementing Count Cursor
782+
description: Cursor that allows for incremental sync according to a continuously increasing integer.
783+
type: object
784+
required:
785+
- type
786+
- cursor_field
787+
properties:
788+
type:
789+
type: string
790+
enum: [IncrementingCountCursor]
791+
cursor_field:
792+
title: Cursor Field
793+
description: The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.
794+
type: string
795+
interpolation_context:
796+
- config
797+
examples:
798+
- "created_at"
799+
- "{{ config['record_cursor'] }}"
800+
start_value:
801+
title: Start Value
802+
description: The value that determines the earliest record that should be synced.
803+
anyOf:
804+
- type: string
805+
- type: integer
806+
interpolation_context:
807+
- config
808+
examples:
809+
- 0
810+
- "{{ config['start_value'] }}"
811+
start_value_option:
812+
title: Inject Start Value Into Outgoing HTTP Request
813+
description: Optionally configures how the start value will be sent in requests to the source API.
814+
"$ref": "#/definitions/RequestOption"
815+
$parameters:
816+
type: object
817+
additionalProperties: true
780818
DatetimeBasedCursor:
781819
title: Datetime Based Cursor
782820
description: Cursor to provide incremental capabilities over datetime.
@@ -1319,6 +1357,7 @@ definitions:
13191357
anyOf:
13201358
- "$ref": "#/definitions/CustomIncrementalSync"
13211359
- "$ref": "#/definitions/DatetimeBasedCursor"
1360+
- "$ref": "#/definitions/IncrementingCountCursor"
13221361
name:
13231362
title: Name
13241363
description: The stream name.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -1508,6 +1508,28 @@ class AuthFlow(BaseModel):
15081508
oauth_config_specification: Optional[OAuthConfigSpecification] = None
15091509

15101510

1511+
class IncrementingCountCursor(BaseModel):
    # Declarative model for an incremental-sync cursor keyed on an integer-like value
    # (see `start_value`, which accepts either an int or an interpolated string).
    # `#` comments are used instead of a docstring so the schema description is not altered.

    # Discriminator: must be exactly "IncrementingCountCursor".
    type: Literal["IncrementingCountCursor"]
    # Required. Name of the top-level record field used as the bookmark; may be interpolated.
    cursor_field: str = Field(
        ...,
        description="The location of the value on a record that will be used as a bookmark during sync. To ensure no data loss, the API must return records in ascending order based on the cursor field. Nested fields are not supported, so the field must be at the top level of the record. You can use a combination of Add Field and Remove Field transformations to move the nested field to the top.",
        examples=["created_at", "{{ config['record_cursor'] }}"],
        title="Cursor Field",
    )
    # Optional. Literal integer or an interpolated string; defaults to None when omitted.
    start_value: Optional[Union[str, int]] = Field(
        None,
        description="The value that determines the earliest record that should be synced.",
        examples=[0, "{{ config['start_value'] }}"],
        title="Start Value",
    )
    # Optional. Describes how the start value is injected into outgoing requests.
    start_value_option: Optional[RequestOption] = Field(
        None,
        description="Optionally configures how the start value will be sent in requests to the source API.",
        title="Inject Start Value Into Outgoing HTTP Request",
    )
    # Free-form parameters; populated from the manifest key "$parameters" via the alias.
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
1531+
1532+
15111533
class DatetimeBasedCursor(BaseModel):
15121534
type: Literal["DatetimeBasedCursor"]
15131535
clamping: Optional[Clamping] = Field(
@@ -1948,7 +1970,9 @@ class Config:
19481970
description="Component used to coordinate how records are extracted across stream slices and request pages.",
19491971
title="Retriever",
19501972
)
1951-
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
1973+
incremental_sync: Optional[
1974+
Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor]
1975+
] = Field(
19521976
None,
19531977
description="Component used to fetch data incrementally based on a time field in the data.",
19541978
title="Incremental Sync",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+112
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,9 @@
245245
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
246246
HttpResponseFilter as HttpResponseFilterModel,
247247
)
248+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
249+
IncrementingCountCursor as IncrementingCountCursorModel,
250+
)
248251
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
249252
InlineSchemaLoader as InlineSchemaLoaderModel,
250253
)
@@ -496,6 +499,9 @@
496499
CustomFormatConcurrentStreamStateConverter,
497500
DateTimeStreamStateConverter,
498501
)
502+
from airbyte_cdk.sources.streams.concurrent.state_converters.incrementing_count_stream_state_converter import (
503+
IncrementingCountStreamStateConverter,
504+
)
499505
from airbyte_cdk.sources.streams.http.error_handlers.response_models import ResponseAction
500506
from airbyte_cdk.sources.types import Config
501507
from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
@@ -584,6 +590,7 @@ def _init_mappings(self) -> None:
584590
FlattenFieldsModel: self.create_flatten_fields,
585591
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
586592
IterableDecoderModel: self.create_iterable_decoder,
593+
IncrementingCountCursorModel: self.create_incrementing_count_cursor,
587594
XmlDecoderModel: self.create_xml_decoder,
588595
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
589596
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
@@ -1189,6 +1196,70 @@ def create_concurrent_cursor_from_datetime_based_cursor(
11891196
clamping_strategy=clamping_strategy,
11901197
)
11911198

1199+
def create_concurrent_cursor_from_incrementing_count_cursor(
    self,
    model_type: Type[BaseModel],
    component_definition: ComponentDefinition,
    stream_name: str,
    stream_namespace: Optional[str],
    config: Config,
    message_repository: Optional[MessageRepository] = None,
    **kwargs: Any,
) -> ConcurrentCursor:
    """Build a ``ConcurrentCursor`` from an ``IncrementingCountCursor`` manifest definition.

    Args:
        model_type: Model class the definition is parsed with; its ``__name__`` must match
            the definition's ``type``.
        component_definition: Raw manifest mapping describing the cursor.
        stream_name: Name of the stream the cursor belongs to.
        stream_namespace: Optional namespace of the stream.
        config: Connector configuration used to evaluate interpolated values.
        message_repository: Optional repository to use instead of the factory's own.
        **kwargs: May carry ``stream_state`` to override the state read from the
            connector state manager.

    Returns:
        A ``ConcurrentCursor`` using an ``IncrementingCountStreamStateConverter`` and no
        slice boundary fields.

    Raises:
        ValueError: If the definition's ``type`` does not match ``model_type`` or the parsed
            model is not an ``IncrementingCountCursorModel``.
    """
    # Per-partition incremental streams can dynamically create child cursors which will pass their current
    # state via the stream_state keyword argument. Incremental syncs without parent streams use the
    # incoming state and connector_state_manager that is initialized when the component factory is created
    stream_state = (
        self._connector_state_manager.get_stream_state(stream_name, stream_namespace)
        if "stream_state" not in kwargs
        else kwargs["stream_state"]
    )

    component_type = component_definition.get("type")
    if component_type != model_type.__name__:
        raise ValueError(
            f"Expected manifest component of type {model_type.__name__}, but received {component_type} instead"
        )

    incrementing_count_cursor_model = model_type.parse_obj(component_definition)

    if not isinstance(incrementing_count_cursor_model, IncrementingCountCursorModel):
        raise ValueError(
            f"Expected {model_type.__name__} component, but received {incrementing_count_cursor_model.__class__.__name__}"
        )

    cursor_parameters = incrementing_count_cursor_model.parameters or {}

    # Resolve the start value to a concrete value. A string start value (for example
    # "{{ config['start_value'] }}") has to be evaluated against the config; passing the
    # un-evaluated InterpolatedString object through would hand the cursor a template
    # wrapper instead of a value. A missing/falsy start value keeps the default of 0.
    raw_start_value = incrementing_count_cursor_model.start_value
    start_value: Any
    if not raw_start_value:
        start_value = 0
    elif isinstance(raw_start_value, str):
        start_value = InterpolatedString.create(
            raw_start_value,
            parameters=cursor_parameters,
        ).eval(config=config)
    else:
        start_value = raw_start_value

    interpolated_cursor_field = InterpolatedString.create(
        incrementing_count_cursor_model.cursor_field,
        parameters=cursor_parameters,
    )
    cursor_field = CursorField(interpolated_cursor_field.eval(config=config))

    connector_state_converter = IncrementingCountStreamStateConverter(
        is_sequential_state=True,  # ConcurrentPerPartitionCursor only works with sequential state
    )

    return ConcurrentCursor(
        stream_name=stream_name,
        stream_namespace=stream_namespace,
        stream_state=stream_state,
        message_repository=message_repository or self._message_repository,
        connector_state_manager=self._connector_state_manager,
        connector_state_converter=connector_state_converter,
        cursor_field=cursor_field,
        slice_boundary_fields=None,
        start=start_value,  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
        end_provider=connector_state_converter.get_end_provider(),  # type: ignore  # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
    )
1262+
11921263
def _assemble_weekday(self, weekday: str) -> Weekday:
11931264
match weekday:
11941265
case "MONDAY":
@@ -1622,6 +1693,31 @@ def create_declarative_stream(
16221693
config=config,
16231694
parameters=model.parameters or {},
16241695
)
1696+
elif model.incremental_sync and isinstance(
1697+
model.incremental_sync, IncrementingCountCursorModel
1698+
):
1699+
cursor_model: IncrementingCountCursorModel = model.incremental_sync # type: ignore
1700+
1701+
start_time_option = (
1702+
self._create_component_from_model(
1703+
cursor_model.start_value_option, # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
1704+
config,
1705+
parameters=cursor_model.parameters or {},
1706+
)
1707+
if cursor_model.start_value_option # type: ignore # mypy still thinks cursor_model of type DatetimeBasedCursor
1708+
else None
1709+
)
1710+
1711+
# The concurrent engine defaults the start/end fields on the slice to "start" and "end", but
1712+
# the default DatetimeBasedRequestOptionsProvider() sets them to start_time/end_time
1713+
partition_field_start = "start"
1714+
1715+
request_options_provider = DatetimeBasedRequestOptionsProvider(
1716+
start_time_option=start_time_option,
1717+
partition_field_start=partition_field_start,
1718+
config=config,
1719+
parameters=model.parameters or {},
1720+
)
16251721
else:
16261722
request_options_provider = None
16271723

@@ -2111,6 +2207,22 @@ def create_gzip_decoder(
21112207
stream_response=False if self._emit_connector_builder_messages else True,
21122208
)
21132209

2210+
@staticmethod
def create_incrementing_count_cursor(
    model: IncrementingCountCursorModel, config: Config, **kwargs: Any
) -> DatetimeBasedCursor:
    """Return a placeholder ``DatetimeBasedCursor`` for an ``IncrementingCountCursor`` model.

    There is no runtime implementation of an IncrementingCountCursor; this stub exists only
    because models are still parsed into components, so it is not expected to be used at
    runtime. Known and expected limitation: running a check with a declared
    IncrementingCountCursor goes through this stub, because check runs without a
    ConcurrentCursor.
    """
    # Only the cursor field comes from the model; the datetime settings are fixed placeholders.
    placeholder_datetime_settings = {
        "datetime_format": "%Y-%m-%d",
        "start_datetime": "2024-12-12",
    }
    return DatetimeBasedCursor(
        cursor_field=model.cursor_field,
        config=config,
        parameters={},
        **placeholder_datetime_settings,
    )
2225+
21142226
@staticmethod
21152227
def create_iterable_decoder(
21162228
model: IterableDecoderModel, config: Config, **kwargs: Any

airbyte_cdk/sources/streams/concurrent/state_converters/abstract_stream_state_converter.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,15 @@
44

55
from abc import ABC, abstractmethod
66
from enum import Enum
7-
from typing import TYPE_CHECKING, Any, List, MutableMapping, Optional, Tuple
7+
from typing import TYPE_CHECKING, Any, Callable, List, MutableMapping, Optional, Tuple
88

99
if TYPE_CHECKING:
1010
from airbyte_cdk.sources.streams.concurrent.cursor import CursorField
1111

1212

1313
class ConcurrencyCompatibleStateType(Enum):
    """State-type identifiers; each member's value is the string label for that type."""

    date_range = "date-range"
    # Added together with IncrementingCountCursor support.
    # NOTE(review): presumably the label used for integer-valued cursor state - confirm
    # against IncrementingCountStreamStateConverter.
    integer = "integer"
1516

1617

1718
class AbstractStreamStateConverter(ABC):

0 commit comments

Comments
 (0)