Skip to content

feat(cdk): connector builder support for file uploader #503

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/declarative/extractors/record_selector.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TypeTransformer
Expand Down Expand Up @@ -43,7 +43,7 @@ class RecordSelector(HttpSelector):
record_filter: Optional[RecordFilter] = None
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
transform_before_filtering: bool = False
file_uploader: Optional[FileUploader] = None
file_uploader: Optional[DefaultFileUploader] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -481,7 +481,13 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
ConnectorBuilderFileUploader,
DefaultFileUploader,
FileUploader,
LocalFileSystemFileWriter,
NoopFileWriter,
)
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -2815,7 +2821,7 @@ def create_record_selector(
transformations: List[RecordTransformation] | None = None,
decoder: Decoder | None = None,
client_side_incremental_sync: Dict[str, Any] | None = None,
file_uploader: Optional[FileUploader] = None,
file_uploader: Optional[DefaultFileUploader] = None,
**kwargs: Any,
) -> RecordSelector:
extractor = self._create_component_from_model(
Expand Down Expand Up @@ -2919,7 +2925,7 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
file_uploader: Optional[FileUploader] = None,
file_uploader: Optional[DefaultFileUploader] = None,
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
Expand Down Expand Up @@ -3606,14 +3612,24 @@ def create_file_uploader(
name=name,
**kwargs,
)
return FileUploader(
emit_connector_builder_messages = self._emit_connector_builder_messages
file_uploader = DefaultFileUploader(
requester=requester,
download_target_extractor=download_target_extractor,
config=config,
file_writer=NoopFileWriter()
if emit_connector_builder_messages
else LocalFileSystemFileWriter(),
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

return (
ConnectorBuilderFileUploader(file_uploader)
if emit_connector_builder_messages
else file_uploader
)

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
) -> MovingWindowCallRatePolicy:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
from .default_file_uploader import DefaultFileUploader
from .file_uploader import FileUploader
from .file_writer import FileWriter
from .local_file_system_file_writer import LocalFileSystemFileWriter
from .noop_file_writer import NoopFileWriter

__all__ = [
"DefaultFileUploader",
"LocalFileSystemFileWriter",
"NoopFileWriter",
"ConnectorBuilderFileUploader",
"FileUploader",
"FileWriter",
]
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from dataclasses import dataclass

from airbyte_cdk.sources.declarative.types import Record

from .default_file_uploader import DefaultFileUploader
from .file_uploader import FileUploader


@dataclass
class ConnectorBuilderFileUploader(FileUploader):
"""
Connector builder file uploader
Acts as a decorator or wrapper around a FileUploader instance, copying the attributes from record.file_reference into the record.data.
"""

file_uploader: DefaultFileUploader

def upload(self, record: Record) -> None:
self.file_uploader.upload(record=record)
for file_reference_key, file_reference_value in record.file_reference.__dict__.items():
if not file_reference_key.startswith("_"):
record.data[file_reference_key] = file_reference_value # type: ignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,18 +22,27 @@
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.files_directory import get_files_directory

from .file_uploader import FileUploader
from .file_writer import FileWriter

logger = logging.getLogger("airbyte")


@dataclass
class FileUploader:
class DefaultFileUploader(FileUploader):
"""
File uploader class
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies.
"""

requester: Requester
download_target_extractor: RecordExtractor
config: Config
file_writer: FileWriter
parameters: InitVar[Mapping[str, Any]]

filename_extractor: Optional[Union[InterpolatedString, str]] = None
content_extractor: Optional[RecordExtractor] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
if self.filename_extractor:
Expand Down Expand Up @@ -61,33 +70,28 @@ def upload(self, record: Record) -> None:
),
)

if self.content_extractor:
raise NotImplementedError("TODO")
else:
files_directory = Path(get_files_directory())
files_directory = Path(get_files_directory())

file_name = (
self.filename_extractor.eval(self.config, record=record)
if self.filename_extractor
else str(uuid.uuid4())
)
file_name = file_name.lstrip("/")
file_relative_path = Path(record.stream_name) / Path(file_name)
file_name = (
self.filename_extractor.eval(self.config, record=record)
if self.filename_extractor
else str(uuid.uuid4())
)
file_name = file_name.lstrip("/")
file_relative_path = Path(record.stream_name) / Path(file_name)

full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)
full_path = files_directory / file_relative_path
full_path.parent.mkdir(parents=True, exist_ok=True)

with open(str(full_path), "wb") as f:
f.write(response.content)
file_size_bytes = full_path.stat().st_size
file_size_bytes = self.file_writer.write(full_path, content=response.content)

logger.info("File uploaded successfully")
logger.info(f"File url: {str(full_path)}")
logger.info(f"File size: {file_size_bytes / 1024} KB")
logger.info(f"File relative path: {str(file_relative_path)}")
logger.info("File uploaded successfully")
logger.info(f"File url: {str(full_path)}")
logger.info(f"File size: {file_size_bytes / 1024} KB")
logger.info(f"File relative path: {str(file_relative_path)}")

record.file_reference = AirbyteRecordMessageFileReference(
staging_file_url=str(full_path),
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)
record.file_reference = AirbyteRecordMessageFileReference(
staging_file_url=str(full_path),
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from dataclasses import dataclass

from airbyte_cdk.sources.declarative.types import Record


@dataclass
class FileUploader(ABC):
"""
Base class for file uploader
"""

@abstractmethod
def upload(self, record: Record) -> None:
"""
Uploads the file to the specified location
"""
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod
from pathlib import Path


class FileWriter(ABC):
"""
Base File writer class
"""

@abstractmethod
def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
...
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from pathlib import Path

from .file_writer import FileWriter


class LocalFileSystemFileWriter(FileWriter):
def write(self, file_path: Path, content: bytes) -> int:
"""
Writes the file to the specified location
"""
with open(str(file_path), "wb") as f:
f.write(content)

return file_path.stat().st_size
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from pathlib import Path

from .file_writer import FileWriter


class NoopFileWriter(FileWriter):
NOOP_FILE_SIZE = -1

def write(self, file_path: Path, content: bytes) -> int:
"""
Noop file writer
"""
return self.NOOP_FILE_SIZE
69 changes: 66 additions & 3 deletions unit_tests/sources/declarative/file/test_file_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,13 @@
from pathlib import Path
from typing import Any, Dict, List, Optional
from unittest import TestCase
from unittest.mock import Mock
from unittest.mock import Mock, patch

from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
ModelToComponentFactory as OriginalModelToComponentFactory,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
Expand Down Expand Up @@ -55,7 +59,11 @@ def read(
config = config_builder.build()
state = state_builder.build() if state_builder else StateBuilder().build()
return entrypoint_read(
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
_source(catalog, config, state, yaml_file),
config,
catalog,
state,
expecting_exception,
)


Expand Down Expand Up @@ -177,7 +185,7 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert output.records
assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert (
Expand All @@ -190,6 +198,61 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
)
assert file_reference.file_size_bytes

def test_get_article_attachments_messages_for_connector_builder(self) -> None:
with HttpMocker() as http_mocker:
http_mocker.get(
HttpRequest(url=STREAM_URL),
HttpResponse(json.dumps(find_template("file_api/articles", __file__)), 200),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENTS_URL),
HttpResponse(
json.dumps(find_template("file_api/article_attachments", __file__)), 200
),
)
http_mocker.get(
HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL),
HttpResponse(
find_binary_response("file_api/article_attachment_content.png", __file__), 200
),
)

# Define a mock factory that forces emit_connector_builder_messages=True
class MockModelToComponentFactory(OriginalModelToComponentFactory):
def __init__(self, *args, **kwargs):
kwargs["emit_connector_builder_messages"] = True
super().__init__(*args, **kwargs)

# Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it
with patch(
"airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
new=MockModelToComponentFactory,
):
output = read(
self._config(),
CatalogBuilder()
.with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
.build(),
yaml_file="test_file_stream_with_filename_extractor.yaml",
)

assert len(output.records) == 1
file_reference = output.records[0].record.file_reference
assert file_reference
assert file_reference.staging_file_url
assert file_reference.source_file_relative_path
# because we didn't write the file, the size is NOOP_FILE_SIZE
assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE

# Assert file reference fields are copied to record data
record_data = output.records[0].record.data
assert record_data["staging_file_url"] == file_reference.staging_file_url
assert (
record_data["source_file_relative_path"]
== file_reference.source_file_relative_path
)
assert record_data["file_size_bytes"] == file_reference.file_size_bytes

def test_discover_article_attachments(self) -> None:
output = discover(self._config())

Expand Down
Loading