diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 71a3b8084..272fad750 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -678,7 +678,7 @@ definitions: properties: type: type: string - enum: [ CustomSchemaNormalization ] + enum: [CustomSchemaNormalization] class_name: title: Class Name description: Fully-qualified name of the class that will be implementing the custom normalization. The format is `source_..`. @@ -2886,6 +2886,7 @@ definitions: parser: anyOf: - "$ref": "#/definitions/GzipParser" + - "$ref": "#/definitions/JsonParser" - "$ref": "#/definitions/JsonLineParser" - "$ref": "#/definitions/CsvParser" # PARSERS @@ -2902,6 +2903,20 @@ definitions: anyOf: - "$ref": "#/definitions/JsonLineParser" - "$ref": "#/definitions/CsvParser" + - "$ref": "#/definitions/JsonParser" + JsonParser: + title: JsonParser + description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format. + type: object + required: + - type + properties: + type: + type: string + enum: [JsonParser] + encoding: + type: string + default: utf-8 JsonLineParser: type: object required: diff --git a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py index 49653296d..4d670db11 100644 --- a/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py +++ b/airbyte_cdk/sources/declarative/decoders/composite_raw_decoder.py @@ -7,9 +7,12 @@ from io import BufferedIOBase, TextIOWrapper from typing import Any, Generator, MutableMapping, Optional +import orjson import requests +from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.decoders.decoder import Decoder +from airbyte_cdk.utils import AirbyteTracedException logger = logging.getLogger("airbyte") @@ -42,6 +45,46 @@ def parse( yield from self.inner_parser.parse(gzipobj) +@dataclass +class JsonParser(Parser): + encoding: str = "utf-8" + + def parse(self, data: BufferedIOBase) -> Generator[MutableMapping[str, Any], None, None]: + """ + Attempts to deserialize data using orjson library. As an extra layer of safety we fallback on the json library to deserialize the data. + """ + raw_data = data.read() + body_json = self._parse_orjson(raw_data) or self._parse_json(raw_data) + + if body_json is None: + raise AirbyteTracedException( + message="Response JSON data failed to be parsed. See logs for more information.", + internal_message=f"Response JSON data failed to be parsed.", + failure_type=FailureType.system_error, + ) + + if isinstance(body_json, list): + yield from body_json + else: + yield from [body_json] + + def _parse_orjson(self, raw_data: bytes) -> Optional[Any]: + try: + return orjson.loads(raw_data.decode(self.encoding)) + except Exception as exc: + logger.debug( + f"Failed to parse JSON data using orjson library. Falling back to json library. {exc}" + ) + return None + + def _parse_json(self, raw_data: bytes) -> Optional[Any]: + try: + return json.loads(raw_data.decode(self.encoding)) + except Exception as exc: + logger.error(f"Failed to parse JSON data using json library. {exc}") + return None + + @dataclass class JsonLineParser(Parser): encoding: Optional[str] = "utf-8" diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 3bce45d4a..2fc8b6b42 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1201,6 +1201,14 @@ class LegacySessionTokenAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") +class JsonParser(BaseModel): + class Config: + extra = Extra.allow + + type: Literal["JsonParser"] + encoding: Optional[str] = "utf-8" + + class JsonLineParser(BaseModel): type: Literal["JsonLineParser"] encoding: Optional[str] = "utf-8" @@ -1599,7 +1607,7 @@ class RecordSelector(BaseModel): class GzipParser(BaseModel): type: Literal["GzipParser"] - inner_parser: Union[JsonLineParser, CsvParser] + inner_parser: Union[JsonLineParser, CsvParser, JsonParser] class Spec(BaseModel): @@ -1634,7 +1642,7 @@ class CompositeErrorHandler(BaseModel): class CompositeRawDecoder(BaseModel): type: Literal["CompositeRawDecoder"] - parser: Union[GzipParser, JsonLineParser, CsvParser] + parser: Union[GzipParser, JsonParser, JsonLineParser, CsvParser] class DeclarativeSource1(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index a40022587..4bcd18c1e 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -72,6 +72,8 @@ CsvParser, GzipParser, JsonLineParser, + JsonParser, + Parser, ) from airbyte_cdk.sources.declarative.extractors import ( DpathExtractor, @@ -247,6 +249,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JsonLineParser as JsonLineParserModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + JsonParser as JsonParserModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( JwtAuthenticator as JwtAuthenticatorModel, ) @@ -522,6 +527,7 @@ def _init_mappings(self) -> None: JsonDecoderModel: self.create_json_decoder, JsonlDecoderModel: self.create_jsonl_decoder, JsonLineParserModel: self.create_json_line_parser, + JsonParserModel: self.create_json_parser, GzipJsonDecoderModel: self.create_gzipjson_decoder, GzipParserModel: self.create_gzip_parser, KeysToLowerModel: self.create_keys_to_lower_transformation, @@ -1032,17 +1038,17 @@ def create_cursor_pagination( self, model: CursorPaginationModel, config: Config, decoder: Decoder, **kwargs: Any ) -> CursorPaginationStrategy: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) + inner_decoder = decoder.decoder + else: + inner_decoder = decoder + decoder = PaginationDecoderDecorator(decoder=decoder) + + if self._is_supported_decoder_for_pagination(inner_decoder): decoder_to_use = decoder else: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + raise ValueError( + self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder)) + ) return CursorPaginationStrategy( cursor_value=model.cursor_value, @@ -1515,11 +1521,10 @@ def create_default_paginator( cursor_used_for_stop_condition: Optional[DeclarativeCursor] = None, ) -> Union[DefaultPaginator, PaginatorTestReadDecorator]: if decoder: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + if self._is_supported_decoder_for_pagination(decoder): + decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + else: + raise ValueError(self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(decoder))) else: decoder_to_use = PaginationDecoderDecorator(decoder=JsonDecoder(parameters={})) page_size_option = ( @@ -1748,6 +1753,11 @@ def create_dynamic_schema_loader( def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder: return JsonDecoder(parameters={}) + @staticmethod + def create_json_parser(model: JsonParserModel, config: Config, **kwargs: Any) -> JsonParser: + encoding = model.encoding if model.encoding else "utf-8" + return JsonParser(encoding=encoding) + @staticmethod def create_jsonl_decoder( model: JsonlDecoderModel, config: Config, **kwargs: Any @@ -1940,22 +1950,22 @@ def create_oauth_authenticator( message_repository=self._message_repository, ) - @staticmethod def create_offset_increment( - model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any + self, model: OffsetIncrementModel, config: Config, decoder: Decoder, **kwargs: Any ) -> OffsetIncrement: if isinstance(decoder, PaginationDecoderDecorator): - if not isinstance(decoder.decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder.decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) + inner_decoder = decoder.decoder + else: + inner_decoder = decoder + decoder = PaginationDecoderDecorator(decoder=decoder) + + if self._is_supported_decoder_for_pagination(inner_decoder): decoder_to_use = decoder else: - if not isinstance(decoder, (JsonDecoder, XmlDecoder)): - raise ValueError( - f"Provided decoder of {type(decoder)=} is not supported. Please set JsonDecoder or XmlDecoder instead." - ) - decoder_to_use = PaginationDecoderDecorator(decoder=decoder) + raise ValueError( + self._UNSUPPORTED_DECODER_ERROR.format(decoder_type=type(inner_decoder)) + ) + return OffsetIncrement( page_size=model.page_size, config=config, @@ -2555,3 +2565,25 @@ def create_config_components_resolver( components_mapping=components_mapping, parameters=model.parameters or {}, ) + + _UNSUPPORTED_DECODER_ERROR = ( + "Specified decoder of {decoder_type} is not supported for pagination." + "Please set as `JsonDecoder`, `XmlDecoder`, or a `CompositeRawDecoder` with an inner_parser of `JsonParser` or `GzipParser` instead." + "If using `GzipParser`, please ensure that the lowest level inner_parser is a `JsonParser`." + ) + + def _is_supported_decoder_for_pagination(self, decoder: Decoder) -> bool: + if isinstance(decoder, (JsonDecoder, XmlDecoder)): + return True + elif isinstance(decoder, CompositeRawDecoder): + return self._is_supported_parser_for_pagination(decoder.parser) + else: + return False + + def _is_supported_parser_for_pagination(self, parser: Parser) -> bool: + if isinstance(parser, JsonParser): + return True + elif isinstance(parser, GzipParser): + return isinstance(parser.inner_parser, JsonParser) + else: + return False diff --git a/docs/RELEASES.md b/docs/RELEASES.md index cf74d4232..1f53c7256 100644 --- a/docs/RELEASES.md +++ b/docs/RELEASES.md @@ -9,9 +9,9 @@ A few seconds after any PR is merged to `main` , a release draft will be created 3. Optionally tweak the text in the release notes - for instance to call out contributors, to make a specific change more intuitive for readers to understand, or to move updates into a different category than they were assigned by default. (Note: You can also do this retroactively after publishing the release.) 4. Publish the release by pressing the “Publish release” button. -*Note:* +_Note:_ -- *Only maintainers can see release drafts. Non-maintainers will only see published releases.* +- _Only maintainers can see release drafts. Non-maintainers will only see published releases._ - If you create a tag on accident that you need to remove, contact a maintainer to delete the tag and the release. - You can monitor the PyPI release process here in the GitHub Actions view: https://github.com/airbytehq/airbyte-python-cdk/actions/workflows/pypi_publish.yml @@ -49,7 +49,7 @@ The first option is to look in the `declarative_manifest_image_version` database If that is not available as an option, you can run an Builder-created connector in Cloud and note the version number printed in the logs. Warning: this may not be indicative if that connector instance has been manually pinned to a specific version. -TODO: Would be great to find a way to inspect directly without requiring direct prod DB access. +TODO: Would be great to find a way to inspect directly without requiring direct prod DB access. ### How to pretest changes to SDM images manually @@ -57,15 +57,15 @@ To manually test changes against a dev image of SDM before committing to a relea #### Pretesting Manifest-Only connectors -Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well). -Next, build the pre-release image locally using `airbyte-ci connectors —name= build`. +Once the publish pipeline has completed, choose a connector to test. Set the base_image in the connector's metadata to your pre-release version in Dockerhub (make sure to update the SHA as well). +Next, build the pre-release image locally using `airbyte-ci connectors —name= build`. You can now run connector interfaces against the built image using the pattern
`docker run airbyte/:dev `. The connector's README should include a list of these commands, which can be copy/pasted and run from the connector's directory for quick testing against a local config. You can also run `airbyte-ci connectors —name= test` to run the CI test suite against the dev image. #### Pretesting Low-Code Python connectors -Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI. +Once the publish pipeline has completed, set the version of `airbyte-cdk` in the connector's pyproject.toml file to the pre-release version in PyPI. Update the lockfile and run connector interfaces via poetry:
`poetry run source- spec/check/discover/read`. You can also run `airbyte-ci connectors —name= test` to run the CI test suite against the dev image.

 diff --git a/unit_tests/sources/declarative/decoders/test_composite_decoder.py b/unit_tests/sources/declarative/decoders/test_composite_decoder.py index 9cd057791..2c8931383 100644 --- a/unit_tests/sources/declarative/decoders/test_composite_decoder.py +++ b/unit_tests/sources/declarative/decoders/test_composite_decoder.py @@ -5,6 +5,7 @@ import gzip import json from io import BytesIO, StringIO +from unittest.mock import patch import pytest import requests @@ -14,7 +15,9 @@ CsvParser, GzipParser, JsonLineParser, + JsonParser, ) +from airbyte_cdk.utils import AirbyteTracedException def compress_with_gzip(data: str, encoding: str = "utf-8"): @@ -117,3 +120,58 @@ def test_composite_raw_decoder_jsonline_parser(requests_mock, encoding: str): for _ in composite_raw_decoder.decode(response): counter += 1 assert counter == 3 + + +@pytest.mark.parametrize( + "test_data", + [ + ({"data-type": "string"}), + ([{"id": "1"}, {"id": "2"}]), + ({"id": "170141183460469231731687303715884105727"}), + ({}), + ({"nested": {"foo": {"bar": "baz"}}}), + ], + ids=[ + "valid_dict", + "list_of_dicts", + "int128", + "empty_object", + "nested_structure", + ], +) +def test_composite_raw_decoder_json_parser(requests_mock, test_data): + encodings = ["utf-8", "utf", "iso-8859-1"] + for encoding in encodings: + raw_data = json.dumps(test_data).encode(encoding=encoding) + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding=encoding)) + actual = list(composite_raw_decoder.decode(response)) + if isinstance(test_data, list): + assert actual == test_data + else: + assert actual == [test_data] + + +def test_composite_raw_decoder_orjson_parser_error(requests_mock): + raw_data = json.dumps({"test": "test"}).encode("utf-8") + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + with patch("orjson.loads", side_effect=Exception("test")): + assert [{"test": "test"}] == list(composite_raw_decoder.decode(response)) + + +def test_composite_raw_decoder_raises_traced_exception_when_both_parsers_fail(requests_mock): + raw_data = json.dumps({"test": "test"}).encode("utf-8") + requests_mock.register_uri("GET", "https://airbyte.io/", content=raw_data) + response = requests.get("https://airbyte.io/", stream=True) + + composite_raw_decoder = CompositeRawDecoder(parser=JsonParser(encoding="utf-8")) + + with patch("orjson.loads", side_effect=Exception("test")): + with patch("json.loads", side_effect=Exception("test")): + with pytest.raises(AirbyteTracedException): + list(composite_raw_decoder.decode(response))