-
Notifications
You must be signed in to change notification settings - Fork 15
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
PoC for file upload #433
base: main
Are you sure you want to change the base?
PoC for file upload #433
Conversation
/autofix
|
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. 🪧 TipsChatThere are 3 ways to chat with CodeRabbit:
Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments. CodeRabbit Commands (Invoked using PR comments)
Other keywords and placeholders
CodeRabbit Configuration File (
|
anyOf: | ||
- "$ref": "#/definitions/CustomRecordExtractor" | ||
- "$ref": "#/definitions/DpathExtractor" | ||
file_extractor: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I figured some API might return the file as a field in a JSON response for example but for now this would not be needed and we should remove it
self._content_extractor = content_extractor | ||
|
||
def upload(self, record: Record) -> None: | ||
# TODO validate record shape - is the transformation applied at this point? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There were a couple of discussions that makes me think this is needed:
- Adding a AirbyteFileRecordMessage to the protocol instead of relying on a implicit class that we only have in the CDK here
- Supporting canonical schemas
In order words, this TODO logic here would validate that we can create the AirbyteMessage that describes a file
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This has been outscoped for now and it seems like the record can have any shape now. I would probably add the attachment ID so that users can rely on a foreign key and the would be it. We might want to add a field on this component with jinja interpolation to let the user create the path
if self._content_extractor: | ||
raise NotImplementedError("TODO") | ||
else: | ||
with open(str(Path(__file__).parent / record.data["file_name"]), "ab") as f: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The path here would need to rely on the environment variable as such. We can extract this logic somewhere re-usable. There is still some questions to be answered in terms of path as it is not like a file based source i.e. that there is not necessarily a path defined by the API
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for the base path but we also need to consider that there is no file system in API sources and there might be clashes between streams or even records so we need to take a decision here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is for the base path but we also need to consider that there is no file system in API sources and there might be clashes between streams or even records so we need to take a decision here
Someething to consider for this one, I will update the description of this issue.
extractor: | ||
type: DpathExtractor | ||
field_path: ["article_attachments"] | ||
file_uploader: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added this to the stream level so that it can apply to any type of retrievers. That being said, I'm not sure asyncretrievers, statedelegatingretrievers or propertychunkingretrievers would make a lot of sense here...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I've identified the lower boundaries here: it needs to be at least at the end of the RecordSelector meaning. The reason is that if we have client side filtering, we only want to download the files if we know the record will get synced.
This could be an interesting middle to avoid having this only in the Concurrent CDK
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We will track it here.
"credentials": { | ||
"credentials": "api_token", | ||
"email": "[email protected]", | ||
"api_token": <redacted> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Set this to run the tests
@@ -206,6 +207,20 @@ def _group_streams( | |||
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, | |||
# so we need to treat them as synchronous | |||
|
|||
file_uploader = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One of the downside of this solution is that we need to add things on the ConcurrentDeclarativeSource level because we don't yet build a DefaultStream. This is added debt that we will clean up when we remove the declarative cursors and move the stream creation back on the model_to_component_factory but the first part is only a stretch project in Q2a so I expect that this will live there for quite some time still
) | ||
if self._file_uploader: | ||
self._file_uploader.upload(record) | ||
yield record |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The file_uploader should probably set the universal file transfer information so that the destination can read it. This means that this will probably return enough information to create the AirbyteFileTransferRecordMessage
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should make the trick
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py
Show resolved
Hide resolved
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]> Co-authored-by: octavia-squidington-iii <[email protected]>
PoC for having file upload in API sources