diff --git a/airbyte_cdk/sources/declarative/extractors/record_selector.py b/airbyte_cdk/sources/declarative/extractors/record_selector.py index 73e854076..80d0155e3 100644 --- a/airbyte_cdk/sources/declarative/extractors/record_selector.py +++ b/airbyte_cdk/sources/declarative/extractors/record_selector.py @@ -15,7 +15,7 @@ ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import SchemaNormalization -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState from airbyte_cdk.sources.utils.transform import TypeTransformer @@ -43,7 +43,7 @@ class RecordSelector(HttpSelector): record_filter: Optional[RecordFilter] = None transformations: List[RecordTransformation] = field(default_factory=lambda: []) transform_before_filtering: bool = False - file_uploader: Optional[FileUploader] = None + file_uploader: Optional[DefaultFileUploader] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 4d4cd9440..14bb184f3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -481,7 +481,13 @@ SimpleRetriever, SimpleRetrieverTestReadDecorator, ) -from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader +from airbyte_cdk.sources.declarative.retrievers.file_uploader import ( + ConnectorBuilderFileUploader, + DefaultFileUploader, + FileUploader, + LocalFileSystemFileWriter, + NoopFileWriter, +) from airbyte_cdk.sources.declarative.schema import ( ComplexFieldType, DefaultSchemaLoader, @@ -2815,7 +2821,7 @@ def create_record_selector( transformations: List[RecordTransformation] | None = None, decoder: Decoder | None = None, client_side_incremental_sync: Dict[str, Any] | None = None, - file_uploader: Optional[FileUploader] = None, + file_uploader: Optional[DefaultFileUploader] = None, **kwargs: Any, ) -> RecordSelector: extractor = self._create_component_from_model( @@ -2919,7 +2925,7 @@ def create_simple_retriever( stop_condition_on_cursor: bool = False, client_side_incremental_sync: Optional[Dict[str, Any]] = None, transformations: List[RecordTransformation], - file_uploader: Optional[FileUploader] = None, + file_uploader: Optional[DefaultFileUploader] = None, incremental_sync: Optional[ Union[ IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel @@ -3606,14 +3612,24 @@ def create_file_uploader( name=name, **kwargs, ) - return FileUploader( + emit_connector_builder_messages = self._emit_connector_builder_messages + file_uploader = DefaultFileUploader( requester=requester, download_target_extractor=download_target_extractor, config=config, + file_writer=NoopFileWriter() + if emit_connector_builder_messages + else LocalFileSystemFileWriter(), parameters=model.parameters or {}, filename_extractor=model.filename_extractor if model.filename_extractor else None, ) + return ( + ConnectorBuilderFileUploader(file_uploader) + if emit_connector_builder_messages + else file_uploader + ) + def create_moving_window_call_rate_policy( self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any ) -> MovingWindowCallRatePolicy: diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py new file mode 100644 index 000000000..e839d9ba1 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/__init__.py @@ -0,0 +1,15 @@ +from .connector_builder_file_uploader import ConnectorBuilderFileUploader +from .default_file_uploader import DefaultFileUploader +from .file_uploader import FileUploader +from .file_writer import FileWriter +from .local_file_system_file_writer import LocalFileSystemFileWriter +from .noop_file_writer import NoopFileWriter + +__all__ = [ + "DefaultFileUploader", + "LocalFileSystemFileWriter", + "NoopFileWriter", + "ConnectorBuilderFileUploader", + "FileUploader", + "FileWriter", +] diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py new file mode 100644 index 000000000..6db5a608f --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/connector_builder_file_uploader.py @@ -0,0 +1,26 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from dataclasses import dataclass + +from airbyte_cdk.sources.declarative.types import Record + +from .default_file_uploader import DefaultFileUploader +from .file_uploader import FileUploader + + +@dataclass +class ConnectorBuilderFileUploader(FileUploader): + """ + Connector builder file uploader + Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data. + """ + + file_uploader: DefaultFileUploader + + def upload(self, record: Record) -> None: + self.file_uploader.upload(record=record) + for file_reference_key, file_reference_value in record.file_reference.__dict__.items(): + if not file_reference_key.startswith("_"): + record.data[file_reference_key] = file_reference_value # type: ignore diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py similarity index 60% rename from airbyte_cdk/sources/declarative/retrievers/file_uploader.py rename to airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py index 98342e1af..1312ab34d 100644 --- a/airbyte_cdk/sources/declarative/retrievers/file_uploader.py +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py @@ -22,18 +22,27 @@ from airbyte_cdk.sources.types import Config from airbyte_cdk.sources.utils.files_directory import get_files_directory +from .file_uploader import FileUploader +from .file_writer import FileWriter + logger = logging.getLogger("airbyte") @dataclass -class FileUploader: +class DefaultFileUploader(FileUploader): + """ + File uploader class + Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write() + Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies. + """ + requester: Requester download_target_extractor: RecordExtractor config: Config + file_writer: FileWriter parameters: InitVar[Mapping[str, Any]] filename_extractor: Optional[Union[InterpolatedString, str]] = None - content_extractor: Optional[RecordExtractor] = None def __post_init__(self, parameters: Mapping[str, Any]) -> None: if self.filename_extractor: @@ -61,33 +70,28 @@ def upload(self, record: Record) -> None: ), ) - if self.content_extractor: - raise NotImplementedError("TODO") - else: - files_directory = Path(get_files_directory()) + files_directory = Path(get_files_directory()) - file_name = ( - self.filename_extractor.eval(self.config, record=record) - if self.filename_extractor - else str(uuid.uuid4()) - ) - file_name = file_name.lstrip("/") - file_relative_path = Path(record.stream_name) / Path(file_name) + file_name = ( + self.filename_extractor.eval(self.config, record=record) + if self.filename_extractor + else str(uuid.uuid4()) + ) + file_name = file_name.lstrip("/") + file_relative_path = Path(record.stream_name) / Path(file_name) - full_path = files_directory / file_relative_path - full_path.parent.mkdir(parents=True, exist_ok=True) + full_path = files_directory / file_relative_path + full_path.parent.mkdir(parents=True, exist_ok=True) - with open(str(full_path), "wb") as f: - f.write(response.content) - file_size_bytes = full_path.stat().st_size + file_size_bytes = self.file_writer.write(full_path, content=response.content) - logger.info("File uploaded successfully") - logger.info(f"File url: {str(full_path)}") - logger.info(f"File size: {file_size_bytes / 1024} KB") - logger.info(f"File relative path: {str(file_relative_path)}") + logger.info("File uploaded successfully") + logger.info(f"File url: {str(full_path)}") + logger.info(f"File size: {file_size_bytes / 1024} KB") + logger.info(f"File relative path: {str(file_relative_path)}") - record.file_reference = AirbyteRecordMessageFileReference( - staging_file_url=str(full_path), - source_file_relative_path=str(file_relative_path), - file_size_bytes=file_size_bytes, - ) + record.file_reference = AirbyteRecordMessageFileReference( + staging_file_url=str(full_path), + source_file_relative_path=str(file_relative_path), + file_size_bytes=file_size_bytes, + ) diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py new file mode 100644 index 000000000..f68de46ef --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_uploader.py @@ -0,0 +1,22 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from dataclasses import dataclass + +from airbyte_cdk.sources.declarative.types import Record + + +@dataclass +class FileUploader(ABC): + """ + Base class for file uploader + """ + + @abstractmethod + def upload(self, record: Record) -> None: + """ + Uploads the file to the specified location + """ + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py new file mode 100644 index 000000000..b91923c8a --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/file_writer.py @@ -0,0 +1,19 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from abc import ABC, abstractmethod +from pathlib import Path + + +class FileWriter(ABC): + """ + Base File writer class + """ + + @abstractmethod + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + ... diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py new file mode 100644 index 000000000..8880fe1e4 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/local_file_system_file_writer.py @@ -0,0 +1,18 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from pathlib import Path + +from .file_writer import FileWriter + + +class LocalFileSystemFileWriter(FileWriter): + def write(self, file_path: Path, content: bytes) -> int: + """ + Writes the file to the specified location + """ + with open(str(file_path), "wb") as f: + f.write(content) + + return file_path.stat().st_size diff --git a/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py new file mode 100644 index 000000000..a074e1787 --- /dev/null +++ b/airbyte_cdk/sources/declarative/retrievers/file_uploader/noop_file_writer.py @@ -0,0 +1,17 @@ +# +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. +# + +from pathlib import Path + +from .file_writer import FileWriter + + +class NoopFileWriter(FileWriter): + NOOP_FILE_SIZE = -1 + + def write(self, file_path: Path, content: bytes) -> int: + """ + Noop file writer + """ + return self.NOOP_FILE_SIZE diff --git a/unit_tests/sources/declarative/file/test_file_stream.py b/unit_tests/sources/declarative/file/test_file_stream.py index e6ee40d5b..7b645f540 100644 --- a/unit_tests/sources/declarative/file/test_file_stream.py +++ b/unit_tests/sources/declarative/file/test_file_stream.py @@ -3,9 +3,13 @@ from pathlib import Path from typing import Any, Dict, List, Optional from unittest import TestCase -from unittest.mock import Mock +from unittest.mock import Mock, patch from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status +from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( + ModelToComponentFactory as OriginalModelToComponentFactory, +) +from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput @@ -55,7 +59,11 @@ def read( config = config_builder.build() state = state_builder.build() if state_builder else StateBuilder().build() return entrypoint_read( - _source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception + _source(catalog, config, state, yaml_file), + config, + catalog, + state, + expecting_exception, ) @@ -177,7 +185,7 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: yaml_file="test_file_stream_with_filename_extractor.yaml", ) - assert output.records + assert len(output.records) == 1 file_reference = output.records[0].record.file_reference assert file_reference assert ( @@ -190,6 +198,61 @@ def test_get_article_attachments_with_filename_extractor(self) -> None: ) assert file_reference.file_size_bytes + def test_get_article_attachments_messages_for_connector_builder(self) -> None: + with HttpMocker() as http_mocker: + http_mocker.get( + HttpRequest(url=STREAM_URL), + HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENTS_URL), + HttpResponse( + json.dumps(find_template("file_api/article_attachments", __file__)), 200 + ), + ) + http_mocker.get( + HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL), + HttpResponse( + find_binary_response("file_api/article_attachment_content.png", __file__), 200 + ), + ) + + # Define a mock factory that forces emit_connector_builder_messages=True + class MockModelToComponentFactory(OriginalModelToComponentFactory): + def __init__(self, *args, **kwargs): + kwargs["emit_connector_builder_messages"] = True + super().__init__(*args, **kwargs) + + # Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it + with patch( + "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory", + new=MockModelToComponentFactory, + ): + output = read( + self._config(), + CatalogBuilder() + .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments")) + .build(), + yaml_file="test_file_stream_with_filename_extractor.yaml", + ) + + assert len(output.records) == 1 + file_reference = output.records[0].record.file_reference + assert file_reference + assert file_reference.staging_file_url + assert file_reference.source_file_relative_path + # because we didn't write the file, the size is NOOP_FILE_SIZE + assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE + + # Assert file reference fields are copied to record data + record_data = output.records[0].record.data + assert record_data["staging_file_url"] == file_reference.staging_file_url + assert ( + record_data["source_file_relative_path"] + == file_reference.source_file_relative_path + ) + assert record_data["file_size_bytes"] == file_reference.file_size_bytes + def test_discover_article_attachments(self) -> None: output = discover(self._config())