Replies: 8 comments
-
My idea was to implement this with pluggable storage, so that S3 and other backends can be used.
This is akin to how many CI/CD systems (Travis CI, GitLab CI/CD) work, where the "artifacts" directory is automatically packed at the end of a job and unpacked at the start of the next job. The difference is that there the storage is usually part of the CI/CD solution, whereas here I'm proposing something similar to Airflow's remote logging, where the location of the logs can be set to a remote location.
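A very rough sketch of the idea, assuming a hypothetical `ArtifactStore` interface with an S3 backend built on the existing S3Hook. Nothing below exists in Airflow today; it only illustrates the pack-at-the-end / unpack-at-the-start flow:

```python
# Hypothetical sketch only: neither ArtifactStore nor S3ArtifactStore exist in Airflow.
import os
import shutil

from airflow.providers.amazon.aws.hooks.s3 import S3Hook


class ArtifactStore:
    """Pluggable interface: pack a local directory after a task, unpack it before the next."""

    def upload_dir(self, local_dir: str, dag_id: str, run_id: str) -> None:
        raise NotImplementedError

    def download_dir(self, local_dir: str, dag_id: str, run_id: str) -> None:
        raise NotImplementedError


class S3ArtifactStore(ArtifactStore):
    """One possible backend, built on the existing S3Hook."""

    def __init__(self, bucket: str, aws_conn_id: str = "aws_default"):
        self.bucket = bucket
        self.hook = S3Hook(aws_conn_id=aws_conn_id)

    def upload_dir(self, local_dir: str, dag_id: str, run_id: str) -> None:
        # Walk the local artifacts directory and mirror it under <dag_id>/<run_id>/ in S3.
        for root, _, files in os.walk(local_dir):
            for name in files:
                path = os.path.join(root, name)
                key = f"{dag_id}/{run_id}/{os.path.relpath(path, local_dir)}"
                self.hook.load_file(filename=path, key=key, bucket_name=self.bucket, replace=True)

    def download_dir(self, local_dir: str, dag_id: str, run_id: str) -> None:
        # Recreate the artifacts directory locally from the keys stored for this run.
        prefix = f"{dag_id}/{run_id}/"
        for key in self.hook.list_keys(bucket_name=self.bucket, prefix=prefix) or []:
            target = os.path.join(local_dir, os.path.relpath(key, prefix))
            os.makedirs(os.path.dirname(target), exist_ok=True)
            # download_file saves to a temporary file and returns its path
            shutil.move(self.hook.download_file(key=key, bucket_name=self.bucket), target)
```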
-
Will this be the default behaviour? If so, won't it add excessive overhead, leading to task start delays and network load? Or should we explicitly state which artifacts are required by the task?
-
You can already do this via an XComBackend: https://airflow.apache.org/blog/airflow-1.10.12/#allow-defining-custom-xcom-class. There is also an example in https://medium.com/apache-airflow/airflow-2-0-dag-authoring-redesigned-651edc397178:

```python
import uuid
from typing import Any

import pandas as pd

from airflow.models.xcom import BaseXCom
from airflow.providers.google.cloud.hooks.gcs import GCSHook


class GCSXComBackend(BaseXCom):
    PREFIX = "xcom_gs://"
    BUCKET_NAME = "airflow-xcom-backend"

    @staticmethod
    def serialize_value(value: Any):
        if isinstance(value, pd.DataFrame):
            hook = GCSHook()
            object_name = "data_" + str(uuid.uuid4())
            with hook.provide_file_and_upload(
                bucket_name=GCSXComBackend.BUCKET_NAME,
                object_name=object_name,
            ) as f:
                value.to_csv(f.name)
            # Append prefix to persist information that the file
            # has to be downloaded from GCS
            value = GCSXComBackend.PREFIX + object_name
        return BaseXCom.serialize_value(value)

    @staticmethod
    def deserialize_value(result) -> Any:
        result = BaseXCom.deserialize_value(result)
        if isinstance(result, str) and result.startswith(GCSXComBackend.PREFIX):
            object_name = result.replace(GCSXComBackend.PREFIX, "")
            with GCSHook().provide_file(
                bucket_name=GCSXComBackend.BUCKET_NAME,
                object_name=object_name,
            ) as f:
                f.flush()
                result = pd.read_csv(f.name)
        return result
```
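A note on enabling this: a custom backend like the one above is plugged in through the `xcom_backend` option in the `[core]` section of `airflow.cfg` (or the `AIRFLOW__CORE__XCOM_BACKEND` environment variable), set to the dotted path of the class.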
-
I think it would be nice to have
-
There is some discussion around it here:
Would love to get your input on those @xinbinhuang
-
I must confess that I didn't know about Airflow 2.0: DAG Authoring Redesigned, and it has been eye opening. So I guess the approach in 2.x to pass data from one task/operator A to another task/operator B is:
So I don't see any problem with that, but from the links that @kaxil posted I believe there is no interest in maintaining a generic XComBackend implementation for S3, etc. in the Airflow project, because tasks return arbitrary Python objects and there is no generic, efficient way to serialize them; for example, if the task returns a pandas DataFrame you don't want to use pickle, you want to use a serialization format suited to DataFrames. So this feature request should probably be closed, but I think there are some follow-up feature requests that we can further discuss:
So @kaxil, where is the best place to continue the discussion, the mailing list or here?
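For anyone landing here later, a minimal sketch of that 2.x approach using the TaskFlow API, assuming a custom XCom backend such as the GCSXComBackend above is configured (the DAG and task names are made up for illustration):

```python
# Minimal sketch, assuming a custom XCom backend (e.g. the GCSXComBackend above) is
# configured; the DataFrame returned by extract() is then stored by that backend
# rather than in the metadata database. DAG and task names are made up.
from datetime import datetime

import pandas as pd

from airflow.decorators import dag, task


@dag(schedule_interval=None, start_date=datetime(2021, 1, 1), catchup=False)
def share_dataframe():
    @task
    def extract() -> pd.DataFrame:
        return pd.DataFrame({"a": [1, 2, 3]})

    @task
    def total(df: pd.DataFrame) -> int:
        # Deserialized transparently by the XCom backend
        return int(df["a"].sum())

    total(extract())


share_dataframe_dag = share_dataframe()
```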
-
@ecerulm Mailing list is the best place
-
This is really part of several AIPs that have been raised in the past:
And those should possibly be picked up and led to completion. There are also possible improvements to XCom backends, but those should be discussed on the mailing list as Kaxil mentioned. Converting to a discussion in case more discussion is needed.
-
Description
Provide shared storage between tasks.
Use case / motivation
Today we have XCom to share small pieces of information between tasks, but that is not suitable for sharing files.
Being able to share files with downstream tasks will simplify new operators, as you can create operators that produce files and operators that consume them without having to implement a way to pass the files every time.
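To make the motivation concrete, here is roughly the manual plumbing each pair of operators needs today when files are passed through S3 by hand (the bucket name and key layout are made up for illustration); the proposal would fold this into Airflow itself:

```python
# Illustration of today's manual plumbing that shared storage would remove.
# The producer uploads a file and passes the object key through XCom,
# and the consumer pulls the key and downloads the file again.
from airflow.decorators import task
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "my-shared-bucket"  # hypothetical bucket


@task
def produce_report(run_id: str) -> str:
    local_path = "/tmp/report.csv"
    with open(local_path, "w") as f:
        f.write("a,b\n1,2\n")
    key = f"shared/{run_id}/report.csv"
    S3Hook().load_file(filename=local_path, key=key, bucket_name=BUCKET, replace=True)
    return key  # the key travels through XCom to the downstream task


@task
def consume_report(key: str) -> None:
    # download_file returns the path of a local temporary copy
    local_path = S3Hook().download_file(key=key, bucket_name=BUCKET)
    print(open(local_path).read())
```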
Are you willing to submit a PR?
Yes,
Related Issues