Dynamic DAGs and Pandas to_sql behaviour #33872
Replies: 2 comments 2 replies
-
I have not used Pandas much, but it looks like you need to commit your data manually. Not sure what the "destination" field in your CSV is, but the Pandas docs are clear that if you use a SQLAlchemy engine connection the data will not be committed: https://pandas.pydata.org/docs/reference/api/pandas.DataFrame.to_sql.html
Likely whatever you pass as the "conn" parameter results in a DB engine connection that is already "in-transaction", maybe due to caching (wild guess). What I'd do is explicitly create the engine, get a connection from it, and pass that to `to_sql`.
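A minimal sketch of the suggestion above, using an in-memory SQLite engine as a stand-in for the real Postgres URI (table and column names here are hypothetical). SQLAlchemy's `engine.begin()` opens a transaction that commits on clean exit, so the `to_sql` write cannot be silently rolled back:

```python
import pandas as pd
from sqlalchemy import create_engine

# Stand-in engine; replace the URI with your real Postgres connection string.
engine = create_engine("sqlite://")

df = pd.DataFrame({"id": [1, 2], "value": ["a", "b"]})

# engine.begin() yields a Connection inside a transaction and commits
# automatically when the block exits without an exception.
with engine.begin() as conn:
    df.to_sql("my_table", conn, if_exists="replace", index=False)

# Verify the rows were actually committed by reading on a fresh connection.
with engine.connect() as conn:
    count = pd.read_sql("SELECT COUNT(*) AS n FROM my_table", conn)["n"][0]
print(count)
```

The key design point is that the commit is owned by the context manager, not by `to_sql` itself, which matches the Pandas docs' note that a SQLAlchemy connection is not committed for you.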
-
@ziggekatten This is not related to your current problem, but once you resolve it you might hit a new one with `for dyn_dag in json.loads(Variable.get('csv_data'))`. This is not the recommended way to create dynamic DAGs; those links might help you.
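For context on why `Variable.get()` at the top of a DAG file is discouraged: it triggers a metadata-DB round trip on every scheduler parse of the file. A hedged sketch of the usual alternative, reading a plain config at parse time instead (file name, keys, and the `build_dag` factory are hypothetical):

```python
import json
from pathlib import Path

def load_sources(path: Path) -> dict:
    """Parse the per-source config once per DAG-file parse, no DB access."""
    return json.loads(path.read_text())

# In the real DAG file you would then register one DAG per entry at
# module level, e.g.:
#   for source, cfg in load_sources(CONFIG_PATH).items():
#       globals()[f"etl_{source}"] = build_dag(source, cfg)

# Demo with an inline config instead of a file:
demo = json.loads('{"source_a": {"table": "table_a"}, "source_b": {"table": "table_b"}}')
dag_ids = [f"etl_{name}" for name in demo]
print(dag_ids)
```

Keeping the config in a file (or hard-coded dict) shipped with the DAG keeps parsing cheap and deterministic; `Variable.get()` then belongs inside task callables, where it runs only at execution time.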
-
First of all, I'm quite new to Airflow, but I have been using it for some time, have built a few DAGs handling various stuff, and have built a custom XCom backend.
Now, here is a scenario that baffles me. I have five sources that should be treated in the same way; the only things differing are the name and the end destination, in this case Postgres tables.
So: a very simple dynamic DAG setup, created with a for loop, generating five DAGs, each with the same ID as its source.
Three tasks: extract, transform (add some columns), and load (pandas `df.to_sql`) into a separate table for each source.
Everything comes up as expected, five dags generated and in paused mode.
Triggering one DAG works as expected: data is extracted, transformed, and then pushed into the database table. So far so good.
But if I put more than one of the DAGs in Active, only one of the DAGs actually updates the table(s). Everything looks OK in the logs, and I can see database activity, so I started to scratch my head. After a few hours of head-scratching I looked at the database side, and I can see that only one of the DAGs actually commits; the others do a rollback!
This made me try various things, like setting autocommit to true, etc, but to no avail.
I broke the DAG out into five, just removing the for loop, and they all work as they should when active and running.
Anyone have an idea what is going on? Running Airflow 2.6.3 in Docker.
Shortened version of my DAG: