Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add JsonParser component to declarative framework #166

Merged
merged 27 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
e68f36f
initial JsonParser component
pnilan Dec 10, 2024
a8a7bb3
update parser
pnilan Dec 11, 2024
254f877
add tests for json parser
pnilan Dec 11, 2024
8df239a
update parser and tests to yield empty dict if unparseable.
pnilan Dec 11, 2024
92574df
chore: format code
pnilan Dec 11, 2024
82a15c9
Merge branch 'main' into pnilan/declarative/parsers
pnilan Dec 12, 2024
0b3b5e1
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 10, 2025
9fd93cb
conform tests
pnilan Jan 10, 2025
1892a03
initial test updates
pnilan Jan 10, 2025
51118f1
update JsonParser and relevant tests
pnilan Jan 10, 2025
34a710d
chore: format/type-check
pnilan Jan 10, 2025
060178a
remove orjson from composite_raw_decoder file
pnilan Jan 14, 2025
bf8dd26
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 14, 2025
d9b6df3
chore: format code
pnilan Jan 14, 2025
f20fffc
add additional test
pnilan Jan 14, 2025
9ce2c28
update to fallback to json library if orjson fails, update test to us…
pnilan Jan 14, 2025
7e7b2c4
add `JsonParser` to GzipDecoder and CompositeRawDecoder "anyOf" list
pnilan Jan 14, 2025
23cbfb7
update to simplify orjson/json parsing
pnilan Jan 14, 2025
1c2a832
chore: type-check
pnilan Jan 14, 2025
66aaae9
unlock `CompositeRawDecoder` w/ `JsonParser` support for pagination
pnilan Jan 14, 2025
00cf7b1
update conditional validations for decoders/parsers for pagination
pnilan Jan 15, 2025
b7aa78f
remove errant print
pnilan Jan 15, 2025
7b41732
chore: coderabbitai suggestions
pnilan Jan 15, 2025
3f550f2
update parservalidation method
pnilan Jan 15, 2025
27bf5a7
Merge branch 'main' into pnilan/declarative/parsers
pnilan Jan 15, 2025
e691f79
Merge branch 'main' into pnilan/declarative/parsers
natikgadzhi Jan 15, 2025
bb63934
Update airbyte_cdk/sources/declarative/declarative_component_schema.yaml
pnilan Jan 16, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1810,6 +1810,17 @@ definitions:
$parameters:
type: object
additionalProperties: true
JsonParser:
  # Schema for the declarative `JsonParser` component. Only `type` is
  # required; any additional keys are accepted (additionalProperties: true).
  title: JsonParser
  description: Parser used for parsing str, bytes, or bytearray data and returning data in a dictionary format.
  type: object
  additionalProperties: true
  required:
    - type
  properties:
    type:
      type: string
      enum:
        - JsonParser
ListPartitionRouter:
title: List Partition Router
description: A Partition router that specifies a list of attributes where each attribute describes a portion of the complete data set for a stream. During a sync, each value is iterated over and can be used as input to outbound API requests.
Expand Down
7 changes: 7 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

from airbyte_cdk.sources.declarative.decoders.parsers.parsers import Parser, JsonParser

__all__ = ["Parser", "JsonParser"]
27 changes: 27 additions & 0 deletions airbyte_cdk/sources/declarative/decoders/parsers/parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generator, MutableMapping, Union


@dataclass
class Parser(ABC):
    """
    Parser strategy to convert str, bytes, or bytearray data into MutableMapping[str, Any].

    This is an abstract base: concrete parsers must override `parse`. Deriving
    from `ABC` is what makes `@abstractmethod` effective, so instantiating
    `Parser` itself (or a subclass that does not implement `parse`) raises
    `TypeError`.
    """

    @abstractmethod
    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Convert raw data into parsed records.

        :param data: the raw payload to parse, as str, bytes, or bytearray
        :return: a generator producing the parsed records as mappings
        """
        pass


class JsonParser(Parser):
    """
    Parser strategy for converting JSON-structure str, bytes, or bytearray data into MutableMapping[str, Any].
    """

    def parse(
        self, data: Union[str, bytes, bytearray]
    ) -> Generator[MutableMapping[str, Any], None, None]:
        """
        Decode a JSON document and yield the record(s) it contains.

        A top-level JSON object is yielded as a single record. A top-level JSON
        array is yielded element by element, so that consumers receive the
        individual records rather than one `list` object, which would not match
        the declared return type.

        Because this is a generator, decoding happens on first iteration, not
        when `parse` is called.

        :param data: JSON text as str, or JSON encoded as bytes / bytearray
        :return: a generator producing the decoded record(s)
        :raises json.JSONDecodeError: on first iteration, if `data` is not valid JSON
        """
        decoded = json.loads(data)
        if isinstance(decoded, list):
            yield from decoded
        else:
            yield decoded
103 changes: 66 additions & 37 deletions airbyte_cdk/sources/declarative/models/declarative_component_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,9 @@ class OAuthAuthenticator(BaseModel):
scopes: Optional[List[str]] = Field(
None,
description="List of scopes that should be granted to the access token.",
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
examples=[
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
],
title="Scopes",
)
token_expiry_date: Optional[str] = Field(
Expand Down Expand Up @@ -715,6 +717,13 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class JsonParser(BaseModel):
    # Declarative-manifest model for the `JsonParser` component.
    # (Kept as `#` comments: a class docstring would become part of the
    # model's generated schema description.)

    # Discriminator field; the only accepted value is the literal "JsonParser".
    type: Literal["JsonParser"]

    class Config:
        # Keys other than `type` are accepted rather than rejected.
        extra = Extra.allow


class MinMaxDatetime(BaseModel):
type: Literal["MinMaxDatetime"]
datetime: str = Field(
Expand Down Expand Up @@ -822,13 +831,13 @@ class Config:
)
extract_output: List[str] = Field(
...,
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ",
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.",
examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}],
title="DeclarativeOAuth Extract Output",
)
state: Optional[State] = Field(
None,
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ",
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.",
examples=[{"state": {"min": 7, "max": 128}}],
title="(Optional) DeclarativeOAuth Configurable State Query Param",
)
Expand All @@ -852,13 +861,13 @@ class Config:
)
state_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.",
examples=[{"state_key": "my_custom_state_key_key_name"}],
title="(Optional) DeclarativeOAuth State Key Override",
)
auth_code_key: Optional[str] = Field(
None,
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ",
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.",
examples=[{"auth_code_key": "my_custom_auth_code_key_name"}],
title="(Optional) DeclarativeOAuth Auth Code Key Override",
)
Expand All @@ -874,24 +883,28 @@ class OAuthConfigSpecification(BaseModel):
class Config:
extra = Extra.allow

oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
examples=[
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
{
"app_id": {
"type": "string",
"path_in_connector_config": ["info", "app_id"],
}
},
],
title="OAuth user input",
)
)
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
Field(
None,
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{my_var}`.\n- The nested resolution variables like `{{my_nested_var}}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {base64Encoder:{my_var_a}:{my_var_b}}\n + base64Decorer - decode from `base64` encoded string, {base64Decoder:{my_string_variable_or_string_value}}\n + urlEncoder - encode the input string to URL-like format, {urlEncoder:https://test.host.com/endpoint}\n + urlDecorer - decode the input url-encoded string into text format, {urlDecoder:https%3A%2F%2Fairbyte.io}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {codeChallengeS256:{state_value}}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{client_id_key}={{client_id_key}}&{redirect_uri_key}={urlEncoder:{{redirect_uri_key}}}&{state_key}={{state_key}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{auth_code_key}": "{{auth_code_key}}",\n "{client_id_key}": "{{client_id_key}}",\n "{client_secret_key}": "{{client_secret_key}}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
title="DeclarativeOAuth Connector Specification",
)
)
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
None,
Expand All @@ -909,7 +922,9 @@ class Config:
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
None,
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
examples=[
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
],
title="OAuth input specification",
)
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
Expand Down Expand Up @@ -1600,21 +1615,25 @@ class Config:
description="Component used to coordinate how records are extracted across stream slices and request pages.",
title="Retriever",
)
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
incremental_sync: Optional[Union[CustomIncrementalSync, DatetimeBasedCursor]] = (
Field(
None,
description="Component used to fetch data incrementally based on a time field in the data.",
title="Incremental Sync",
)
)
name: Optional[str] = Field(
"", description="The stream name.", example=["Users"], title="Name"
)
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
primary_key: Optional[PrimaryKey] = Field(
"", description="The primary key of the stream.", title="Primary Key"
)
schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = (
Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
schema_loader: Optional[
Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]
] = Field(
None,
description="Component used to retrieve the schema for the current stream.",
title="Schema Loader",
)
transformations: Optional[
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
Expand Down Expand Up @@ -1832,7 +1851,11 @@ class SimpleRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
]
] = Field(
[],
Expand Down Expand Up @@ -1874,7 +1897,9 @@ class AsyncRetriever(BaseModel):
)
download_extractor: Optional[
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
] = Field(None, description="Responsible for fetching the records from provided urls.")
] = Field(
None, description="Responsible for fetching the records from provided urls."
)
creation_requester: Union[CustomRequester, HttpRequester] = Field(
...,
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
Expand Down Expand Up @@ -1904,7 +1929,11 @@ class AsyncRetriever(BaseModel):
CustomPartitionRouter,
ListPartitionRouter,
SubstreamPartitionRouter,
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
List[
Union[
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
]
],
]
] = Field(
[],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
PaginationDecoderDecorator,
XmlDecoder,
)
from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser
from airbyte_cdk.sources.declarative.extractors import (
DpathExtractor,
RecordFilter,
Expand Down Expand Up @@ -218,6 +219,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonParser as JsonParserModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -450,6 +454,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonParser: self.create_json_parser,
GzipJsonDecoderModel: self.create_gzipjson_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
IterableDecoderModel: self.create_iterable_decoder,
Expand Down Expand Up @@ -1600,6 +1605,12 @@ def create_gzipjson_decoder(
) -> GzipJsonDecoder:
return GzipJsonDecoder(parameters={}, encoding=model.encoding)

@staticmethod
def create_json_parser(
    model: JsonParserModel, config: Config, **kwargs: Any
) -> JsonParser:
    """
    Build the runtime `JsonParser` for a declarative `JsonParser` model.

    :param model: the parsed manifest model (carries no settings that are used here)
    :param config: the connector configuration (unused by this factory)
    :param kwargs: extra factory arguments (unused by this factory)
    :return: a new `JsonParser` instance
    """
    # `JsonParser` and its `Parser` base declare no fields, so the generated
    # constructor takes no arguments; passing `parameters={}` raised TypeError.
    # NOTE(review): `_init_mappings` registers this factory under `JsonParser`
    # (the runtime class) while neighbouring entries are keyed by `*Model`
    # classes; confirm the key should be `JsonParserModel`.
    return JsonParser()

@staticmethod
def create_json_file_schema_loader(
model: JsonFileSchemaLoaderModel, config: Config, **kwargs: Any
Expand Down
3 changes: 3 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
22 changes: 22 additions & 0 deletions unit_tests/sources/declarative/decoders/parsers/test_parsers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#

import json

import pytest

from airbyte_cdk.sources.declarative.decoders.parsers import JsonParser


@pytest.mark.parametrize(
    "raw_data, expected",
    [
        pytest.param(
            json.dumps({"data-type": "string"}),
            {"data-type": "string"},
            id="test_with_str",
        ),
        pytest.param(
            json.dumps({"data-type": "bytes"}).encode("utf-8"),
            {"data-type": "bytes"},
            id="test_with_bytes",
        ),
        pytest.param(
            bytearray(json.dumps({"data-type": "bytearray"}).encode("utf-8")),
            {"data-type": "bytearray"},
            id="test_with_bytearray",
        ),
    ],
)
def test_json_parser_with_valid_data(raw_data, expected):
    """The first record produced by JsonParser equals the encoded object, for str, bytes and bytearray input."""
    records = JsonParser().parse(raw_data)
    first_record = next(records)
    assert first_record == expected
Loading