From 37c41a24b9b4acd8bd75dded19d4be9b42940aab Mon Sep 17 00:00:00 2001 From: Jed Cunningham Date: Fri, 10 Jan 2025 09:02:09 -0700 Subject: [PATCH] less invasive fix --- airflow/dag_processing/processor.py | 5 +++++ task_sdk/src/airflow/sdk/execution_time/supervisor.py | 4 ---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/airflow/dag_processing/processor.py b/airflow/dag_processing/processor.py index 8d48b5ab6aeb3f..c99894b50aa479 100644 --- a/airflow/dag_processing/processor.py +++ b/airflow/dag_processing/processor.py @@ -50,8 +50,13 @@ def _parse_file_entrypoint(): import structlog from airflow.sdk.execution_time import task_runner + from airflow.settings import configure_orm # Parse DAG file, send JSON back up! + # We need to reconfigure the orm here, as DagFileProcessorManager does db queries for bundles, and + # the session across forks blows things up. + configure_orm() + comms_decoder = task_runner.CommsDecoder[DagFileParseRequest, DagFileParsingResult]( input=sys.stdin, decoder=TypeAdapter[DagFileParseRequest](DagFileParseRequest), diff --git a/task_sdk/src/airflow/sdk/execution_time/supervisor.py b/task_sdk/src/airflow/sdk/execution_time/supervisor.py index 9f7424727d5d2e..c9cec771ea0811 100644 --- a/task_sdk/src/airflow/sdk/execution_time/supervisor.py +++ b/task_sdk/src/airflow/sdk/execution_time/supervisor.py @@ -254,10 +254,6 @@ def exit(n: int) -> NoReturn: atexit._run_exitfuncs() base_exit(n) - from airflow import settings - - settings.configure_orm() - try: target() exit(0)