Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PoC for file upload #433

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte_cdk/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
AirbyteMessage,
AirbyteProtocol,
AirbyteRecordMessage,
AirbyteRecordMessageFileReference,
AirbyteStateBlob,
AirbyteStateMessage,
AirbyteStateStats,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
file_reference=record.file_reference,
)
stream = self._stream_name_to_instance[record.stream_name]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
PerPartitionWithGlobalCursor,
)
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.declarative.models import FileUploader
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ConcurrencyLevel as ConcurrencyLevelModel,
)
Expand Down Expand Up @@ -206,6 +207,10 @@ def _group_streams(
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
# so we need to treat them as synchronous

supports_file_transfer = (
"file_uploader" in name_to_stream_mapping[declarative_stream.name]
)

if (
isinstance(declarative_stream, DeclarativeStream)
and name_to_stream_mapping[declarative_stream.name]["type"]
Expand Down Expand Up @@ -322,6 +327,7 @@ def _group_streams(
else None,
logger=self.logger,
cursor=cursor,
supports_file_transfer=supports_file_transfer,
)
)
elif (
Expand Down Expand Up @@ -353,6 +359,7 @@ def _group_streams(
cursor_field=None,
logger=self.logger,
cursor=final_state_cursor,
supports_file_transfer=supports_file_transfer,
)
)
elif (
Expand Down Expand Up @@ -406,6 +413,7 @@ def _group_streams(
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
logger=self.logger,
cursor=perpartition_cursor,
supports_file_transfer=supports_file_transfer,
)
)
else:
Expand Down
36 changes: 36 additions & 0 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -1422,6 +1422,42 @@ definitions:
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
- "$ref": "#/definitions/CustomStateMigration"
default: []
file_uploader:
  title: File Uploader
  description: (experimental) Describes how to fetch a file
  type: object
  required:
    - type
    - requester
    - download_target_extractor
  properties:
    type:
      type: string
      enum: [FileUploader]
    requester:
      description: Requester component that describes how to prepare HTTP requests to send to the source API.
      anyOf:
        - "$ref": "#/definitions/CustomRequester"
        - "$ref": "#/definitions/HttpRequester"
    download_target_extractor:
      description: Responsible for fetching the url where the file is located. This is applied on each record and not on the HTTP response.
      anyOf:
        - "$ref": "#/definitions/CustomRecordExtractor"
        - "$ref": "#/definitions/DpathExtractor"
    file_extractor:
      description: Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content.
      anyOf:
        - "$ref": "#/definitions/CustomRecordExtractor"
        - "$ref": "#/definitions/DpathExtractor"
    filename_extractor:
      description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
      type: string
      interpolation_context:
        - config
        - record
      examples:
        - "{{ record.id }}/{{ record.file_name }}/"
        - "{{ record.id }}_{{ record.file_name }}/"
    $parameters:
      type: object
      # JSON Schema keyword is camelCase; snake_case "additional_properties" is silently ignored by validators.
      additionalProperties: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
from airbyte_cdk.sources.declarative.models import SchemaNormalization
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
from airbyte_cdk.sources.utils.transform import TypeTransformer
Expand Down Expand Up @@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
record_filter: Optional[RecordFilter] = None
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
transform_before_filtering: bool = False
file_uploader: Optional[FileUploader] = None

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters
Expand Down Expand Up @@ -117,7 +119,10 @@ def filter_and_transform(
transformed_filtered_data, schema=records_schema
)
for data in normalized_data:
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
if self.file_uploader:
self.file_uploader.upload(record)
yield record

def _normalize_by_schema(
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1989,6 +1989,31 @@ class Config:
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class FileUploader(BaseModel):
    """(experimental) Declarative model describing how to fetch a file referenced by a record.

    Generated-model counterpart of the ``file_uploader`` definition in
    ``declarative_component_schema.yaml``; field descriptions mirror the schema.
    """

    type: Literal["FileUploader"]
    requester: Union[CustomRequester, HttpRequester] = Field(
        ...,
        description="Requester component that describes how to prepare HTTP requests to send to the source API.",
    )
    download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
        ...,
        description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
    )
    file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
        None,
        description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
    )
    filename_extractor: Optional[str] = Field(
        None,
        description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
        examples=[
            "{{ record.id }}/{{ record.file_name }}/",
            "{{ record.id }}_{{ record.file_name }}/",
        ],
    )
    parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class DeclarativeStream(BaseModel):
class Config:
extra = Extra.allow
Expand Down Expand Up @@ -2047,6 +2072,11 @@ class Config:
description="Array of state migrations to be applied on the input state",
title="State Migrations",
)
file_uploader: Optional[FileUploader] = Field(
None,
description="(experimental) Describes how to fetch a file",
title="File Uploader",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down Expand Up @@ -2464,6 +2494,7 @@ class DynamicDeclarativeStream(BaseModel):
DeclarativeSource1.update_forward_refs()
DeclarativeSource2.update_forward_refs()
SelectiveAuthenticator.update_forward_refs()
FileUploader.update_forward_refs()
DeclarativeStream.update_forward_refs()
SessionTokenAuthenticator.update_forward_refs()
DynamicSchemaLoader.update_forward_refs()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,6 @@
)
from airbyte_cdk.sources.declarative.models import (
CustomStateMigration,
GzipDecoder,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
AddedFieldDefinition as AddedFieldDefinitionModel,
Expand Down Expand Up @@ -221,6 +220,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FileUploader as FileUploaderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel,
)
Expand Down Expand Up @@ -442,6 +444,7 @@
SimpleRetriever,
SimpleRetrieverTestReadDecorator,
)
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
from airbyte_cdk.sources.declarative.schema import (
ComplexFieldType,
DefaultSchemaLoader,
Expand Down Expand Up @@ -633,6 +636,7 @@ def _init_mappings(self) -> None:
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
ZipfileDecoderModel: self.create_zipfile_decoder,
HTTPAPIBudgetModel: self.create_http_api_budget,
FileUploaderModel: self.create_file_uploader,
FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy,
MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy,
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
Expand Down Expand Up @@ -1751,6 +1755,11 @@ def create_declarative_stream(
transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)
file_uploader = None
if model.file_uploader:
file_uploader = self._create_component_from_model(
model=model.file_uploader, config=config
)

retriever = self._create_component_from_model(
model=model.retriever,
Expand All @@ -1762,6 +1771,7 @@ def create_declarative_stream(
stop_condition_on_cursor=stop_condition_on_cursor,
client_side_incremental_sync=client_side_incremental_sync,
transformations=transformations,
file_uploader=file_uploader,
incremental_sync=model.incremental_sync,
)
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
Expand Down Expand Up @@ -2603,6 +2613,7 @@ def create_record_selector(
transformations: List[RecordTransformation] | None = None,
decoder: Decoder | None = None,
client_side_incremental_sync: Dict[str, Any] | None = None,
file_uploader: Optional[FileUploader] = None,
**kwargs: Any,
) -> RecordSelector:
extractor = self._create_component_from_model(
Expand Down Expand Up @@ -2640,6 +2651,7 @@ def create_record_selector(
config=config,
record_filter=record_filter,
transformations=transformations or [],
file_uploader=file_uploader,
schema_normalization=schema_normalization,
parameters=model.parameters or {},
transform_before_filtering=transform_before_filtering,
Expand Down Expand Up @@ -2697,6 +2709,7 @@ def create_simple_retriever(
stop_condition_on_cursor: bool = False,
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
transformations: List[RecordTransformation],
file_uploader: Optional[FileUploader] = None,
incremental_sync: Optional[
Union[
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
Expand All @@ -2719,6 +2732,7 @@ def create_simple_retriever(
decoder=decoder,
transformations=transformations,
client_side_incremental_sync=client_side_incremental_sync,
file_uploader=file_uploader,
)
url_base = (
model.requester.url_base
Expand Down Expand Up @@ -3318,6 +3332,30 @@ def create_fixed_window_call_rate_policy(
matchers=matchers,
)

def create_file_uploader(
self, model: FileUploaderModel, config: Config, **kwargs: Any
) -> FileUploader:
name = "File Uploader"
requester = self._create_component_from_model(
model=model.requester,
config=config,
name=name,
**kwargs,
)
download_target_extractor = self._create_component_from_model(
model=model.download_target_extractor,
config=config,
name=name,
**kwargs,
)
return FileUploader(
requester=requester,
download_target_extractor=download_target_extractor,
config=config,
parameters=model.parameters or {},
filename_extractor=model.filename_extractor if model.filename_extractor else None,
)

def create_moving_window_call_rate_policy(
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
) -> MovingWindowCallRatePolicy:
Expand Down
89 changes: 89 additions & 0 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

import json
import logging
import uuid
from dataclasses import InitVar, dataclass, field
from pathlib import Path
from typing import Any, Mapping, Optional, Union

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import (
InterpolatedString,
)
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
SafeResponse,
)
from airbyte_cdk.sources.declarative.requesters import Requester
from airbyte_cdk.sources.declarative.types import Record, StreamSlice
from airbyte_cdk.sources.types import Config
from airbyte_cdk.sources.utils.files_directory import get_files_directory

logger = logging.getLogger("airbyte")


@dataclass
class FileUploader:
    """Fetches the file referenced by a record and attaches an AirbyteRecordMessageFileReference to it.

    The download target (a URL string) is extracted from the record's own data, the file is
    fetched via ``requester`` and written to
    ``<files_directory>/<stream_name>/<file name>`` on local disk.
    """

    # Issues the HTTP request that returns the file content; the resolved download
    # target is exposed to it through the stream slice's extra_fields.
    requester: Requester
    # Extracts the download URL from the record data (not from an HTTP response).
    download_target_extractor: RecordExtractor
    config: Config
    parameters: InitVar[Mapping[str, Any]]

    # Optional interpolated template for the stored file name; a random UUID is used when absent.
    filename_extractor: Optional[Union[InterpolatedString, str]] = None
    # Would extract the file content from the response body; not implemented yet.
    content_extractor: Optional[RecordExtractor] = None

    def __post_init__(self, parameters: Mapping[str, Any]) -> None:
        if self.filename_extractor:
            self.filename_extractor = InterpolatedString.create(
                self.filename_extractor,
                parameters=parameters,
            )

    def upload(self, record: Record) -> None:
        """Download the file referenced by ``record`` and set ``record.file_reference``.

        Raises:
            ValueError: If no download target can be extracted, if it is not a string,
                or if the interpolated file name would escape the files directory.
        """
        # The extractor API operates on responses, so wrap the record data in a fake response.
        mocked_response = SafeResponse()
        mocked_response.content = json.dumps(record.data).encode()
        download_targets = list(self.download_target_extractor.extract_records(mocked_response))
        # Fail with a clear message instead of a bare IndexError when nothing is extracted.
        if not download_targets:
            raise ValueError(
                "No download target found in the record; check the download_target_extractor configuration"
            )
        download_target = download_targets[0]
        if not isinstance(download_target, str):
            raise ValueError(
                f"download_target is expected to be a str but was {type(download_target)}: {download_target}"
            )

        response = self.requester.send_request(
            stream_slice=StreamSlice(
                partition={}, cursor_slice={}, extra_fields={"download_target": download_target}
            ),
        )

        if self.content_extractor:
            raise NotImplementedError("TODO")
        else:
            files_directory = Path(get_files_directory())

            file_name = (
                self.filename_extractor.eval(self.config, record=record)
                if self.filename_extractor
                else str(uuid.uuid4())
            )
            # Strip a leading "/" so the interpolated name stays relative to the stream directory.
            file_name = file_name.lstrip("/")
            file_relative_path = Path(record.stream_name) / Path(file_name)

            full_path = files_directory / file_relative_path
            # Interpolated file names come from untrusted record data; refuse paths that
            # would escape the files directory (e.g. via "..").
            if not full_path.resolve().is_relative_to(files_directory.resolve()):
                raise ValueError(
                    f"Resolved file path {full_path} escapes the files directory {files_directory}"
                )
            full_path.parent.mkdir(parents=True, exist_ok=True)

            with open(str(full_path), "wb") as f:
                f.write(response.content)
            file_size_bytes = full_path.stat().st_size

            logger.info("File uploaded successfully")
            logger.info(f"File url: {str(full_path)}")
            logger.info(f"File size: {file_size_bytes / 1024} KB")
            logger.info(f"File relative path: {str(file_relative_path)}")

            # Downstream consumers read this reference to locate and size the transferred file.
            record.file_reference = AirbyteRecordMessageFileReference(
                file_url=str(full_path),
                file_relative_path=str(file_relative_path),
                file_size_bytes=file_size_bytes,
            )
Loading
Loading