S3 file reader support #32
base: master
Conversation
data_integration/pipelines.py
Outdated
```diff
-    initial_node: Node = None
-    final_node: Node = None
+    initial_node: Task = None
+    final_node: Task = None

     def __init__(self, id: str,
```
One can also add a Pipeline or a ParallelTask as the initial / final node.
Not really: there are places that expect a Task; at least I had places where IntelliJ complained that a method wasn't available.
Fixed it in a different way
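For context, a minimal sketch of the class hierarchy this thread is about. The names follow the diff, but the attributes and methods are assumptions for illustration, not the actual mara code:

```python
class Node:
    """Anything that can be placed in a pipeline (illustrative)."""
    def __init__(self, id: str):
        self.id = id
        self.upstreams = set()

class Task(Node):
    """A concrete unit of work; has methods a plain Node does not."""
    def run(self) -> bool:
        return True

class Pipeline(Node):
    """A container of nodes; itself a Node, but not a Task."""
    def __init__(self, id: str):
        super().__init__(id)
        self.nodes = {}

# With `initial_node: Task`, assigning a Pipeline (or any non-Task Node)
# is exactly what a type checker such as IntelliJ's would flag.
```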
data_integration/pipelines.py
Outdated
```diff
         self.initial_node = node
         for downstream in self.nodes.values():
             if not downstream.upstreams and downstream != self.final_node:
                 self.add_dependency(node, downstream)
         self.add(node)

-    def add_final(self, node: Node) -> 'Pipeline':
+    def add_final(self, node: Task) -> 'Pipeline':
         self.final_node = node
```
same here
Fixed it in a different way.
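For reference, a hedged sketch of how `add_initial` / `add_final` are typically wired up. The task and schema names are invented for illustration; the constructor signatures follow common usage in this code base:

```python
from data_integration.pipelines import Pipeline, Task
from data_integration.commands.python import RunFunction

pipeline = Pipeline(id='s3_import', description='Imports files from S3')

# add_initial: the node runs before all nodes that have no other upstream
pipeline.add_initial(Task(id='create_schema',
                          description='Recreates the target schema',
                          commands=[RunFunction(lambda: True)]))

# add_final: the node runs after all nodes that have no other downstream
pipeline.add_final(Task(id='validate',
                        description='Sanity-checks the imported data',
                        commands=[RunFunction(lambda: True)]))
```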
Looks very good otherwise. Please squash.
Let's hold off on a release until the other PR is in.
This reverts commit c39697d.
data_integration/commands/files.py
Outdated
```python
class ReadS3File(_ReadFile):
    """Reads data from an S3 file"""

    def __init__(self, s3_url: str, compression: Compression, target_table: str,
```
I'd think the parameter `s3_url` should be called `s3_uri`, in line with the cp command. A URL is always a URI, but not all URIs are URLs; see also the Wikipedia article on URLs.
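To illustrate the naming point: `s3://bucket/key` is a URI with a custom scheme, and the standard library parses it like any other URI. A small sketch of splitting such an identifier into bucket and key (an illustrative helper, not code from this PR):

```python
from urllib.parse import urlparse

def split_s3_uri(s3_uri: str) -> tuple:
    """Splits 's3://bucket/path/to/file' into (bucket, key).

    Illustrative helper, not part of this PR.
    """
    parsed = urlparse(s3_uri)
    assert parsed.scheme == 's3', f'not an S3 URI: {s3_uri}'
    return parsed.netloc, parsed.path.lstrip('/')

# split_s3_uri('s3://my-bucket/raw/2020/01/data.csv.gz')
# -> ('my-bucket', 'raw/2020/01/data.csv.gz')
```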
# Conflicts:
#	mara_pipelines/commands/files.py
@jankatins is this running in production?
@martin-loetzsch Nope; it should also be integrated into https://github.com/mara/mara-storage, where this looks much easier to do.
Refactors the file reader and adds an S3 file reader as an alternative to local file reads.
New commands:

- `data_integration.parallel_tasks.files.ParallelReadS3File`: reads in a whole bucket
- `data_integration.commands.files.ReadS3File`: reads a single file from S3

From initial testing, this is a lot slower than a sync plus reading from the local file system (both iterating over the bucket to get the file list and the individual reads are slow), but syncing the bucket to a local filesystem also takes time. From my perspective this is only worth it if you have to do a "sync to local" on every run (which we have to do; no volumes in our ETL container :-(), because the second run then saves time compared to doing a sync plus an incremental read via the file system. That's the theory, at least; so far I have only tested locally.
The single-file read will also come in handy as a replacement for Google Sheets imports.
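For illustration, the single-file command might be wired into a pipeline roughly like this. Only the three parameters visible in the diff are shown; the bucket, table, and enum member names are assumptions, and the real constructor likely takes further arguments:

```python
from data_integration.pipelines import Pipeline, Task
from data_integration.commands.files import Compression, ReadS3File

pipeline = Pipeline(id='s3_demo', description='Loads a single file from S3')

# Hypothetical task: load one gzipped CSV from S3 into a raw table
pipeline.add(Task(
    id='read_events',
    description='Loads one S3 file into raw.events',
    commands=[ReadS3File(s3_url='s3://my-bucket/events/2020-01-01.csv.gz',
                         compression=Compression.GZIP,
                         target_table='raw.events')]))
```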
WIP...