From 7ff1b7def2edb9aa3e63fd6655b9d5af8a8ceec3 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Thu, 27 Mar 2025 21:45:34 +0100 Subject: [PATCH 1/7] Update check streams to use dynamic streams configs --- .../sources/declarative/checks/__init__.py | 4 +- .../declarative/checks/check_stream.py | 59 +++- .../declarative_component_schema.yaml | 28 +- .../models/declarative_component_schema.py | 118 +++++--- .../parsers/model_to_component_factory.py | 28 +- .../declarative/checks/test_check_stream.py | 276 +++++++++++++++++- 6 files changed, 457 insertions(+), 56 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/__init__.py b/airbyte_cdk/sources/declarative/checks/__init__.py index 6362e0613..fb3aece0e 100644 --- a/airbyte_cdk/sources/declarative/checks/__init__.py +++ b/airbyte_cdk/sources/declarative/checks/__init__.py @@ -7,7 +7,7 @@ from pydantic.v1 import BaseModel from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream -from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream +from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream, DynamicStreamCheckConfig from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.models import ( CheckDynamicStream as CheckDynamicStreamModel, @@ -21,4 +21,4 @@ "CheckDynamicStream": CheckDynamicStreamModel, } -__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker"] +__all__ = ["CheckStream", "CheckDynamicStream", "ConnectionChecker", "DynamicStreamCheckConfig"] diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index c45159ec9..49c1176ad 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -8,10 +8,21 @@ from typing import Any, List, Mapping, Tuple from airbyte_cdk import AbstractSource +from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy +@dataclass(frozen=True) +class DynamicStreamCheckConfig: + """Defines the configuration for dynamic stream during connection checking. This class specifies + what dynamic streams in the stream template should be updated with value, supporting dynamic interpolation + and type enforcement.""" + + dynamic_stream_name: str + stream_count: int = 0 + + @dataclass class CheckStream(ConnectionChecker): """ @@ -22,6 +33,7 @@ class CheckStream(ConnectionChecker): """ stream_names: List[str] + dynamic_streams_check_configs: List[DynamicStreamCheckConfig] parameters: InitVar[Mapping[str, Any]] def __post_init__(self, parameters: Mapping[str, Any]) -> None: @@ -30,7 +42,15 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: def check_connection( self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: - streams = source.streams(config=config) + try: + streams = source.streams(config=config) + except Exception as error: + error_message = ( + f"Encountered an error trying to connect to streams. Error: {error}" + ) + logger.error(error_message, exc_info=True) + return False, error_message + stream_name_to_stream = {s.name: s for s in streams} if len(streams) == 0: return False, f"No streams to connect to from source {source}" @@ -53,4 +73,41 @@ def check_connection( f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}" ) return False, f"Unable to connect to stream {stream_name} - {error}" + + if isinstance(source, ManifestDeclarativeSource) and self.dynamic_streams_check_configs: + dynamic_stream_name_to_dynamic_stream = {dynamic_stream["name"]: dynamic_stream for dynamic_stream in + source.resolved_manifest.get("dynamic_streams", [])} + + dynamic_stream_name_to_generated_streams = {} + for stream in source.dynamic_streams: + dynamic_stream_name_to_generated_streams[ + stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault( + stream["dynamic_stream_name"], []).append(stream) + + for dynamic_streams_check_config in self.dynamic_streams_check_configs: + dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name) + + is_config_depend = dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver" + + if not is_config_depend and not bool(dynamic_streams_check_config.stream_count): + continue + + generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name) + availability_strategy = HttpAvailabilityStrategy() + + for stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]: + try: + stream_is_available, reason = availability_strategy.check_availability( + stream, logger + ) + if not stream_is_available: + logger.warning(f"Stream {stream.name} is not available: {reason}") + return False, reason + except Exception as error: + error_message = ( + f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" + ) + logger.error(error_message, exc_info=True) + return False, error_message + return True, None diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index b7c0d84a0..acb630bc0 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -316,7 +316,11 @@ definitions: type: object required: - type - - stream_names + anyOf: + - required: + - stream_names + - required: + - dynamic_streams_check_configs properties: type: type: string @@ -330,6 +334,28 @@ definitions: examples: - ["users"] - ["users", "contacts"] + dynamic_streams_check_configs: + type: array + items: + "$ref": "#/definitions/DynamicStreamCheckConfig" + DynamicStreamCheckConfig: + type: object + required: + - type + - dynamic_stream_name + properties: + type: + type: string + enum: [ DynamicStreamCheckConfig ] + dynamic_stream_name: + title: Dynamic Stream Name + description: The dynamic stream name. + type: string + stream_count: + title: Stream Count + description: Numbers of the streams to try reading from when running a check operation. + type: integer + default: 0 CheckDynamicStream: title: Dynamic Streams to Check description: (This component is experimental. Use at your own risk.) Defines the dynamic streams to try reading when running a check operation. diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index c43f550db..e8f11ad57 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -42,13 +42,15 @@ class BearerAuthenticator(BaseModel): parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class CheckStream(BaseModel): - type: Literal["CheckStream"] - stream_names: List[str] = Field( - ..., - description="Names of the streams to try reading from when running a check operation.", - examples=[["users"], ["users", "contacts"]], - title="Stream Names", +class DynamicStreamCheckConfig(BaseModel): + type: Literal["DynamicStreamCheckConfig"] + dynamic_stream_name: str = Field( + ..., description="The dynamic stream name.", title="Dynamic Stream Name" + ) + stream_count: Optional[int] = Field( + 0, + description="Numbers of the streams to try reading from when running a check operation.", + title="Stream Count", ) @@ -609,7 +611,9 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], + examples=[ + ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] + ], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1083,24 +1087,28 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( + Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", + ) ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( + Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", + ) ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1118,7 +1126,9 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], + examples=[ + {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} + ], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1523,6 +1533,36 @@ class AuthFlow(BaseModel): oauth_config_specification: Optional[OAuthConfigSpecification] = None +class CheckStream1(BaseModel): + type: Literal["CheckStream"] + stream_names: List[str] = Field( + ..., + description="Names of the streams to try reading from when running a check operation.", + examples=[["users"], ["users", "contacts"]], + title="Stream Names", + ) + dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None + + +class CheckStream2(BaseModel): + type: Literal["CheckStream"] + stream_names: Optional[List[str]] = Field( + None, + description="Names of the streams to try reading from when running a check operation.", + examples=[["users"], ["users", "contacts"]], + title="Stream Names", + ) + dynamic_streams_check_configs: List[DynamicStreamCheckConfig] + + +class CheckStream(BaseModel): + __root__: Union[CheckStream1, CheckStream2] = Field( + ..., + description="Defines the streams to try reading when running a check operation.", + title="Streams to Check", + ) + + class IncrementingCountCursor(BaseModel): type: Literal["IncrementingCountCursor"] cursor_field: str = Field( @@ -1781,7 +1821,9 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( + schema_normalization: Optional[ + Union[SchemaNormalization, CustomSchemaNormalization] + ] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2008,7 +2050,9 @@ class Config: description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", ) - name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") + name: Optional[str] = Field( + "", description="The stream name.", example=["Users"], title="Name" + ) primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2266,7 +2310,9 @@ class ParentStreamConfig(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field(..., description="The stream name.", example=["Users"], title="Name") + name: str = Field( + ..., description="The stream name.", example=["Users"], title="Name" + ) full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2355,7 +2401,9 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field(None, description="Responsible for fetching the records from provided urls.") + ] = Field( + None, description="Responsible for fetching the records from provided urls." + ) creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2492,10 +2540,12 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( + Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", + ) ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 22ef82112..232135b14 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -54,7 +54,7 @@ SessionTokenProvider, TokenProvider, ) -from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream +from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream, DynamicStreamCheckConfig from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -131,6 +131,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ComplexFieldType as ComplexFieldTypeModel, ) @@ -559,6 +562,7 @@ def _init_mappings(self) -> None: BasicHttpAuthenticatorModel: self.create_basic_http_authenticator, BearerAuthenticatorModel: self.create_bearer_authenticator, CheckStreamModel: self.create_check_stream, + DynamicStreamCheckConfigModel: self.create_dynamic_stream_check_config, CheckDynamicStreamModel: self.create_check_dynamic_stream, CompositeErrorHandlerModel: self.create_composite_error_handler, ConcurrencyLevelModel: self.create_concurrency_level, @@ -936,8 +940,26 @@ def create_bearer_authenticator( ) @staticmethod - def create_check_stream(model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream: - return CheckStream(stream_names=model.stream_names, parameters={}) + def create_dynamic_stream_check_config( + model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any + ) -> DynamicStreamCheckConfig: + return DynamicStreamCheckConfig( + dynamic_stream_name=model.dynamic_stream_name, + stream_count=model.stream_count or 0, + ) + + def create_check_stream(self, model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream: + if model.dynamic_streams_check_configs is None and model.stream_names is None: + raise ValueError( + "Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream" + ) + + dynamic_streams_check_configs = [ + self._create_component_from_model(model=dynamic_stream_check_config, config=config) + for dynamic_stream_check_config in model.dynamic_streams_check_configs + ] + + return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs or [], parameters={}) @staticmethod def create_check_dynamic_stream( diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index b5367b2b0..848b1820c 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -2,15 +2,21 @@ # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # +import json import logging from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock +from copy import deepcopy import pytest import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.streams.http import HttpStream +from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( + ConcurrentDeclarativeSource, +) +from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse logger = logging.getLogger("test") config = dict() @@ -24,11 +30,11 @@ [ ("test_success_check", record, stream_names, {}, (True, None)), ( - "test_success_check_stream_slice", - record, - stream_names, - {"slice": "slice_value"}, - (True, None), + "test_success_check_stream_slice", + record, + stream_names, + {"slice": "slice_value"}, + (True, None), ), ("test_fail_check", None, stream_names, {}, (True, None)), ("test_try_to_check_invalid stream", record, ["invalid_stream_name"], {}, None), @@ -36,7 +42,7 @@ ) @pytest.mark.parametrize("slices_as_list", [True, False]) def test_check_stream_with_slices_as_list( - test_name, record, streams_to_check, stream_slice, expectation, slices_as_list + test_name, record, streams_to_check, stream_slice, expectation, slices_as_list ): stream = MagicMock() stream.name = "s1" @@ -101,22 +107,22 @@ def test_check_stream_with_no_stream_slices_aborts(): "test_name, response_code, available_expectation, expected_messages", [ ( - "test_stream_unavailable_unhandled_error", - 404, - False, - ["Not found. The requested resource was not found on the server."], + "test_stream_unavailable_unhandled_error", + 404, + False, + ["Not found. The requested resource was not found on the server."], ), ( - "test_stream_unavailable_handled_error", - 403, - False, - ["Forbidden. You don't have permission to access this resource."], + "test_stream_unavailable_handled_error", + 403, + False, + ["Forbidden. You don't have permission to access this resource."], ), ("test_stream_available", 200, True, []), ], ) def test_check_http_stream_via_availability_strategy( - mocker, test_name, response_code, available_expectation, expected_messages + mocker, test_name, response_code, available_expectation, expected_messages ): class MockHttpStream(HttpStream): url_base = "https://test_base_url.com" @@ -157,3 +163,243 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp assert stream_is_available == available_expectation for message in expected_messages: assert message in reason + + +_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ + {"id": 1, "name": "item_1"}, + {"id": 2, "name": "item_2"}, +]} + +_MANIFEST_WITHOUT_CHECK_COMPONENT = { + "version": "6.7.0", + "type": "DeclarativeSource", + "dynamic_streams": [ + { + "type": "DynamicDeclarativeStream", + "name": "http_dynamic_stream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "HttpComponentsResolver", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "url_base": "https://api.test.com", + "path": "items", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + }, + { + "type": "DynamicDeclarativeStream", + "stream_template": { + "type": "DeclarativeStream", + "name": "", + "primary_key": [], + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "ABC": {"type": "number"}, + "AED": {"type": "number"}, + }, + "type": "object", + }, + }, + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/items/{{parameters['item_id']}}", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + }, + "components_resolver": { + "type": "ConfigComponentsResolver", + "stream_config": { + "type": "StreamConfig", + "configs_pointer": ["custom_streams"], + }, + "components_mapping": [ + { + "type": "ComponentMappingDefinition", + "field_path": ["name"], + "value": "{{components_values['name']}}", + }, + { + "type": "ComponentMappingDefinition", + "field_path": [ + "retriever", + "requester", + "$parameters", + "item_id", + ], + "value": "{{components_values['id']}}", + }, + ], + }, + } + ], + "streams": [ + { + "type": "DeclarativeStream", + "retriever": { + "type": "SimpleRetriever", + "requester": { + "type": "HttpRequester", + "$parameters": {"item_id": ""}, + "url_base": "https://api.test.com", + "path": "/static", + "http_method": "GET", + "authenticator": { + "type": "ApiKeyAuthenticator", + "header": "apikey", + "api_token": "{{ config['api_key'] }}", + }, + }, + "record_selector": { + "type": "RecordSelector", + "extractor": {"type": "DpathExtractor", "field_path": []}, + }, + "paginator": {"type": "NoPagination"}, + }, + "name": "static_stream", + "primary_key": "id", + } + ] +} + + +@pytest.mark.parametrize( + "check_component", + [ + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + id="test_check_only_static_streams"), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 2}]}}, id="test_check_static_streams_and_http_dynamic_stream"), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 2}]}}, id="test_check_static_streams_and_config_dynamic_stream"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", "stream_count": 2}]}}, + id="test_check_http_dynamic_stream_and_config_dynamic_stream"), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 2}]}}, + id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"), + ], +) +def test_check_stream(check_component): + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component} + + with HttpMocker() as http_mocker: + static_stream_request = HttpRequest(url="https://api.test.com/static") + static_stream_response = HttpResponse( + body=json.dumps([{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}]) + ) + http_mocker.get(static_stream_request, static_stream_response) + + items_request = HttpRequest(url="https://api.test.com/items") + items_response = HttpResponse( + body=json.dumps([{"id": 1, "name": "item_1"}, {"id": 2, "name": "item_2"}]) + ) + http_mocker.get(items_request, items_response) + + item_request = HttpRequest(url="https://api.test.com/items/1") + item_response = HttpResponse(body=json.dumps([]), status_code=200) + item_request_count = 1 + http_mocker.get(item_request, item_response) + + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + + stream_is_available, reason = source.check_connection(logger, _CONFIG) + + http_mocker.assert_number_of_calls(item_request, item_request_count) + + assert stream_is_available From 739c4295c2c893d9ec25954baac0468c8e76fc3d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 28 Mar 2025 00:58:15 +0100 Subject: [PATCH 2/7] Add unit tests --- .../declarative/checks/check_stream.py | 11 ++--- .../declarative_component_schema.yaml | 5 -- .../models/declarative_component_schema.py | 23 +-------- .../parsers/model_to_component_factory.py | 4 +- .../declarative/checks/test_check_stream.py | 49 ++++++++++++++----- 5 files changed, 46 insertions(+), 46 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 49c1176ad..135c3de7b 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -8,7 +8,6 @@ from typing import Any, List, Mapping, Tuple from airbyte_cdk import AbstractSource -from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @@ -74,15 +73,14 @@ def check_connection( ) return False, f"Unable to connect to stream {stream_name} - {error}" - if isinstance(source, ManifestDeclarativeSource) and self.dynamic_streams_check_configs: - dynamic_stream_name_to_dynamic_stream = {dynamic_stream["name"]: dynamic_stream for dynamic_stream in - source.resolved_manifest.get("dynamic_streams", [])} + if hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs: + dynamic_stream_name_to_dynamic_stream = {dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream for i, dynamic_stream in enumerate(source.resolved_manifest.get("dynamic_streams", []))} dynamic_stream_name_to_generated_streams = {} for stream in source.dynamic_streams: dynamic_stream_name_to_generated_streams[ stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault( - stream["dynamic_stream_name"], []).append(stream) + stream["dynamic_stream_name"], []) + [stream] for dynamic_streams_check_config in self.dynamic_streams_check_configs: dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name) @@ -95,7 +93,8 @@ def check_connection( generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name) availability_strategy = HttpAvailabilityStrategy() - for stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]: + for declarative_stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]: + stream = stream_name_to_stream.get(declarative_stream["name"]) try: stream_is_available, reason = availability_strategy.check_availability( stream, logger diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 3be9f46e7..43aec54f3 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -316,11 +316,6 @@ definitions: type: object required: - type - anyOf: - - required: - - stream_names - - required: - - dynamic_streams_check_configs properties: type: type: string diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 483ce597a..8043ce6a6 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -1533,18 +1533,7 @@ class AuthFlow(BaseModel): oauth_config_specification: Optional[OAuthConfigSpecification] = None -class CheckStream1(BaseModel): - type: Literal["CheckStream"] - stream_names: List[str] = Field( - ..., - description="Names of the streams to try reading from when running a check operation.", - examples=[["users"], ["users", "contacts"]], - title="Stream Names", - ) - dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None - - -class CheckStream2(BaseModel): +class CheckStream(BaseModel): type: Literal["CheckStream"] stream_names: Optional[List[str]] = Field( None, @@ -1552,15 +1541,7 @@ class CheckStream2(BaseModel): examples=[["users"], ["users", "contacts"]], title="Stream Names", ) - dynamic_streams_check_configs: List[DynamicStreamCheckConfig] - - -class CheckStream(BaseModel): - __root__: Union[CheckStream1, CheckStream2] = Field( - ..., - description="Defines the streams to try reading when running a check operation.", - title="Streams to Check", - ) + dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None class IncrementingCountCursor(BaseModel): diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 232135b14..c1bdcd955 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -957,9 +957,9 @@ def create_check_stream(self, model: CheckStreamModel, config: Config, **kwargs: dynamic_streams_check_configs = [ self._create_component_from_model(model=dynamic_stream_check_config, config=config) for dynamic_stream_check_config in model.dynamic_streams_check_configs - ] + ] if model.dynamic_streams_check_configs else [] - return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs or [], parameters={}) + return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs, parameters={}) @staticmethod def create_check_dynamic_stream( diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 848b1820c..4f0a42c52 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -166,8 +166,8 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp _CONFIG = {"start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ - {"id": 1, "name": "item_1"}, - {"id": 2, "name": "item_2"}, + {"id": 3, "name": "item_3"}, + {"id": 4, "name": "item_4"}, ]} _MANIFEST_WITHOUT_CHECK_COMPONENT = { @@ -342,6 +342,17 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp }, "name": "static_stream", "primary_key": "id", + "schema_loader": { + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, + }, + "type": "object", + }, + } } ] } @@ -355,18 +366,19 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 2}]}}, id="test_check_static_streams_and_http_dynamic_stream"), + "stream_count": 1}]}}, id="test_check_static_streams_and_http_dynamic_stream"), pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 2}]}}, id="test_check_static_streams_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", "stream_count": 2}]}}, + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}]}}, id="test_check_static_streams_and_config_dynamic_stream"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, id="test_check_http_dynamic_stream_and_config_dynamic_stream"), pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 2}]}}, + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"), ], ) @@ -388,7 +400,10 @@ def test_check_stream(check_component): item_request = HttpRequest(url="https://api.test.com/items/1") item_response = HttpResponse(body=json.dumps([]), status_code=200) - item_request_count = 1 + http_mocker.get(item_request, item_response) + + item_request = HttpRequest(url="https://api.test.com/items/3") + item_response = HttpResponse(body=json.dumps([]), status_code=200) http_mocker.get(item_request, item_response) source = ConcurrentDeclarativeSource( @@ -400,6 +415,16 @@ def test_check_stream(check_component): stream_is_available, reason = source.check_connection(logger, _CONFIG) - http_mocker.assert_number_of_calls(item_request, item_request_count) - assert stream_is_available + + +def test_check_stream_only_type_provided(): + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}} + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) + with pytest.raises(ValueError): + source.check_connection(logger, _CONFIG) From 650165f62395f5206afd66452e61c89998ad0a28 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 28 Mar 2025 00:03:05 +0000 Subject: [PATCH 3/7] Auto-fix lint and format issues --- .../sources/declarative/checks/__init__.py | 5 +- .../declarative/checks/check_stream.py | 46 +++-- .../models/declarative_component_schema.py | 72 +++----- .../parsers/model_to_component_factory.py | 36 ++-- .../declarative/checks/test_check_stream.py | 161 ++++++++++++------ 5 files changed, 197 insertions(+), 123 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/__init__.py b/airbyte_cdk/sources/declarative/checks/__init__.py index fb3aece0e..87bcaa24d 100644 --- a/airbyte_cdk/sources/declarative/checks/__init__.py +++ b/airbyte_cdk/sources/declarative/checks/__init__.py @@ -7,7 +7,10 @@ from pydantic.v1 import BaseModel from airbyte_cdk.sources.declarative.checks.check_dynamic_stream import CheckDynamicStream -from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream, DynamicStreamCheckConfig +from airbyte_cdk.sources.declarative.checks.check_stream import ( + CheckStream, + DynamicStreamCheckConfig, +) from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.models import ( CheckDynamicStream as CheckDynamicStreamModel, diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 135c3de7b..fcd5d0c59 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -44,9 +44,7 @@ def check_connection( try: streams = source.streams(config=config) except Exception as error: - error_message = ( - f"Encountered an error trying to connect to streams. Error: {error}" - ) + error_message = f"Encountered an error trying to connect to streams. Error: {error}" logger.error(error_message, exc_info=True) return False, error_message @@ -73,27 +71,47 @@ def check_connection( ) return False, f"Unable to connect to stream {stream_name} - {error}" - if hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs: - dynamic_stream_name_to_dynamic_stream = {dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream for i, dynamic_stream in enumerate(source.resolved_manifest.get("dynamic_streams", []))} + if ( + hasattr(source, "resolved_manifest") + and hasattr(source, "dynamic_streams") + and self.dynamic_streams_check_configs + ): + dynamic_stream_name_to_dynamic_stream = { + dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream + for i, dynamic_stream in enumerate( + source.resolved_manifest.get("dynamic_streams", []) + ) + } dynamic_stream_name_to_generated_streams = {} for stream in source.dynamic_streams: - dynamic_stream_name_to_generated_streams[ - stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault( - stream["dynamic_stream_name"], []) + [stream] + dynamic_stream_name_to_generated_streams[stream["dynamic_stream_name"]] = ( + dynamic_stream_name_to_generated_streams.setdefault( + stream["dynamic_stream_name"], [] + ) + + [stream] + ) for dynamic_streams_check_config in self.dynamic_streams_check_configs: - dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name) + dynamic_stream = dynamic_stream_name_to_dynamic_stream.get( + dynamic_streams_check_config.dynamic_stream_name + ) - is_config_depend = dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver" + is_config_depend = ( + dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver" + ) if not is_config_depend and not bool(dynamic_streams_check_config.stream_count): continue - generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name) + generated_streams = dynamic_stream_name_to_generated_streams.get( + dynamic_streams_check_config.dynamic_stream_name + ) availability_strategy = HttpAvailabilityStrategy() - for declarative_stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]: + for declarative_stream in generated_streams[ + : min(dynamic_streams_check_config.stream_count, len(generated_streams)) + ]: stream = stream_name_to_stream.get(declarative_stream["name"]) try: stream_is_available, reason = availability_strategy.check_availability( @@ -103,9 +121,7 @@ def check_connection( logger.warning(f"Stream {stream.name} is not available: {reason}") return False, reason except Exception as error: - error_message = ( - f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" - ) + error_message = f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" logger.error(error_message, exc_info=True) return False, error_message diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index 8043ce6a6..c67cd958b 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -611,9 +611,7 @@ class OAuthAuthenticator(BaseModel): scopes: Optional[List[str]] = Field( None, description="List of scopes that should be granted to the access token.", - examples=[ - ["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"] - ], + examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]], title="Scopes", ) token_expiry_date: Optional[str] = Field( @@ -1087,28 +1085,24 @@ class OAuthConfigSpecification(BaseModel): class Config: extra = Extra.allow - oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = ( - Field( - None, - description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", - examples=[ - {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, - { - "app_id": { - "type": "string", - "path_in_connector_config": ["info", "app_id"], - } - }, - ], - title="OAuth user input", - ) + oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field( + None, + description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }", + examples=[ + {"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}}, + { + "app_id": { + "type": "string", + "path_in_connector_config": ["info", "app_id"], + } + }, + ], + title="OAuth user input", ) - oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = ( - Field( - None, - description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', - title="DeclarativeOAuth Connector Specification", - ) + oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field( + None, + description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }', + title="DeclarativeOAuth Connector Specification", ) complete_oauth_output_specification: Optional[Dict[str, Any]] = Field( None, @@ -1126,9 +1120,7 @@ class Config: complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field( None, description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }", - examples=[ - {"client_id": {"type": "string"}, "client_secret": {"type": "string"}} - ], + examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}], title="OAuth input specification", ) complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field( @@ -1802,9 +1794,7 @@ class RecordSelector(BaseModel): description="Responsible for filtering records to be emitted by the Source.", title="Record Filter", ) - schema_normalization: Optional[ - Union[SchemaNormalization, CustomSchemaNormalization] - ] = Field( + schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field( SchemaNormalization.None_, description="Responsible for normalization according to the schema.", title="Schema Normalization", @@ -2031,9 +2021,7 @@ class Config: description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", ) - name: Optional[str] = Field( - "", description="The stream name.", example=["Users"], title="Name" - ) + name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name") primary_key: Optional[PrimaryKey] = Field( "", description="The primary key of the stream.", title="Primary Key" ) @@ -2291,9 +2279,7 @@ class ParentStreamConfig(BaseModel): class StateDelegatingStream(BaseModel): type: Literal["StateDelegatingStream"] - name: str = Field( - ..., description="The stream name.", example=["Users"], title="Name" - ) + name: str = Field(..., description="The stream name.", example=["Users"], title="Name") full_refresh_stream: DeclarativeStream = Field( ..., description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.", @@ -2382,9 +2368,7 @@ class AsyncRetriever(BaseModel): ) download_extractor: Optional[ Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor] - ] = Field( - None, description="Responsible for fetching the records from provided urls." - ) + ] = Field(None, description="Responsible for fetching the records from provided urls.") creation_requester: Union[CustomRequester, HttpRequester] = Field( ..., description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.", @@ -2524,12 +2508,10 @@ class DynamicDeclarativeStream(BaseModel): stream_template: DeclarativeStream = Field( ..., description="Reference to the stream template.", title="Stream Template" ) - components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = ( - Field( - ..., - description="Component resolve and populates stream templates with components values.", - title="Components Resolver", - ) + components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field( + ..., + description="Component resolve and populates stream templates with components values.", + title="Components Resolver", ) diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index c1bdcd955..4f4638190 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -54,7 +54,11 @@ SessionTokenProvider, TokenProvider, ) -from airbyte_cdk.sources.declarative.checks import CheckDynamicStream, CheckStream, DynamicStreamCheckConfig +from airbyte_cdk.sources.declarative.checks import ( + CheckDynamicStream, + CheckStream, + DynamicStreamCheckConfig, +) from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream @@ -131,9 +135,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CheckStream as CheckStreamModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ComplexFieldType as ComplexFieldTypeModel, ) @@ -221,6 +222,9 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DynamicSchemaLoader as DynamicSchemaLoaderModel, ) +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DynamicStreamCheckConfig as DynamicStreamCheckConfigModel, +) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ExponentialBackoffStrategy as ExponentialBackoffStrategyModel, ) @@ -941,25 +945,35 @@ def create_bearer_authenticator( @staticmethod def create_dynamic_stream_check_config( - model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any + model: DynamicStreamCheckConfigModel, config: Config, **kwargs: Any ) -> DynamicStreamCheckConfig: return DynamicStreamCheckConfig( dynamic_stream_name=model.dynamic_stream_name, stream_count=model.stream_count or 0, ) - def create_check_stream(self, model: CheckStreamModel, config: Config, **kwargs: Any) -> CheckStream: + def create_check_stream( + self, model: CheckStreamModel, config: Config, **kwargs: Any + ) -> CheckStream: if model.dynamic_streams_check_configs is None and model.stream_names is None: raise ValueError( "Expected either stream_names or dynamic_streams_check_configs to be set for CheckStream" ) - dynamic_streams_check_configs = [ - self._create_component_from_model(model=dynamic_stream_check_config, config=config) - for dynamic_stream_check_config in model.dynamic_streams_check_configs - ] if model.dynamic_streams_check_configs else [] + dynamic_streams_check_configs = ( + [ + self._create_component_from_model(model=dynamic_stream_check_config, config=config) + for dynamic_stream_check_config in model.dynamic_streams_check_configs + ] + if model.dynamic_streams_check_configs + else [] + ) - return CheckStream(stream_names=model.stream_names or [], dynamic_streams_check_configs=dynamic_streams_check_configs, parameters={}) + return CheckStream( + stream_names=model.stream_names or [], + dynamic_streams_check_configs=dynamic_streams_check_configs, + parameters={}, + ) @staticmethod def create_check_dynamic_stream( diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 4f0a42c52..523fdec42 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -4,18 +4,18 @@ import json import logging +from copy import deepcopy from typing import Any, Iterable, Mapping, Optional from unittest.mock import MagicMock -from copy import deepcopy import pytest import requests from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream -from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) +from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse logger = logging.getLogger("test") @@ -30,11 +30,11 @@ [ ("test_success_check", record, stream_names, {}, (True, None)), ( - "test_success_check_stream_slice", - record, - stream_names, - {"slice": "slice_value"}, - (True, None), + "test_success_check_stream_slice", + record, + stream_names, + {"slice": "slice_value"}, + (True, None), ), ("test_fail_check", None, stream_names, {}, (True, None)), ("test_try_to_check_invalid stream", record, ["invalid_stream_name"], {}, None), @@ -42,7 +42,7 @@ ) @pytest.mark.parametrize("slices_as_list", [True, False]) def test_check_stream_with_slices_as_list( - test_name, record, streams_to_check, stream_slice, expectation, slices_as_list + test_name, record, streams_to_check, stream_slice, expectation, slices_as_list ): stream = MagicMock() stream.name = "s1" @@ -107,22 +107,22 @@ def test_check_stream_with_no_stream_slices_aborts(): "test_name, response_code, available_expectation, expected_messages", [ ( - "test_stream_unavailable_unhandled_error", - 404, - False, - ["Not found. The requested resource was not found on the server."], + "test_stream_unavailable_unhandled_error", + 404, + False, + ["Not found. The requested resource was not found on the server."], ), ( - "test_stream_unavailable_handled_error", - 403, - False, - ["Forbidden. You don't have permission to access this resource."], + "test_stream_unavailable_handled_error", + 403, + False, + ["Forbidden. You don't have permission to access this resource."], ), ("test_stream_available", 200, True, []), ], ) def test_check_http_stream_via_availability_strategy( - mocker, test_name, response_code, available_expectation, expected_messages + mocker, test_name, response_code, available_expectation, expected_messages ): class MockHttpStream(HttpStream): url_base = "https://test_base_url.com" @@ -165,10 +165,13 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp assert message in reason -_CONFIG = {"start_date": "2024-07-01T00:00:00.000Z", "custom_streams": [ - {"id": 3, "name": "item_3"}, - {"id": 4, "name": "item_4"}, -]} +_CONFIG = { + "start_date": "2024-07-01T00:00:00.000Z", + "custom_streams": [ + {"id": 3, "name": "item_3"}, + {"id": 4, "name": "item_4"}, + ], +} _MANIFEST_WITHOUT_CHECK_COMPONENT = { "version": "6.7.0", @@ -315,7 +318,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp }, ], }, - } + }, ], "streams": [ { @@ -343,43 +346,99 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp "name": "static_stream", "primary_key": "id", "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"}, - "name": {"type": "string"}, - }, - "type": "object", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, }, - } + "type": "object", + }, + }, } - ] + ], } @pytest.mark.parametrize( "check_component", [ - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, - id="test_check_only_static_streams"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 1}]}}, id="test_check_static_streams_and_http_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}]}}, id="test_check_static_streams_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - id="test_check_http_dynamic_stream_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + id="test_check_only_static_streams", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 1, + } + ], + } + }, + id="test_check_static_streams_and_http_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + } + ], + } + }, + id="test_check_static_streams_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + id="test_check_http_dynamic_stream_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream", + ), ], ) def test_check_stream(check_component): From 706ba21288a37fbc50027c0dc44e77453f10dd0d Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Fri, 28 Mar 2025 17:28:35 +0100 Subject: [PATCH 4/7] Fix mypy and unittest --- .../declarative/checks/check_stream.py | 140 ++++++++++-------- .../declarative/checks/test_check_stream.py | 110 +++++++++++--- 2 files changed, 169 insertions(+), 81 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 135c3de7b..ec499309b 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -5,7 +5,7 @@ import logging import traceback from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Tuple +from typing import Any, List, Mapping, Tuple, Dict, Optional from airbyte_cdk import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -32,81 +32,105 @@ class CheckStream(ConnectionChecker): """ stream_names: List[str] - dynamic_streams_check_configs: List[DynamicStreamCheckConfig] parameters: InitVar[Mapping[str, Any]] + dynamic_streams_check_configs: Optional[List[DynamicStreamCheckConfig]] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters + if self.dynamic_streams_check_configs is None: + self.dynamic_streams_check_configs = [] + + def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> Tuple[bool, str]: + """Logs an error and returns a formatted error message.""" + error_message = f"Encountered an error while {action}. Error: {error}" + logger.error(error_message + f"Error traceback: \n {traceback.format_exc()}", exc_info=True) + return False, error_message def check_connection( - self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] + self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: + """Checks the connection to the source and its streams.""" try: streams = source.streams(config=config) + if not streams: + return False, f"No streams to connect to from source {source}" except Exception as error: - error_message = ( - f"Encountered an error trying to connect to streams. Error: {error}" - ) - logger.error(error_message, exc_info=True) - return False, error_message + return self._log_error(logger, "connecting to streams", error) stream_name_to_stream = {s.name: s for s in streams} - if len(streams) == 0: - return False, f"No streams to connect to from source {source}" for stream_name in self.stream_names: - if stream_name not in stream_name_to_stream.keys(): + if stream_name not in stream_name_to_stream: raise ValueError( - f"{stream_name} is not part of the catalog. Expected one of {stream_name_to_stream.keys()}." + f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}." ) + stream_availability, message = self._check_stream_availability(stream_name_to_stream, stream_name, logger) + if not stream_availability: + return stream_availability, message + + should_check_dynamic_streams = hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs + + if should_check_dynamic_streams: + return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger) + + return True, None + + def _check_stream_availability(self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger) -> Tuple[bool, Any]: + """Checks if streams are available.""" + availability_strategy = HttpAvailabilityStrategy() + try: stream = stream_name_to_stream[stream_name] - availability_strategy = HttpAvailabilityStrategy() + stream_is_available, reason = availability_strategy.check_availability(stream, logger) + if not stream_is_available: + message = f"Stream {stream_name} is not available: {reason}" + logger.warning(message) + return stream_is_available, message + except Exception as error: + return self._log_error(logger, f"checking availability of stream {stream_name}", error) + return True, None + + def _check_dynamic_streams_availability( + self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger + ) -> Tuple[bool, Any]: + """Checks the availability of dynamic streams.""" + dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method + dynamic_stream_name_to_dynamic_stream = { + ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams) + } + generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method + + for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__ + if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream: + return False, f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest." + + generated = generated_streams.get(check_config.dynamic_stream_name, []) + stream_availability, message = self._check_generated_streams_availability(generated, stream_name_to_stream, logger, check_config.stream_count) + if not stream_availability: + return stream_availability, message + + return True, None + + def _map_generated_streams(self, dynamic_streams: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + """Maps dynamic stream names to their corresponding generated streams.""" + mapped_streams: Dict[str, List[Dict[str, Any]]] = {} + for stream in dynamic_streams: + mapped_streams.setdefault(stream["dynamic_stream_name"], []).append(stream) + return mapped_streams + + def _check_generated_streams_availability( + self, generated_streams: List[Dict[str, Any]], stream_name_to_stream: Dict[str, Any], + logger: logging.Logger, max_count: int + ) -> Tuple[bool, Any]: + """Checks availability of generated dynamic streams.""" + availability_strategy = HttpAvailabilityStrategy() + for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: + stream = stream_name_to_stream[declarative_stream["name"]] try: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) + stream_is_available, reason = availability_strategy.check_availability(stream, logger) if not stream_is_available: - return False, reason + message = f"Dynamic Stream {stream.name} is not available: {reason}" + logger.warning(message) + return False, message except Exception as error: - logger.error( - f"Encountered an error trying to connect to stream {stream_name}. Error: \n {traceback.format_exc()}" - ) - return False, f"Unable to connect to stream {stream_name} - {error}" - - if hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs: - dynamic_stream_name_to_dynamic_stream = {dynamic_stream.get("name", f"dynamic_stream_{i}"): dynamic_stream for i, dynamic_stream in enumerate(source.resolved_manifest.get("dynamic_streams", []))} - - dynamic_stream_name_to_generated_streams = {} - for stream in source.dynamic_streams: - dynamic_stream_name_to_generated_streams[ - stream["dynamic_stream_name"]] = dynamic_stream_name_to_generated_streams.setdefault( - stream["dynamic_stream_name"], []) + [stream] - - for dynamic_streams_check_config in self.dynamic_streams_check_configs: - dynamic_stream = dynamic_stream_name_to_dynamic_stream.get(dynamic_streams_check_config.dynamic_stream_name) - - is_config_depend = dynamic_stream["components_resolver"]["type"] == "ConfigComponentsResolver" - - if not is_config_depend and not bool(dynamic_streams_check_config.stream_count): - continue - - generated_streams = dynamic_stream_name_to_generated_streams.get(dynamic_streams_check_config.dynamic_stream_name) - availability_strategy = HttpAvailabilityStrategy() - - for declarative_stream in generated_streams[: min(dynamic_streams_check_config.stream_count, len(generated_streams))]: - stream = stream_name_to_stream.get(declarative_stream["name"]) - try: - stream_is_available, reason = availability_strategy.check_availability( - stream, logger - ) - if not stream_is_available: - logger.warning(f"Stream {stream.name} is not available: {reason}") - return False, reason - except Exception as error: - error_message = ( - f"Encountered an error trying to connect to stream {stream.name}. Error: {error}" - ) - logger.error(error_message, exc_info=True) - return False, error_message - + return self._log_error(logger, f"checking availability of dynamic stream {stream.name}", error) return True, None diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 4f0a42c52..10da6870c 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -11,6 +11,7 @@ import pytest import requests +from jsonschema.exceptions import ValidationError from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.streams.http import HttpStream from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( @@ -343,52 +344,97 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp "name": "static_stream", "primary_key": "id", "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "http://json-schema.org/schema#", - "properties": { - "id": {"type": "integer"}, - "name": {"type": "string"}, - }, - "type": "object", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "integer"}, + "name": {"type": "string"}, }, - } + "type": "object", + }, + } } ] } @pytest.mark.parametrize( - "check_component", + "check_component, expected_result, expectation, response_code, expected_messages", [ - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, True, False, 200, + [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], id="test_check_only_static_streams"), pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 1}]}}, id="test_check_static_streams_and_http_dynamic_stream"), + "stream_count": 1}]}}, True, False, 200, [], + id="test_check_static_streams_and_http_dynamic_stream"), pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}]}}, id="test_check_static_streams_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + "stream_count": 1}]}}, True, False, 200, [], + id="test_check_static_streams_and_config_dynamic_stream"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + True, False, 200, [], id="test_check_http_dynamic_stream_and_config_dynamic_stream"), pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], "dynamic_streams_check_configs": [ {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", "stream_count": 1}, - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + {"type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream"}]}}, True, False, 200, [], id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, + False, + True, 200, [], + id="test_non_existent_static_stream" + ), + pytest.param( + {"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "unknown_dynamic_stream", + "stream_count": 1}]}} + , + False, + False, 200, [], + id="test_non_existent_dynamic_stream" + ), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 404, + ["Not found. The requested resource was not found on the server."], + id="test_stream_unavailable_unhandled_error"), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 403, + ["Forbidden. You don't have permission to access this resource."], + id="test_stream_unavailable_handled_error"), + pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 401, + ["Unauthorized. Please ensure you are authenticated correctly."], + id="test_stream_unauthorized_error"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + False, False, 404, ["Not found. The requested resource was not found on the server."], + id="test_dynamic_stream_unavailable_unhandled_error"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + False, False, 403, ["Forbidden. You don't have permission to access this resource."], + id="test_dynamic_stream_unavailable_handled_error"), + pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ + {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, + False, False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], + id="test_dynamic_stream_unauthorized_error"), ], ) -def test_check_stream(check_component): +def test_check_stream1(check_component, expected_result, expectation, response_code, expected_messages): manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component} with HttpMocker() as http_mocker: static_stream_request = HttpRequest(url="https://api.test.com/static") static_stream_response = HttpResponse( - body=json.dumps([{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}]) + body=json.dumps(expected_messages), status_code=response_code ) http_mocker.get(static_stream_request, static_stream_response) @@ -399,11 +445,11 @@ def test_check_stream(check_component): http_mocker.get(items_request, items_response) item_request = HttpRequest(url="https://api.test.com/items/1") - item_response = HttpResponse(body=json.dumps([]), status_code=200) + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) http_mocker.get(item_request, item_response) item_request = HttpRequest(url="https://api.test.com/items/3") - item_response = HttpResponse(body=json.dumps([]), status_code=200) + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) http_mocker.get(item_request, item_response) source = ConcurrentDeclarativeSource( @@ -412,14 +458,32 @@ def test_check_stream(check_component): catalog=None, state=None, ) + if expectation: + with pytest.raises(ValueError): + source.check_connection(logger, _CONFIG) + else: + stream_is_available, reason = source.check_connection(logger, _CONFIG) - stream_is_available, reason = source.check_connection(logger, _CONFIG) + assert stream_is_available == expected_result - assert stream_is_available + +def test_check_stream_missing_fields(): + """Test if ValueError is raised when dynamic_streams_check_configs is missing required fields.""" + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), + **{"check": {"type": "CheckStream", + "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig"}]}}} + with pytest.raises(ValidationError): + source = ConcurrentDeclarativeSource( + source_config=manifest, + config=_CONFIG, + catalog=None, + state=None, + ) def test_check_stream_only_type_provided(): - manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}} + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), + **{"check": {"type": "CheckStream"}}} source = ConcurrentDeclarativeSource( source_config=manifest, config=_CONFIG, From 68741ec401ade22ca52a5e0f328e95489afbdec7 Mon Sep 17 00:00:00 2001 From: octavia-squidington-iii Date: Fri, 28 Mar 2025 16:32:04 +0000 Subject: [PATCH 5/7] Auto-fix lint and format issues --- .../declarative/checks/check_stream.py | 52 +++- .../declarative/checks/test_check_stream.py | 288 ++++++++++++++---- 2 files changed, 260 insertions(+), 80 deletions(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index ec499309b..ebac89899 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -5,7 +5,7 @@ import logging import traceback from dataclasses import InitVar, dataclass -from typing import Any, List, Mapping, Tuple, Dict, Optional +from typing import Any, Dict, List, Mapping, Optional, Tuple from airbyte_cdk import AbstractSource from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker @@ -47,7 +47,7 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T return False, error_message def check_connection( - self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] + self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] ) -> Tuple[bool, Any]: """Checks the connection to the source and its streams.""" try: @@ -64,18 +64,26 @@ def check_connection( f"{stream_name} is not part of the catalog. Expected one of {list(stream_name_to_stream.keys())}." ) - stream_availability, message = self._check_stream_availability(stream_name_to_stream, stream_name, logger) + stream_availability, message = self._check_stream_availability( + stream_name_to_stream, stream_name, logger + ) if not stream_availability: return stream_availability, message - should_check_dynamic_streams = hasattr(source, "resolved_manifest") and hasattr(source, "dynamic_streams") and self.dynamic_streams_check_configs + should_check_dynamic_streams = ( + hasattr(source, "resolved_manifest") + and hasattr(source, "dynamic_streams") + and self.dynamic_streams_check_configs + ) if should_check_dynamic_streams: return self._check_dynamic_streams_availability(source, stream_name_to_stream, logger) return True, None - def _check_stream_availability(self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger) -> Tuple[bool, Any]: + def _check_stream_availability( + self, stream_name_to_stream: Dict[str, Any], stream_name: str, logger: logging.Logger + ) -> Tuple[bool, Any]: """Checks if streams are available.""" availability_strategy = HttpAvailabilityStrategy() try: @@ -90,27 +98,34 @@ def _check_stream_availability(self, stream_name_to_stream: Dict[str, Any], stre return True, None def _check_dynamic_streams_availability( - self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger + self, source: AbstractSource, stream_name_to_stream: Dict[str, Any], logger: logging.Logger ) -> Tuple[bool, Any]: """Checks the availability of dynamic streams.""" dynamic_streams = source.resolved_manifest.get("dynamic_streams", []) # type: ignore[attr-defined] # The source's resolved_manifest manifest is checked before calling this method dynamic_stream_name_to_dynamic_stream = { ds.get("name", f"dynamic_stream_{i}"): ds for i, ds in enumerate(dynamic_streams) } - generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method + generated_streams = self._map_generated_streams(source.dynamic_streams) # type: ignore[attr-defined] # The source's dynamic_streams manifest is checked before calling this method - for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__ + for check_config in self.dynamic_streams_check_configs: # type: ignore[union-attr] # None value for self.dynamic_streams_check_configs handled in __post_init__ if check_config.dynamic_stream_name not in dynamic_stream_name_to_dynamic_stream: - return False, f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest." + return ( + False, + f"Dynamic stream {check_config.dynamic_stream_name} is not found in manifest.", + ) generated = generated_streams.get(check_config.dynamic_stream_name, []) - stream_availability, message = self._check_generated_streams_availability(generated, stream_name_to_stream, logger, check_config.stream_count) + stream_availability, message = self._check_generated_streams_availability( + generated, stream_name_to_stream, logger, check_config.stream_count + ) if not stream_availability: return stream_availability, message return True, None - def _map_generated_streams(self, dynamic_streams: List[Dict[str, Any]]) -> Dict[str, List[Dict[str, Any]]]: + def _map_generated_streams( + self, dynamic_streams: List[Dict[str, Any]] + ) -> Dict[str, List[Dict[str, Any]]]: """Maps dynamic stream names to their corresponding generated streams.""" mapped_streams: Dict[str, List[Dict[str, Any]]] = {} for stream in dynamic_streams: @@ -118,19 +133,26 @@ def _map_generated_streams(self, dynamic_streams: List[Dict[str, Any]]) -> Dict[ return mapped_streams def _check_generated_streams_availability( - self, generated_streams: List[Dict[str, Any]], stream_name_to_stream: Dict[str, Any], - logger: logging.Logger, max_count: int + self, + generated_streams: List[Dict[str, Any]], + stream_name_to_stream: Dict[str, Any], + logger: logging.Logger, + max_count: int, ) -> Tuple[bool, Any]: """Checks availability of generated dynamic streams.""" availability_strategy = HttpAvailabilityStrategy() for declarative_stream in generated_streams[: min(max_count, len(generated_streams))]: stream = stream_name_to_stream[declarative_stream["name"]] try: - stream_is_available, reason = availability_strategy.check_availability(stream, logger) + stream_is_available, reason = availability_strategy.check_availability( + stream, logger + ) if not stream_is_available: message = f"Dynamic Stream {stream.name} is not available: {reason}" logger.warning(message) return False, message except Exception as error: - return self._log_error(logger, f"checking availability of dynamic stream {stream.name}", error) + return self._log_error( + logger, f"checking availability of dynamic stream {stream.name}", error + ) return True, None diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index d870b6b4f..f4d3569b9 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -10,8 +10,8 @@ import pytest import requests - from jsonschema.exceptions import ValidationError + from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, @@ -356,7 +356,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp }, "type": "object", }, - } + }, } ], } @@ -365,73 +365,226 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp @pytest.mark.parametrize( "check_component, expected_result, expectation, response_code, expected_messages", [ - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, True, False, 200, - [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], - id="test_check_only_static_streams"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream", - "stream_count": 1}]}}, True, False, 200, [], - id="test_check_static_streams_and_http_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}]}}, True, False, 200, [], - id="test_check_static_streams_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - True, False, 200, [], - id="test_check_http_dynamic_stream_and_config_dynamic_stream"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"], - "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, - {"type": "DynamicStreamCheckConfig", - "dynamic_stream_name": "http_dynamic_stream"}]}}, True, False, 200, [], - id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream"), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + True, + False, + 200, + [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], + id="test_check_only_static_streams", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 1, + } + ], + } + }, + True, + False, + 200, + [], + id="test_check_static_streams_and_http_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + } + ], + } + }, + True, + False, + 200, + [], + id="test_check_static_streams_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + True, + False, + 200, + [], + id="test_check_http_dynamic_stream_and_config_dynamic_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "stream_names": ["static_stream"], + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + True, + False, + 200, + [], + id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream", + ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, False, - True, 200, [], - id="test_non_existent_static_stream" + True, + 200, + [], + id="test_non_existent_static_stream", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "unknown_dynamic_stream", + "stream_count": 1, + } + ], + } + }, + False, + False, + 200, + [], + id="test_non_existent_dynamic_stream", ), pytest.param( - {"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "unknown_dynamic_stream", - "stream_count": 1}]}} - , + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, False, - False, 200, [], - id="test_non_existent_dynamic_stream" + 404, + ["Not found. The requested resource was not found on the server."], + id="test_stream_unavailable_unhandled_error", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, + False, + 403, + ["Forbidden. You don't have permission to access this resource."], + id="test_stream_unavailable_handled_error", + ), + pytest.param( + {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, + False, + False, + 401, + ["Unauthorized. Please ensure you are authenticated correctly."], + id="test_stream_unauthorized_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 404, + ["Not found. The requested resource was not found on the server."], + id="test_dynamic_stream_unavailable_unhandled_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 403, + ["Forbidden. You don't have permission to access this resource."], + id="test_dynamic_stream_unavailable_handled_error", + ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "dynamic_stream_1", + "stream_count": 1, + }, + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + }, + ], + } + }, + False, + False, + 401, + ["Unauthorized. Please ensure you are authenticated correctly."], + id="test_dynamic_stream_unauthorized_error", ), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 404, - ["Not found. The requested resource was not found on the server."], - id="test_stream_unavailable_unhandled_error"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 403, - ["Forbidden. You don't have permission to access this resource."], - id="test_stream_unavailable_handled_error"), - pytest.param({"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, False, False, 401, - ["Unauthorized. Please ensure you are authenticated correctly."], - id="test_stream_unauthorized_error"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - False, False, 404, ["Not found. The requested resource was not found on the server."], - id="test_dynamic_stream_unavailable_unhandled_error"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - False, False, 403, ["Forbidden. You don't have permission to access this resource."], - id="test_dynamic_stream_unavailable_handled_error"), - pytest.param({"check": {"type": "CheckStream", "dynamic_streams_check_configs": [ - {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "dynamic_stream_1", - "stream_count": 1}, {"type": "DynamicStreamCheckConfig", "dynamic_stream_name": "http_dynamic_stream"}]}}, - False, False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], - id="test_dynamic_stream_unauthorized_error"), ], ) -def test_check_stream1(check_component, expected_result, expectation, response_code, expected_messages): +def test_check_stream1( + check_component, expected_result, expectation, response_code, expected_messages +): manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component} with HttpMocker() as http_mocker: @@ -472,9 +625,15 @@ def test_check_stream1(check_component, expected_result, expectation, response_c def test_check_stream_missing_fields(): """Test if ValueError is raised when dynamic_streams_check_configs is missing required fields.""" - manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), - **{"check": {"type": "CheckStream", - "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig"}]}}} + manifest = { + **deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), + **{ + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [{"type": "DynamicStreamCheckConfig"}], + } + }, + } with pytest.raises(ValidationError): source = ConcurrentDeclarativeSource( source_config=manifest, @@ -485,8 +644,7 @@ def test_check_stream_missing_fields(): def test_check_stream_only_type_provided(): - manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), - **{"check": {"type": "CheckStream"}}} + manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **{"check": {"type": "CheckStream"}}} source = ConcurrentDeclarativeSource( source_config=manifest, config=_CONFIG, From 8f8659d59d207ce530bd4e0a2671523d5f884898 Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 1 Apr 2025 13:43:16 +0200 Subject: [PATCH 6/7] Update action name --- airbyte_cdk/sources/declarative/checks/check_stream.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index ebac89899..1123349cb 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -55,7 +55,7 @@ def check_connection( if not streams: return False, f"No streams to connect to from source {source}" except Exception as error: - return self._log_error(logger, "connecting to streams", error) + return self._log_error(logger, "discovering streams", error) stream_name_to_stream = {s.name: s for s in streams} for stream_name in self.stream_names: From 6fc31f318156660898abfdb6a7094aea308f753f Mon Sep 17 00:00:00 2001 From: Serhii Lazebnyi Date: Tue, 1 Apr 2025 17:02:15 +0200 Subject: [PATCH 7/7] Add unit tase case and update description --- .../declarative_component_schema.yaml | 2 +- .../declarative/checks/test_check_stream.py | 51 ++++++++++++++++--- 2 files changed, 45 insertions(+), 8 deletions(-) diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index 43aec54f3..46a202b19 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -348,7 +348,7 @@ definitions: type: string stream_count: title: Stream Count - description: Numbers of the streams to try reading from when running a check operation. + description: The number of streams to attempt reading from during a check operation. If `stream_count` exceeds the total number of available streams, the minimum of the two values will be used. type: integer default: 0 CheckDynamicStream: diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index f4d3569b9..3cbaf8fd8 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -363,7 +363,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp @pytest.mark.parametrize( - "check_component, expected_result, expectation, response_code, expected_messages", + "check_component, expected_result, expectation, response_code, expected_messages, request_count", [ pytest.param( {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, @@ -371,6 +371,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], + 0, id="test_check_only_static_streams", ), pytest.param( @@ -391,6 +392,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], + 0, id="test_check_static_streams_and_http_dynamic_stream", ), pytest.param( @@ -411,6 +413,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], + 0, id="test_check_static_streams_and_config_dynamic_stream", ), pytest.param( @@ -434,6 +437,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], + 0, id="test_check_http_dynamic_stream_and_config_dynamic_stream", ), pytest.param( @@ -458,14 +462,36 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], + 0, id="test_check_static_streams_and_http_dynamic_stream_and_config_dynamic_stream", ), + pytest.param( + { + "check": { + "type": "CheckStream", + "dynamic_streams_check_configs": [ + { + "type": "DynamicStreamCheckConfig", + "dynamic_stream_name": "http_dynamic_stream", + "stream_count": 1000, + }, + ], + } + }, + True, + False, + 200, + [], + 1, + id="test_stream_count_gt_generated_streams", + ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, False, True, 200, [], + 0, id="test_non_existent_static_stream", ), pytest.param( @@ -485,6 +511,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 200, [], + 0, id="test_non_existent_dynamic_stream", ), pytest.param( @@ -493,6 +520,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 404, ["Not found. The requested resource was not found on the server."], + 0, id="test_stream_unavailable_unhandled_error", ), pytest.param( @@ -501,6 +529,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 403, ["Forbidden. You don't have permission to access this resource."], + 0, id="test_stream_unavailable_handled_error", ), pytest.param( @@ -509,6 +538,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], + 0, id="test_stream_unauthorized_error", ), pytest.param( @@ -532,6 +562,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 404, ["Not found. The requested resource was not found on the server."], + 0, id="test_dynamic_stream_unavailable_unhandled_error", ), pytest.param( @@ -555,6 +586,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 403, ["Forbidden. You don't have permission to access this resource."], + 0, id="test_dynamic_stream_unavailable_handled_error", ), pytest.param( @@ -578,12 +610,13 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], + 0, id="test_dynamic_stream_unauthorized_error", ), ], ) def test_check_stream1( - check_component, expected_result, expectation, response_code, expected_messages + check_component, expected_result, expectation, response_code, expected_messages, request_count ): manifest = {**deepcopy(_MANIFEST_WITHOUT_CHECK_COMPONENT), **check_component} @@ -600,13 +633,17 @@ def test_check_stream1( ) http_mocker.get(items_request, items_response) - item_request = HttpRequest(url="https://api.test.com/items/1") + item_request_1 = HttpRequest(url="https://api.test.com/items/1") item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) - http_mocker.get(item_request, item_response) + http_mocker.get(item_request_1, item_response) - item_request = HttpRequest(url="https://api.test.com/items/3") + item_request_2 = HttpRequest(url="https://api.test.com/items/2") item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) - http_mocker.get(item_request, item_response) + http_mocker.get(item_request_2, item_response) + + item_request_3 = HttpRequest(url="https://api.test.com/items/3") + item_response = HttpResponse(body=json.dumps(expected_messages), status_code=response_code) + http_mocker.get(item_request_3, item_response) source = ConcurrentDeclarativeSource( source_config=manifest, @@ -619,7 +656,7 @@ def test_check_stream1( source.check_connection(logger, _CONFIG) else: stream_is_available, reason = source.check_connection(logger, _CONFIG) - + http_mocker.assert_number_of_calls(item_request_2, request_count) assert stream_is_available == expected_result