From e68f36f49fdd1fa812c45750721e56ea43c662c7 Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 10 Dec 2024 15:59:16 -0800 Subject: [PATCH 01/22] initial JsonParser component --- .../declarative_component_schema.yaml | 11 ++ .../declarative/decoders/parsers/__init__.py | 7 ++ .../declarative/decoders/parsers/parsers.py | 24 ++++ .../models/declarative_component_schema.py | 103 +++++++++++------- .../parsers/model_to_component_factory.py | 11 ++ 5 files changed, 119 insertions(+), 37 deletions(-) create mode 100644 airbyte_cdk/sources/declarative/decoders/parsers/__init__.py create mode 100644 airbyte_cdk/sources/declarative/decoders/parsers/parsers.py diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 48c8f8c05..59c137a0a 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -1810,6 +1810,17 @@ definitions: $parameters: type: object additionalProperties: true + JsonParser: + title: JsonParser + description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. + type: object + additionalProperties: true + required: + - type + properties: + type: + type: string + enum: [JsonParser] ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. diff --git a/airbyte_cdk/sources/declarative/decoders/parsers/__init__.py b/airbyte_cdk/sources/declarative/decoders/parsers/__init__.py new file mode 100644 index 000000000..53bd57c9d --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/parsers/__init__.py @@ -0,0 +1,7 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +from airbyte_cdk.sources.declarative.decoders.parsers.parsers import Parser, JsonParser + +__all__ = ["Parser", "JsonParser"] diff --git a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py new file mode 100644 index 000000000..e98454cd6 --- /dev/null +++ b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py @@ -0,0 +1,24 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json +from abc import abstractmethod +from dataclasses import dataclass +from typing import Any, Generator, MutableMapping, Union + + +@dataclass +class Parser: + """ + Parser strategy to convert byte data into a MutableMapping[str, Any]. + """ + + @abstractmethod + def parse(self, data: bytes) -> Generator[MutableMapping[str, Any], None, None]: + pass + + +class JsonParser(Parser): + def parse(self, data: Union[str, bytes, bytearray]) -> Generator[MutableMapping[str, Any], None, None]: + yield json.loads(data) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 173e045a0..48b6abbab 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -715,6 +717,13 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class JsonParser(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["JsonParser"] + + class MinMaxDatetime(BaseModel): type: Literal["MinMaxDatetime"] datetime: str = Field( @@ -822,13 +831,13 @@ class Config: ) extract_output: List[str] = Field( ..., - description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ", + description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.", examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}], title="DeclarativeOAuth Extract Output", ) state: Optional[State] = Field( None, - description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ", + description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.", examples=[{"state": {"min": 7, "max": 128}}], title="(Optional) DeclarativeOAuth Configurable State Query Param", ) @@ -852,13 +861,13 @@ class Config: ) state_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ", + description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.", examples=[{"state_key": "my_custom_state_key_key_name"}], title="(Optional) DeclarativeOAuth State Key Override", ) auth_code_key: Optional[str] = Field( None, - description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ", + description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.", examples=[{"auth_code_key": "my_custom_auth_code_key_name"}], title="(Optional) DeclarativeOAuth Auth Code Key Override", ) @@ -874,24 +883,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -909,7 +922,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1600,21 +1615,25 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( + Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", + ) + ) + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( - Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", - ) + schema_loader: Optional[ + Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] + ] = Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1832,7 +1851,11 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], @@ -1874,7 +1897,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1904,7 +1929,11 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], + List[ + Union[ + CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter + ] + ], ] ] = Field( [], diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 9a405f8f2..6eefec528 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -67,6 +67,7 @@ PaginationDecoderDecorator, XmlDecoder, ) +from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, RecordFilter, @@ -218,6 +219,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonlDecoder as JsonlDecoderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonParser as JsonParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -450,6 +454,7 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, + JsonParser: self.create_json_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, KeysToLowerModel: self.create_keys_to_lower_transformation, IterableDecoderModel: self.create_iterable_decoder, @@ -1600,6 +1605,12 @@ def create_gzipjson_decoder( ) -> GzipJsonDecoder: return GzipJsonDecoder(parameters={}, encoding=model.encoding) + @staticmethod + def create_json_parser( + model: JsonParserModel, config: Config, **kwargs: Any + ) -> JsonParser: + return JsonParser(parameters={}) + @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any From a8a7bb364fbbe46d27086fbdd7ca678223a558c0 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 11 Dec 2024 07:11:38 -0800 Subject: [PATCH 02/22] update parser --- airbyte_cdk/sources/declarative/decoders/parsers/parsers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py index e98454cd6..a23e61101 100644 --- a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py +++ b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py @@ -11,7 +11,7 @@ @dataclass class Parser: """ - Parser strategy to convert byte data into a MutableMapping[str, Any]. + Parser strategy to convert str, bytes, or bytearray data into MutableMapping[str, Any]. """ @abstractmethod @@ -20,5 +20,8 @@ def parse(self, data: bytes) -> Generator[MutableMapping[str, Any], None, None]: class JsonParser(Parser): + """ + Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. + """ def parse(self, data: Union[str, bytes, bytearray]) -> Generator[MutableMapping[str, Any], None, None]: yield json.loads(data) From 254f877254226eef3a506806bbcea31a70d8d0d5 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 11 Dec 2024 07:27:29 -0800 Subject: [PATCH 03/22] add tests for json parser --- .../declarative/decoders/parsers/__init__.py | 3 +++ .../decoders/parsers/test_parsers.py | 22 +++++++++++++++++++ 2 files changed, 25 insertions(+) create mode 100644 unit_tests/sources/declarative/decoders/parsers/__init__.py create mode 100644 unit_tests/sources/declarative/decoders/parsers/test_parsers.py diff --git a/unit_tests/sources/declarative/decoders/parsers/__init__.py b/unit_tests/sources/declarative/decoders/parsers/__init__.py new file mode 100644 index 000000000..66f6de8cb --- /dev/null +++ b/unit_tests/sources/declarative/decoders/parsers/__init__.py @@ -0,0 +1,3 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# diff --git a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py new file mode 100644 index 000000000..f2c6fee8a --- /dev/null +++ b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# + +import json + +import pytest + +from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser + + +@pytest.mark.parametrize( + "raw_data, expected", + [ + (json.dumps({"data-type": "string"}), {"data-type": "string"}), + (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), + (bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), {"data-type": "bytearray"}), + ], + ids=["test_with_str", "test_with_bytes", "test_with_bytearray"] +) +def test_json_parser_with_valid_data(raw_data, expected): + assert next(JsonParser().parse(raw_data)) == expected From 8df239ac0823d0fa23995561c3fae376ca6f5e13 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 11 Dec 2024 08:07:55 -0800 Subject: [PATCH 04/22] update parser and tests to yield empty dict if unparseable. --- .../declarative/decoders/parsers/parsers.py | 15 ++++++++++++++- .../declarative/decoders/parsers/test_parsers.py | 9 +++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py index a23e61101..8b0f8dfcb 100644 --- a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py +++ b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py @@ -3,10 +3,12 @@ # import json +import logging from abc import abstractmethod from dataclasses import dataclass from typing import Any, Generator, MutableMapping, Union +logger = logging.getLogger("airbyte") @dataclass class Parser: @@ -24,4 +26,15 @@ class JsonParser(Parser): Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. """ def parse(self, data: Union[str, bytes, bytearray]) -> Generator[MutableMapping[str, Any], None, None]: - yield json.loads(data) + try: + body_json = json.loads(data) + except json.JSONDecodeError: + logger.warning(f"Data cannot be parsed into json: {data=}") + yield {} + + if not isinstance(body_json, list): + body_json = [body_json] + if len(body_json) == 0: + yield {} + else: + yield from body_json diff --git a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py index f2c6fee8a..298ac1de3 100644 --- a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py +++ b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py @@ -15,8 +15,13 @@ (json.dumps({"data-type": "string"}), {"data-type": "string"}), (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), (bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), {"data-type": "bytearray"}), + (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]) ], - ids=["test_with_str", "test_with_bytes", "test_with_bytearray"] + ids=["test_with_str", "test_with_bytes", "test_with_bytearray", "test_with_string_data_containing_list"] ) def test_json_parser_with_valid_data(raw_data, expected): - assert next(JsonParser().parse(raw_data)) == expected + for i, actual in enumerate(JsonParser().parse(raw_data)): + if isinstance(expected, list): + assert actual == expected[i] + else: + assert actual == expected From 92574df32a50bf416c7c8ad4ca10d7d5201f6c2c Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 11 Dec 2024 15:36:45 -0800 Subject: [PATCH 05/22] chore: format code --- .../declarative/decoders/parsers/parsers.py | 6 +- .../models/declarative_component_schema.py | 88 +++++++------------ .../parsers/model_to_component_factory.py | 4 +- .../decoders/parsers/test_parsers.py | 24 +++-- 4 files changed, 55 insertions(+), 67 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py index 8b0f8dfcb..e7e69b851 100644 --- a/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py +++ b/airbyte_cdk/sources/declarative/decoders/parsers/parsers.py @@ -10,6 +10,7 @@ logger = logging.getLogger("airbyte") + @dataclass class Parser: """ @@ -25,7 +26,10 @@ class JsonParser(Parser): """ Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. """ - def parse(self, data: Union[str, bytes, bytearray]) -> Generator[MutableMapping[str, Any], None, None]: + + def parse( + self, data: Union[str, bytes, bytearray] + ) -> Generator[MutableMapping[str, Any], None, None]: try: body_json = json.loads(data) except json.JSONDecodeError: diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 48b6abbab..e8b5dff4f 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -528,9 +528,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -883,28 +881,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -922,9 +916,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1615,25 +1607,21 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) - schema_loader: Optional[ - Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader] - ] = Field( - None, - description="Component used to retrieve the schema for the current stream.", - title="Schema Loader", + schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = ( + Field( + None, + description="Component used to retrieve the schema for the current stream.", + title="Schema Loader", + ) ) transformations: Optional[ List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]] @@ -1851,11 +1839,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -1897,9 +1881,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -1929,11 +1911,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 6eefec528..3baff26d4 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1606,9 +1606,7 @@ def create_gzipjson_decoder( return GzipJsonDecoder(parameters={}, encoding=model.encoding) @staticmethod - def create_json_parser( - model: JsonParserModel, config: Config, **kwargs: Any - ) -> JsonParser: + def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: return JsonParser(parameters={}) @staticmethod diff --git a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py index 298ac1de3..e939a58a9 100644 --- a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py +++ b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py @@ -10,14 +10,22 @@ @pytest.mark.parametrize( - "raw_data, expected", - [ - (json.dumps({"data-type": "string"}), {"data-type": "string"}), - (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), - (bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), {"data-type": "bytearray"}), - (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]) - ], - ids=["test_with_str", "test_with_bytes", "test_with_bytearray", "test_with_string_data_containing_list"] + "raw_data, expected", + [ + (json.dumps({"data-type": "string"}), {"data-type": "string"}), + (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), + ( + bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), + {"data-type": "bytearray"}, + ), + (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]), + ], + ids=[ + "test_with_str", + "test_with_bytes", + "test_with_bytearray", + "test_with_string_data_containing_list", + ], ) def test_json_parser_with_valid_data(raw_data, expected): for i, actual in enumerate(JsonParser().parse(raw_data)): From 9fd93cb4a4534586d96ad5d83cef238535fd3934 Mon Sep 17 00:00:00 2001 From: pnilan Date: Thu, 9 Jan 2025 16:21:53 -0800 Subject: [PATCH 06/22] conform tests --- .../declarative/decoders/parsers/__init__.py | 3 -- .../decoders/parsers/test_parsers.py | 35 ------------------- .../decoders/test_composite_decoder.py | 26 ++++++++++++++ 3 files changed, 26 insertions(+), 38 deletions(-) delete mode 100644 unit_tests/sources/declarative/decoders/parsers/__init__.py delete mode 100644 unit_tests/sources/declarative/decoders/parsers/test_parsers.py diff --git a/unit_tests/sources/declarative/decoders/parsers/__init__.py b/unit_tests/sources/declarative/decoders/parsers/__init__.py deleted file mode 100644 index 66f6de8cb..000000000 --- a/unit_tests/sources/declarative/decoders/parsers/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -# -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -# diff --git a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py b/unit_tests/sources/declarative/decoders/parsers/test_parsers.py deleted file mode 100644 index e939a58a9..000000000 --- a/unit_tests/sources/declarative/decoders/parsers/test_parsers.py +++ /dev/null @@ -1,35 +0,0 @@ -# -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. -# - -import json - -import pytest - -from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser - - -@pytest.mark.parametrize( - "raw_data, expected", - [ - (json.dumps({"data-type": "string"}), {"data-type": "string"}), - (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), - ( - bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), - {"data-type": "bytearray"}, - ), - (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]), - ], - ids=[ - "test_with_str", - "test_with_bytes", - "test_with_bytearray", - "test_with_string_data_containing_list", - ], -) -def test_json_parser_with_valid_data(raw_data, expected): - for i, actual in enumerate(JsonParser().parse(raw_data)): - if isinstance(expected, list): - assert actual == expected[i] - else: - assert actual == expected diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 9cd057791..aa7b86c4a 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -14,6 +14,7 @@ CsvParser, GzipParser, JsonLineParser, + JsonParser ) @@ -117,3 +118,28 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 + +@pytest.mark.parametrize( + "raw_data, expected", + [ + (json.dumps({"data-type": "string"}), {"data-type": "string"}), + (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), + ( + bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), + {"data-type": "bytearray"}, + ), + (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]), + ], + ids=[ + "test_with_str", + "test_with_bytes", + "test_with_bytearray", + "test_with_string_data_containing_list", + ], +) +def test_json_parser_with_valid_data(raw_data, expected): + for i, actual in enumerate(JsonParser().parse(raw_data)): + if isinstance(expected, list): + assert actual == expected[i] + else: + assert actual == expected From 1892a034f34231d7bd011f89b941a22c990ca486 Mon Sep 17 00:00:00 2001 From: pnilan Date: Fri, 10 Jan 2025 09:14:08 -0800 Subject: [PATCH 07/22] initial test updates --- .../decoders/test_composite_decoder.py | 17 +++++------------ 1 file changed, 5 insertions(+), 12 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index aa7b86c4a..750ace47b 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -4,7 +4,7 @@ import csv import gzip import json -from io import BytesIO, StringIO +from io import BufferedReader, BytesIO, StringIO import pytest import requests @@ -122,19 +122,12 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): @pytest.mark.parametrize( "raw_data, expected", [ - (json.dumps({"data-type": "string"}), {"data-type": "string"}), - (json.dumps({"data-type": "bytes"}).encode("utf-8"), {"data-type": "bytes"}), - ( - bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")), - {"data-type": "bytearray"}, - ), - (json.dumps([{"id": 1}, {"id": 2}]), [{"id": 1}, {"id": 2}]), + (BufferedReader(BytesIO(b'{"data-type": "string"}')), {"data-type": "string"}), + (BufferedReader(BytesIO(b'[{"id": 1}, {"id": 2}]')), [{"id": 1}, {"id": 2}]), ], ids=[ - "test_with_str", - "test_with_bytes", - "test_with_bytearray", - "test_with_string_data_containing_list", + "test_with_buffered_io_base_data_containing_string", + "test_with_buffered_io_base_data_containing_list", ], ) def test_json_parser_with_valid_data(raw_data, expected): From 51118f1e4320120618e80be0d9a7f063f54d090c Mon Sep 17 00:00:00 2001 From: pnilan Date: Fri, 10 Jan 2025 13:53:33 -0800 Subject: [PATCH 08/22] update JsonParser and relevant tests --- .../declarative_component_schema.yaml | 5 +- .../decoders/composite_raw_decoder.py | 11 ++- .../models/declarative_component_schema.py | 95 +++++++------------ .../parsers/model_to_component_factory.py | 8 +- docs/RELEASES.md | 12 +-- .../decoders/test_composite_decoder.py | 27 ++++-- 6 files changed, 71 insertions(+), 87 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3bc07ca26..dbda04518 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -678,7 +678,7 @@ definitions: properties: type: type: string - enum: [ CustomSchemaNormalization ] + enum: [CustomSchemaNormalization] class_name: title: Class Name description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. @@ -2020,6 +2020,9 @@ definitions: type: type: string enum: [JsonParser] + encoding: + type: string + default: utf-8 ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 504abaaab..066122243 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -41,18 +41,19 @@ def parse( with gzip.GzipFile(fileobj=data, mode="rb") as gzipobj: yield from self.inner_parser.parse(gzipobj) + @dataclass class JsonParser(Parser): """ Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. """ - def parse( - self, - data: BufferedIOBase - ) -> Generator[MutableMapping[str, Any], None, None]: + encoding: Optional[str] = "utf-8" + + def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + raw_data = data.read() try: - body_json = json.loads(data.decode("utf-8")) + body_json = json.loads(raw_data.decode(self.encoding)) except json.JSONDecodeError: logger.warning(f"Data cannot be parsed into json: {data=}") yield {} diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 59d9b3509..df83d4e7b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -550,9 +550,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -807,6 +805,7 @@ class Config: extra = Extra.allow type: Literal["JsonParser"] + encoding: Optional[str] = "utf-8" class MinMaxDatetime(BaseModel): @@ -885,9 +884,7 @@ class Config: access_token_headers: Optional[Dict[str, Any]] = Field( None, description="The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.", - examples=[ - {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"} - ], + examples=[{"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}], title="Access Token Headers", ) access_token_params: Optional[Dict[str, Any]] = Field( @@ -956,28 +953,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -995,9 +988,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1573,9 +1564,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1737,16 +1726,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2016,11 +2001,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2063,9 +2044,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2095,11 +2074,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2163,12 +2138,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 3c1f22490..475478ef2 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1751,6 +1751,10 @@ def create_dynamic_schema_loader( def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder: return JsonDecoder(parameters={}) + @staticmethod + def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: + return JsonParser(encoding=model.encoding) + @staticmethod def create_jsonl_decoder( model: JsonlDecoderModel, config: Config, **kwargs: Any @@ -1795,10 +1799,6 @@ def create_composite_raw_decoder( parser = self._create_component_from_model(model=model.parser, config=config) return CompositeRawDecoder(parser=parser) - @staticmethod - def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: - return JsonParser(parameters={}) - @staticmethod def create_json_file_schema_loader( model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any diff --git a/docs/RELEASES.md b/docs/RELEASES.md index cf74d4232..1f53c7256 100644 --- a/docs/RELEASES.md +++ b/docs/RELEASES.md @@ -9,9 +9,9 @@ A few seconds after any PR is merged to `main` , a release draft will be created 3. Optionally tweak the text in the release notes - for instance to call out contributors, to make a specific change more intuitive for readers to understand, or to move updates into a different category than they were assigned by default. (Note: You can also do this retroactively after publishing the release.) 4. Publish the release by pressing the “Publish release” button. -*Note:* +_Note:_ -- *Only maintainers can see release drafts. Non-maintainers will only see published releases.* +- _Only maintainers can see release drafts. Non-maintainers will only see published releases._ - If you create a tag on accident that you need to remove, contact a maintainer to delete the tag and the release. - You can monitor the PyPI release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml @@ -49,7 +49,7 @@ The first option is to look in the `declarative_manifest_image_version` database If that is not available as an option, you can run an Builder-created connector in Cloud and note the version number printed in the logs. Warning: this may not be indicative if that connector instance has been manually pinned to a specific version. -TODO: Would be great to find a way to inspect directly without requiring direct prod DB access. +TODO: Would be great to find a way to inspect directly without requiring direct prod DB access. ### How to pretest changes to SDM images manually @@ -57,15 +57,15 @@ To manually test changes against a dev image of SDM before committing to a relea #### Pretesting Manifest-Only connectors -Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well). -Next, build the pre-release image locally using `airbyte-ci connectors —name= build`. +Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well). +Next, build the pre-release image locally using `airbyte-ci connectors —name= build`. You can now run connector interfaces against the built image using the pattern
`docker run airbyte/:dev `. The connector's README should include a list of these commands, which can be copy/pasted and run from the connector's directory for quick testing against a local config. You can also run `airbyte-ci connectors —name= test` to run the CI test suite against the dev image. #### Pretesting Low-Code Python connectors -Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI. +Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI. Update the lockfile and run connector interfaces via poetry:
`poetry run source- spec/check/discover/read`. You can also run `airbyte-ci connectors —name= test` to run the CI test suite against the dev image.

 diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 750ace47b..e0ddef0e1 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -14,7 +14,7 @@ CsvParser, GzipParser, JsonLineParser, - JsonParser + JsonParser, ) @@ -119,20 +119,27 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): counter += 1 assert counter == 3 + @pytest.mark.parametrize( - "raw_data, expected", + "data, expected", [ - (BufferedReader(BytesIO(b'{"data-type": "string"}')), {"data-type": "string"}), - (BufferedReader(BytesIO(b'[{"id": 1}, {"id": 2}]')), [{"id": 1}, {"id": 2}]), + ({"data-type": "string"}, {"data-type": "string"}), + ([{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}]), ], ids=[ "test_with_buffered_io_base_data_containing_string", "test_with_buffered_io_base_data_containing_list", ], ) -def test_json_parser_with_valid_data(raw_data, expected): - for i, actual in enumerate(JsonParser().parse(raw_data)): - if isinstance(expected, list): - assert actual == expected[i] - else: - assert actual == expected +def test_json_parser_with_valid_data(data, expected): + encodings = ["utf-8", "utf", "iso-8859-1"] + + for encoding in encodings: + raw_data = json.dumps(data).encode(encoding) + for i, actual in enumerate( + JsonParser(encoding=encoding).parse(BufferedReader(BytesIO(raw_data))) + ): + if isinstance(expected, list): + assert actual == expected[i] + else: + assert actual == expected From 34a710df66f788641b60388916dd546037cc86d3 Mon Sep 17 00:00:00 2001 From: pnilan Date: Fri, 10 Jan 2025 14:53:01 -0800 Subject: [PATCH 09/22] chore: format/type-check --- .../sources/declarative/decoders/composite_raw_decoder.py | 2 +- .../declarative/parsers/model_to_component_factory.py | 5 +++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 066122243..5dbbf7a12 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -48,7 +48,7 @@ class JsonParser(Parser): Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. """ - encoding: Optional[str] = "utf-8" + encoding: str = "utf-8" def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: raw_data = data.read() diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 475478ef2..02e605f62 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -525,8 +525,8 @@ def _init_mappings(self) -> None: InlineSchemaLoaderModel: self.create_inline_schema_loader, JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, - JsonParser: self.create_json_parser, JsonLineParserModel: self.create_json_line_parser, + JsonParserModel: self.create_json_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, @@ -1753,7 +1753,8 @@ def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) @staticmethod def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: - return JsonParser(encoding=model.encoding) + encoding = model.encoding or "utf-8" + return JsonParser(encoding=encoding) @staticmethod def create_jsonl_decoder( From 060178a0787cf6f098bcf226a4f045f521ca9a93 Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 08:24:08 -0800 Subject: [PATCH 10/22] remove orjson from composite_raw_decoder file --- .../decoders/composite_raw_decoder.py | 12 +++++++++--- .../decoders/test_composite_decoder.py | 16 +++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 5dbbf7a12..99f748ead 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -9,7 +9,9 @@ import requests +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -54,9 +56,13 @@ def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], Non raw_data = data.read() try: body_json = json.loads(raw_data.decode(self.encoding)) - except json.JSONDecodeError: - logger.warning(f"Data cannot be parsed into json: {data=}") - yield {} + except json.JSONDecodeError as exc: + raise AirbyteTracedException( + message="JSON data failed to be parsed. See logs for more inforation.", + internal_message=f"JSON data faild to be parsed: {exc=}", + failure_type=FailureType.system_error, + exception=exc, + ) if not isinstance(body_json, list): body_json = [body_json] diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index e0ddef0e1..d5443dee7 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -121,17 +121,19 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): @pytest.mark.parametrize( - "data, expected", + "data", [ - ({"data-type": "string"}, {"data-type": "string"}), - ([{"id": 1}, {"id": 2}], [{"id": 1}, {"id": 2}]), + ({"data-type": "string"}), + ([{"id": 1}, {"id": 2}]), + ({"id": 170_141_183_460_469_231_731_687_303_715_884_105_727}), ], ids=[ "test_with_buffered_io_base_data_containing_string", "test_with_buffered_io_base_data_containing_list", + "test_with_buffered_io_base_data_containing_int128", ], ) -def test_json_parser_with_valid_data(data, expected): +def test_json_parser_with_valid_data(data): encodings = ["utf-8", "utf", "iso-8859-1"] for encoding in encodings: @@ -139,7 +141,7 @@ def test_json_parser_with_valid_data(data, expected): for i, actual in enumerate( JsonParser(encoding=encoding).parse(BufferedReader(BytesIO(raw_data))) ): - if isinstance(expected, list): - assert actual == expected[i] + if isinstance(data, list): + assert actual == data[i] else: - assert actual == expected + assert actual == data From d9b6df343eb4e28c86e148794d5be57640198ca1 Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 08:37:45 -0800 Subject: [PATCH 11/22] chore: format code --- .../models/declarative_component_schema.py | 94 +++++++------------ 1 file changed, 33 insertions(+), 61 deletions(-) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 84277eb7e..e998f5515 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -550,9 +550,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -891,9 +889,7 @@ class Config: access_token_headers: Optional[Dict[str, Any]] = Field( None, description="The DeclarativeOAuth Specific optional headers to inject while exchanging the `auth_code` to `access_token` during `completeOAuthFlow` step.", - examples=[ - {"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"} - ], + examples=[{"Authorization": "Basic {base64Encoder:{client_id}:{client_secret}}"}], title="Access Token Headers", ) access_token_params: Optional[Dict[str, Any]] = Field( @@ -962,28 +958,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1001,9 +993,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1579,9 +1569,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -1743,16 +1731,12 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = ( - Field( - None, - description="Component used to fetch data incrementally based on a time field in the data.", - title="Incremental Sync", - ) - ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" + incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field( + None, + description="Component used to fetch data incrementally based on a time field in the data.", + title="Incremental Sync", ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2022,11 +2006,7 @@ class SimpleRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2069,9 +2049,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2101,11 +2079,7 @@ class AsyncRetriever(BaseModel): CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter, - List[ - Union[ - CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter - ] - ], + List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]], ] ] = Field( [], @@ -2169,12 +2143,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) From f20fffc7673efa166544dbbe2068889b7a242e1c Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 09:48:20 -0800 Subject: [PATCH 12/22] add additional test --- .../declarative/decoders/test_composite_decoder.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index d5443dee7..6d3a48cae 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -126,11 +126,15 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): ({"data-type": "string"}), ([{"id": 1}, {"id": 2}]), ({"id": 170_141_183_460_469_231_731_687_303_715_884_105_727}), + ({}), + ({"nested": {"foo": {"bar": "baz"}}}), ], ids=[ - "test_with_buffered_io_base_data_containing_string", - "test_with_buffered_io_base_data_containing_list", - "test_with_buffered_io_base_data_containing_int128", + "valid_dict", + "list_of_dicts", + "int128", + "empty_object", + "nested_structure", ], ) def test_json_parser_with_valid_data(data): From 9ce2c283608255aa996006fc2e637354c88abe4c Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 11:54:45 -0800 Subject: [PATCH 13/22] update to fallback to json library if orjson fails, update test to use `CompositeRawDecoder` --- .../decoders/composite_raw_decoder.py | 34 +++++++++---------- .../decoders/test_composite_decoder.py | 26 +++++++------- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 99f748ead..d7fdcfba0 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -7,6 +7,7 @@ from io import BufferedIOBase, TextIOWrapper from typing import Any, Generator, MutableMapping, Optional +import orjson import requests from airbyte_cdk.models import FailureType @@ -46,30 +47,27 @@ def parse( @dataclass class JsonParser(Parser): - """ - Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any]. - """ - encoding: str = "utf-8" def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: raw_data = data.read() try: - body_json = json.loads(raw_data.decode(self.encoding)) - except json.JSONDecodeError as exc: - raise AirbyteTracedException( - message="JSON data failed to be parsed. See logs for more inforation.", - internal_message=f"JSON data faild to be parsed: {exc=}", - failure_type=FailureType.system_error, - exception=exc, - ) - - if not isinstance(body_json, list): - body_json = [body_json] - if len(body_json) == 0: - yield {} - else: + body_json = orjson.loads(raw_data.decode(self.encoding)) + except Exception: + try: + body_json = json.loads(raw_data.decode(self.encoding)) + except Exception as exc: + raise AirbyteTracedException( + message="Response JSON data failed to be parsed. See logs for more inforation.", + internal_message=f"Response JSON data faild to be parsed: {exc=}, {raw_data=}", + failure_type=FailureType.system_error, + exception=exc, + ) + + if isinstance(body_json, list): yield from body_json + else: + yield from [body_json] @dataclass diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 6d3a48cae..cccb1a835 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -121,11 +121,11 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): @pytest.mark.parametrize( - "data", + "test_data", [ ({"data-type": "string"}), - ([{"id": 1}, {"id": 2}]), - ({"id": 170_141_183_460_469_231_731_687_303_715_884_105_727}), + ([{"id": "1"}, {"id": "2"}]), + ({"id": "170141183460469231731687303715884105727"}), ({}), ({"nested": {"foo": {"bar": "baz"}}}), ], @@ -137,15 +137,15 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): "nested_structure", ], ) -def test_json_parser_with_valid_data(data): +def test_composite_raw_decoder_json_parser(requests_mock, test_data): encodings = ["utf-8", "utf", "iso-8859-1"] - for encoding in encodings: - raw_data = json.dumps(data).encode(encoding) - for i, actual in enumerate( - JsonParser(encoding=encoding).parse(BufferedReader(BytesIO(raw_data))) - ): - if isinstance(data, list): - assert actual == data[i] - else: - assert actual == data + raw_data = json.dumps(test_data).encode(encoding=encoding) + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding=encoding)) + actual = list(composite_raw_decoder.decode(response)) + if isinstance(test_data, list): + assert actual == test_data + else: + assert actual == [test_data] From 7e7b2c4c4c22d16b5644770bc3e4416e440b0335 Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 13:42:03 -0800 Subject: [PATCH 14/22] add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list --- .../declarative_component_schema.yaml | 30 ++++++++++--------- .../decoders/composite_raw_decoder.py | 3 ++ .../models/declarative_component_schema.py | 20 ++++++------- 3 files changed, 29 insertions(+), 24 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index a8cbaf195..8170f519b 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2014,20 +2014,6 @@ definitions: $parameters: type: object additionalProperties: true - JsonParser: - title: JsonParser - description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. - type: object - additionalProperties: true - required: - - type - properties: - type: - type: string - enum: [JsonParser] - encoding: - type: string - default: utf-8 ListPartitionRouter: title: List Partition Router description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests. @@ -2873,6 +2859,7 @@ definitions: parser: anyOf: - "$ref": "#/definitions/GzipParser" + - "$ref": "#/definitions/JsonParser" - "$ref": "#/definitions/JsonLineParser" - "$ref": "#/definitions/CsvParser" # PARSERS @@ -2889,6 +2876,21 @@ definitions: anyOf: - "$ref": "#/definitions/JsonLineParser" - "$ref": "#/definitions/CsvParser" + - "$ref": "#/definitions/JsonParser" + JsonParser: + title: JsonParser + description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. + type: object + additionalProperties: true + required: + - type + properties: + type: + type: string + enum: [JsonParser] + encoding: + type: string + default: utf-8 JsonLineParser: type: object required: diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index d7fdcfba0..f03899334 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -50,6 +50,9 @@ class JsonParser(Parser): encoding: str = "utf-8" def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + """ + Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. + """ raw_data = data.read() try: body_json = orjson.loads(raw_data.decode(self.encoding)) diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index e998f5515..01264c2de 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -805,14 +805,6 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class JsonParser(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["JsonParser"] - encoding: Optional[str] = "utf-8" - - class MinMaxDatetime(BaseModel): type: Literal["MinMaxDatetime"] datetime: str = Field( @@ -1181,6 +1173,14 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class JsonParser(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["JsonParser"] + encoding: Optional[str] = "utf-8" + + class JsonLineParser(BaseModel): type: Literal["JsonLineParser"] encoding: Optional[str] = "utf-8" @@ -1579,7 +1579,7 @@ class RecordSelector(BaseModel): class GzipParser(BaseModel): type: Literal["GzipParser"] - inner_parser: Union[JsonLineParser, CsvParser] + inner_parser: Union[JsonLineParser, CsvParser, JsonParser] class Spec(BaseModel): @@ -1614,7 +1614,7 @@ class CompositeErrorHandler(BaseModel): class CompositeRawDecoder(BaseModel): type: Literal["CompositeRawDecoder"] - parser: Union[GzipParser, JsonLineParser, CsvParser] + parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] class DeclarativeSource1(BaseModel): From 23cbfb78b2f30b4be1a8da5d8445dcd7bb77e07d Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 14:08:08 -0800 Subject: [PATCH 15/22] update to simplify orjson/json parsing --- .../decoders/composite_raw_decoder.py | 37 +++++++++++++------ .../decoders/test_composite_decoder.py | 28 +++++++++++++- 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index f03899334..afde39e1b 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -54,24 +54,37 @@ def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], Non Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ raw_data = data.read() - try: - body_json = orjson.loads(raw_data.decode(self.encoding)) - except Exception: - try: - body_json = json.loads(raw_data.decode(self.encoding)) - except Exception as exc: - raise AirbyteTracedException( - message="Response JSON data failed to be parsed. See logs for more inforation.", - internal_message=f"Response JSON data faild to be parsed: {exc=}, {raw_data=}", - failure_type=FailureType.system_error, - exception=exc, - ) + + body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) + + if body_json is None: + raise AirbyteTracedException( + message="Response JSON data failed to be parsed. See logs for more information.", + internal_message=f"Response JSON data failed to be parsed.", + failure_type=FailureType.system_error, + ) if isinstance(body_json, list): yield from body_json else: yield from [body_json] + def _parse_orjson(self, raw_data: bytes) -> Optional[MutableMapping[str, Any]]: + try: + return orjson.loads(raw_data.decode(self.encoding)) + except Exception as exc: + logger.warning( + f"Failed to parse JSON data using orjson library. Falling back to json library. {exc=}" + ) + return None + + def _parse_json(self, raw_data: bytes) -> Optional[MutableMapping[str, Any]]: + try: + return json.loads(raw_data.decode(self.encoding)) + except Exception as exc: + logger.error(f"Failed to parse JSON data using json library. {exc=}") + return None + @dataclass class JsonLineParser(Parser): diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index cccb1a835..2c8931383 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -4,7 +4,8 @@ import csv import gzip import json -from io import BufferedReader, BytesIO, StringIO +from io import BytesIO, StringIO +from unittest.mock import patch import pytest import requests @@ -16,6 +17,7 @@ JsonLineParser, JsonParser, ) +from airbyte_cdk.utils import AirbyteTracedException def compress_with_gzip(data: str, encoding: str = "utf-8"): @@ -149,3 +151,27 @@ def test_composite_raw_decoder_json_parser(requests_mock, test_data): assert actual == test_data else: assert actual == [test_data] + + +def test_composite_raw_decoder_orjson_parser_error(requests_mock): + raw_data = json.dumps({"test": "test"}).encode("utf-8") + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + with patch("orjson.loads", side_effect=Exception("test")): + assert [{"test": "test"}] == list(composite_raw_decoder.decode(response)) + + +def test_composite_raw_decoder_raises_traced_exception_when_both_parsers_fail(requests_mock): + raw_data = json.dumps({"test": "test"}).encode("utf-8") + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + with patch("orjson.loads", side_effect=Exception("test")): + with patch("json.loads", side_effect=Exception("test")): + with pytest.raises(AirbyteTracedException): + list(composite_raw_decoder.decode(response)) From 1c2a832b4e955e3c46933943b43dca57166af92a Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 14:13:24 -0800 Subject: [PATCH 16/22] chore: type-check --- .../sources/declarative/decoders/composite_raw_decoder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index afde39e1b..f3ca6f313 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -69,7 +69,7 @@ def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], Non else: yield from [body_json] - def _parse_orjson(self, raw_data: bytes) -> Optional[MutableMapping[str, Any]]: + def _parse_orjson(self, raw_data: bytes) -> Optional[Any]: try: return orjson.loads(raw_data.decode(self.encoding)) except Exception as exc: @@ -78,7 +78,7 @@ def _parse_orjson(self, raw_data: bytes) -> Optional[MutableMapping[str, Any]]: ) return None - def _parse_json(self, raw_data: bytes) -> Optional[MutableMapping[str, Any]]: + def _parse_json(self, raw_data: bytes) -> Optional[Any]: try: return json.loads(raw_data.decode(self.encoding)) except Exception as exc: From 66aaae949ef635b3371ff1d9d608d497148a7c82 Mon Sep 17 00:00:00 2001 From: pnilan Date: Tue, 14 Jan 2025 14:36:52 -0800 Subject: [PATCH 17/22] unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination --- .../parsers/model_to_component_factory.py | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 8fb8dcebc..79e344a4e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1037,13 +1037,19 @@ def create_cursor_pagination( self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any ) -> CursorPaginationStrategy: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)): + if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)) or ( + isinstance(decoder.decoder, CompositeRawDecoder) + and not isinstance(decoder.decoder.parser, JsonParser) + ): raise ValueError( f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." ) decoder_to_use = decoder else: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): + if not isinstance(decoder, (JsonDecoder, XmlDecoder)) or ( + isinstance(decoder, CompositeRawDecoder) + and not isinstance(decoder.parser, JsonParser) + ): raise ValueError( f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." ) @@ -1520,7 +1526,10 @@ def create_default_paginator( cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None, ) -> Union[DefaultPaginator, PaginatorTestReadDecorator]: if decoder: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): + if not isinstance(decoder, (JsonDecoder, XmlDecoder)) or ( + isinstance(decoder, CompositeRawDecoder) + and not isinstance(decoder.parser, JsonParser) + ): raise ValueError( f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." ) @@ -1942,7 +1951,10 @@ def create_offset_increment( model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any ) -> OffsetIncrement: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)): + if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)) or ( + isinstance(decoder.decoder, CompositeRawDecoder) + and not isinstance(decoder.decoder.parser, JsonParser) + ): raise ValueError( f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." ) From 00cf7b14ce124b704b5696a7bb5a1e4ae5e2cd3e Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 15 Jan 2025 10:45:36 -0800 Subject: [PATCH 18/22] update conditional validations for decoders/parsers for pagination --- .../decoders/composite_raw_decoder.py | 9 ++- .../parsers/model_to_component_factory.py | 80 ++++++++++--------- 2 files changed, 50 insertions(+), 39 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index f3ca6f313..25b384dab 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -47,13 +47,16 @@ def parse( @dataclass class JsonParser(Parser): - encoding: str = "utf-8" + encoding: str def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: """ Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ raw_data = data.read() + print("\n\n=====================\n\n") + print(raw_data.decode(self.encoding)) + print("\n\n=====================\n\n") body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) @@ -74,7 +77,7 @@ def _parse_orjson(self, raw_data: bytes) -> Optional[Any]: return orjson.loads(raw_data.decode(self.encoding)) except Exception as exc: logger.warning( - f"Failed to parse JSON data using orjson library. Falling back to json library. {exc=}" + f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}" ) return None @@ -82,7 +85,7 @@ def _parse_json(self, raw_data: bytes) -> Optional[Any]: try: return json.loads(raw_data.decode(self.encoding)) except Exception as exc: - logger.error(f"Failed to parse JSON data using json library. {exc=}") + logger.error(f"Failed to parse JSON data using json library. {exc}") return None diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 79e344a4e..8a9486f30 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -73,6 +73,7 @@ GzipParser, JsonLineParser, JsonParser, + Parser, ) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, @@ -1037,23 +1038,18 @@ def create_cursor_pagination( self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any ) -> CursorPaginationStrategy: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)) or ( - isinstance(decoder.decoder, CompositeRawDecoder) - and not isinstance(decoder.decoder.parser, JsonParser) - ): - raise ValueError( - f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) + inner_decoder = decoder.decoder + else: + inner_decoder = decoder + decoder = PaginationDecoderDecorator(decoder=decoder) + + if self._is_supported_decoder_for_pagination(inner_decoder): decoder_to_use = decoder else: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)) or ( - isinstance(decoder, CompositeRawDecoder) - and not isinstance(decoder.parser, JsonParser) - ): - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + raise ValueError( + f"Provided decoder of {type(inner_decoder)=} is not supported. " + "Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." + ) return CursorPaginationStrategy( cursor_value=model.cursor_value, @@ -1526,14 +1522,12 @@ def create_default_paginator( cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None, ) -> Union[DefaultPaginator, PaginatorTestReadDecorator]: if decoder: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)) or ( - isinstance(decoder, CompositeRawDecoder) - and not isinstance(decoder.parser, JsonParser) - ): + if self._is_supported_decoder_for_pagination(decoder): + decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + else: raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." + f"Provided decoder of {type(decoder)=} is not supported. Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) else: decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})) page_size_option = ( @@ -1764,7 +1758,7 @@ def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) @staticmethod def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: - encoding = model.encoding or "utf-8" + encoding = model.encoding if model.encoding else "utf-8" return JsonParser(encoding=encoding) @staticmethod @@ -1946,25 +1940,23 @@ def create_oauth_authenticator( message_repository=self._message_repository, ) - @staticmethod def create_offset_increment( - model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any + self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any ) -> OffsetIncrement: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)) or ( - isinstance(decoder.decoder, CompositeRawDecoder) - and not isinstance(decoder.decoder.parser, JsonParser) - ): - raise ValueError( - f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) + inner_decoder = decoder.decoder + else: + inner_decoder = decoder + decoder = PaginationDecoderDecorator(decoder=decoder) + + if self._is_supported_decoder_for_pagination(inner_decoder): decoder_to_use = decoder else: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + raise ValueError( + f"Provided decoder of {type(inner_decoder)=} is not supported. " + "Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." + ) + return OffsetIncrement( page_size=model.page_size, config=config, @@ -2553,3 +2545,19 @@ def create_config_components_resolver( components_mapping=components_mapping, parameters=model.parameters or {}, ) + + def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool: + if isinstance(decoder, (JsonDecoder, XmlDecoder)): + return True + elif isinstance(decoder, CompositeRawDecoder): + return self._is_supported_parser_for_pagination(decoder.parser) + else: + return False + + def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: + if isinstance(parser, JsonParser): + return True + elif isinstance(parser, GzipParser): + return self._is_supported_parser_for_pagination(parser.inner_parser) + else: + return False From b7aa78fe1bbd9105b6510d245f2ca45fdaee92b2 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 15 Jan 2025 10:47:20 -0800 Subject: [PATCH 19/22] remove errant print --- .../sources/declarative/decoders/composite_raw_decoder.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 25b384dab..39c9e85f2 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -54,10 +54,6 @@ def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], Non Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. """ raw_data = data.read() - print("\n\n=====================\n\n") - print(raw_data.decode(self.encoding)) - print("\n\n=====================\n\n") - body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) if body_json is None: From 7b417327655ce9e69ee863780bdc4ee6f13ec5c5 Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 15 Jan 2025 11:07:05 -0800 Subject: [PATCH 20/22] chore: coderabbitai suggestions --- .../decoders/composite_raw_decoder.py | 4 ++-- .../parsers/model_to_component_factory.py | 16 +++++++++------- 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 39c9e85f2..4d670db11 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -47,7 +47,7 @@ def parse( @dataclass class JsonParser(Parser): - encoding: str + encoding: str = "utf-8" def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: """ @@ -72,7 +72,7 @@ def _parse_orjson(self, raw_data: bytes) -> Optional[Any]: try: return orjson.loads(raw_data.decode(self.encoding)) except Exception as exc: - logger.warning( + logger.debug( f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}" ) return None diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 8a9486f30..79e039232 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -1047,8 +1047,7 @@ def create_cursor_pagination( decoder_to_use = decoder else: raise ValueError( - f"Provided decoder of {type(inner_decoder)=} is not supported. " - "Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." + self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder)) ) return CursorPaginationStrategy( @@ -1525,9 +1524,7 @@ def create_default_paginator( if self._is_supported_decoder_for_pagination(decoder): decoder_to_use = PaginationDecoderDecorator(decoder=decoder) else: - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." - ) + raise ValueError(self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(decoder))) else: decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})) page_size_option = ( @@ -1953,8 +1950,7 @@ def create_offset_increment( decoder_to_use = decoder else: raise ValueError( - f"Provided decoder of {type(inner_decoder)=} is not supported. " - "Please set as JsonDecoder, XmlDecoder, or a CompositeRawDecoder with JsonParser instead." + self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder)) ) return OffsetIncrement( @@ -2546,6 +2542,12 @@ def create_config_components_resolver( parameters=model.parameters or {}, ) + _UNSUPPORTED_DECODER_ERROR = ( + "Specified decoder of {decoder_type} is not supported for pagination." + "Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead." + "If using `GzipParser`, please ensure that the lowest level inner_parser is a `JsonParser`." + ) + def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool: if isinstance(decoder, (JsonDecoder, XmlDecoder)): return True From 3f550f230ddcb8d1ef5e26cba9a8374510c54abb Mon Sep 17 00:00:00 2001 From: pnilan Date: Wed, 15 Jan 2025 12:04:25 -0800 Subject: [PATCH 21/22] update parservalidation method --- .../sources/declarative/parsers/model_to_component_factory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 79e039232..102ff3d52 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -2560,6 +2560,6 @@ def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: if isinstance(parser, JsonParser): return True elif isinstance(parser, GzipParser): - return self._is_supported_parser_for_pagination(parser.inner_parser) + return isinstance(parser.inner_parser, JsonParser) else: return False From bb63934ba6d2c9eb54f1d8bc3e6d9beb64599d3d Mon Sep 17 00:00:00 2001 From: Patrick Nilan Date: Wed, 15 Jan 2025 18:48:52 -0800 Subject: [PATCH 22/22] Update airbyte_cdk/sources/declarative/declarative_component_schema.yaml Co-authored-by: Natik Gadzhi --- .../sources/declarative/declarative_component_schema.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index dbbd3e743..272fad750 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -2908,7 +2908,6 @@ definitions: title: JsonParser description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. type: object - additionalProperties: true required: - type properties: