Allow using fresh interpreter besides fork() in Edge Worker #65943
diogosilva30 wants to merge 13 commits into apache:main from
Conversation
|
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide
|
|
That bug report sounds interesting. We have been running Edge Worker in production for more than a year, with more than one job running concurrently, and have never hit any of the reported problems, so I wonder why it appears in your environment. os.fork() is also used in CeleryExecutor and LocalExecutor, which have been the other main workhorses in Airflow for years. I have to admit I am not a Unix/signalling/fork() expert, but I am a bit curious how this problem shows up in your env. The implementation in EdgeWorker was also "just" inherited from Celery and LocalExecutor. |
|
I thought about the PR for a moment (not final yet). What puzzles me a bit is that you say there are 22 threads being started; the Edge Worker uses asyncio, with tasks living in one thread in an event loop. There might be a background thread started by plugins in the environment, but I wonder how you get to 22. Can you share more information on this? I would have expected 1 thread. Nevertheless, the process-spawn penalty has been much larger in my environments, so I'd not really favor fully switching. I would much rather have a configuration option to define how to run the separate process. |
|
@jscheffl it just happened again in our prod environment. After Sunday, some tasks started randomly failing on the edge worker. The logs (anonymized) and the pod spec are both attached below.

Full anonymized logs

Pod spec (anonymized)

spec:
containers:
- name: edge-worker
image: internal-registry.example.net/monitoring/airflow:3.1.8
args: [edge, worker, '-q', stg__general__region]
env:
- name: AIRFLOW__CORE__PLUGINS_FOLDER
value: /opt/airflow/dags/repo/plugins
- name: AIRFLOW__EDGE__API_URL
value: https://airflow.stg.monitoring.example.com/edge_worker/v1/rpcapi
volumeMounts:
- mountPath: /opt/airflow/dags
name: dags
- name: git-sync
image: internal-registry.example.net/monitoring/git-sync:4.3.0
args: [--repo=..., --root=/dags, --link=repo, --period=60s]
volumeMounts:
- mountPath: /dags
name: dags
volumes:
- name: dags
  emptyDir: {}

This connects directly to your question about the thread count. The edge worker process runs 22+ threads, not just one: the asyncio event loop plus ThreadPoolExecutor and HTTP client worker threads.
You can verify from inside a running pod:

import os
from collections import Counter
task_dir = '/proc/1/task'
tids = os.listdir(task_dir)
wchans = [open(f'{task_dir}/{tid}/wchan').read().strip() for tid in tids]
print(f'Total threads: {len(tids)}')
print(Counter(wchans).most_common())
# → [('futex_wait_queue_me', 20), ('ep_poll', 1), ...]

When os.fork() runs from this multi-threaded process, the child inherits locks held by threads that no longer exist in it, which matches the failures we see. Are you seeing the same thread counts on your side?

Proposal: hook onto execute_tasks_new_python_interpreter. Rather than a hard switch, Airflow already has the [core] execute_tasks_new_python_interpreter option, so the Edge worker could honor it the same way other execution paths do and keep fork as the default. Happy to update the PR to implement it that way if you're on board. |
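A minimal standalone sketch (not from this PR, POSIX only) of the inherited-lock hazard described above: a lock held by another thread at fork time is copied in its acquired state, and the child can never release it.

import os
import threading
import time

lock = threading.Lock()

def background_worker():
    # Simulates any other thread holding a lock (e.g. Python's import machinery)
    with lock:
        time.sleep(5)

threading.Thread(target=background_worker, daemon=True).start()
time.sleep(0.2)  # give the thread time to grab the lock

pid = os.fork()
if pid == 0:
    # Child process: the lock object was copied in its *acquired* state, but the
    # thread that owns it does not exist here, so it can never be released.
    got_it = lock.acquire(timeout=2)  # without the timeout this would hang forever
    print("child acquired lock:", got_it)  # prints False
    os._exit(0)
else:
    os.waitpid(pid, 0)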
|
@diogosilva30 Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.
See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you. |
Interesting, is it actually intended on your end to load plugins on the worker? I did not realize that after adding anyio, the IO calls run in a background thread. I thought this lib just makes async IO calls at the OS level.
I still wonder about the case and why it is not happening on our side; I have heard no reports previously and the code has been in production for 12+ months. Anyway, I'd accept a switch behind this flag, as it also exists in CeleryExecutor as an optional switch/flag. But the exception serialization which was originally removed should stay, as it is uploaded back to the task logs so that a user can see more details about the root cause of the failure. |
|
@jscheffl yes, this is intentional. We ship DAG factories and reusable operators as modules inside the plugins folder (AIRFLOW__CORE__PLUGINS_FOLDER points at the git-synced repo), so they are importable on the worker. Pattern overview:
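# common/operators/example_dag_factory.py (path inferred from the import in the DAG file below)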
"""Reusable DAG factory for fetching and exporting metrics."""
from datetime import timedelta
from airflow.sdk import task
@task
def fetch_data(source: str) -> list[dict]:
"""Fetch records from a data source."""
# ... implementation ...
return []
@task
def export_metrics(data: list[dict], conn_id: str) -> None:
"""Export metrics via an external connection."""
# ... implementation ...
def metrics_dag_definition(source: str, conn_id: str) -> None:
"""Wire up the DAG tasks."""
data = fetch_data(source=source)
export_metrics(data=data, conn_id=conn_id)
metrics_dag_kwargs = {
"schedule": timedelta(minutes=5),
"tags": ["metrics"],
}
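# The DAG file itself, e.g. dags/prod_metrics.py (illustrative path)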
"""DAG for exporting prod metrics."""
import functools
from common import create_dag
from common.operators.example_dag_factory import metrics_dag_definition, metrics_dag_kwargs
create_dag(
dag_name="prod_metrics",
dag_definition=functools.partial(
metrics_dag_definition,
source="prod",
conn_id="metrics_conn_prod",
),
dag_file=__file__,
**metrics_dag_kwargs,
)

The DAG file itself is essentially a one-liner; all the task logic lives in the shared plugin module (a hypothetical sketch of the create_dag helper follows below). Regarding the |
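For context, the create_dag helper itself isn't shown in this thread. A hypothetical minimal version, assuming it just wraps the Airflow 3 Task SDK dag decorator, could look like this (not the author's actual helper):

# Hypothetical sketch of common/create_dag
from collections.abc import Callable

from airflow.sdk import dag  # Airflow 3 Task SDK DAG decorator


def create_dag(dag_name: str, dag_definition: Callable[[], None], dag_file: str, **dag_kwargs):
    """Build a DAG from a reusable definition callable.

    dag_file is accepted for parity with the real helper (e.g. for doc/fileloc purposes)
    but unused in this sketch.
    """

    @dag(dag_id=dag_name, **dag_kwargs)
    def _factory():
        dag_definition()

    return _factory()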
be9ce01 to 9987618
… in multi-threaded workers

The edge worker process runs 22+ threads (asyncio event loop, ThreadPoolExecutor, HTTP clients). When `_launch_job()` used `multiprocessing.Process` (fork start method), `os.fork()` copied locked import locks from other threads into the child. Since only the forking thread survives, those locks are never released, causing permanent deadlocks on any subsequent import in the child process. A non-deadlock variant also occurs where the child inherits corrupted `sys.modules` state, causing `ModuleNotFoundError` cascades for all plugin and DAG imports.

This commit replaces the `multiprocessing.Process` fork with `subprocess.Popen` launching a fresh Python interpreter via the existing `airflow.sdk.execution_time.execute_workload` CLI entrypoint. The `ExecuteTask` workload is already a Pydantic model with `model_dump_json()`, the same serialization path used by the ECS executor and the edge executor's own DB storage.

Changes:
- `worker.py`: Replace `_launch_job` to use `subprocess.Popen` with `execute_workload --json-string`. Remove `_run_job_via_supervisor`, `_reset_parent_signal_state`, `multiprocessing` imports, and the `results_queue` plumbing.
- `dataclasses.py`: Change `Job.process` type from `multiprocessing.Process` to `subprocess.Popen`. Update `is_running` to use `poll()` and `is_success` to check `returncode`.
- `test_worker.py`: Update mocks and assertions to match the new subprocess-based approach.

Fixes: apache#65942
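A rough standalone sketch of the launch call this commit message describes (simplified; the function name is illustrative and the actual PR code differs):

import subprocess
import sys
import tempfile


def launch_job_in_fresh_interpreter(workload) -> tuple[subprocess.Popen, str]:
    """Start the ExecuteTask workload in a new Python interpreter, spooling stderr to a temp file."""
    # delete=False so the parent can read the spool file after the child exits
    stderr_file = tempfile.NamedTemporaryFile(
        prefix="edge-task-stderr-", suffix=".log", delete=False
    )
    proc = subprocess.Popen(
        [
            sys.executable,
            "-m",
            "airflow.sdk.execution_time.execute_workload",
            "--json-string",
            workload.model_dump_json(),  # ExecuteTask is a Pydantic model
        ],
        stderr=stderr_file,
        start_new_session=True,
    )
    return proc, stderr_file.name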
9987618 to 27bb264
|
Note: Changed title as we are not using semantic commits. |
],
env=env,
start_new_session=True,
stderr=stderr_file,
Why not redirect stderr to the normal logger/stdout?
Good question. Used a temp file because stderr is the only parent-visible diagnostic channel for the fresh-interpreter path, and we want those diagnostics attached to the task that failed.
In the fork path, the child can return an exception object through the multiprocessing result queue. In the subprocess path, the child is a separate Python interpreter running execute_workload, so it cannot send that Python exception object back to the Edge worker. If something fails early, especially during workload parsing, supervisor startup, plugin import, or Dag import, stderr is what preserves the traceback.
We could pass sys.__stderr__ like Celery does, but then output from all concurrently running task subprocesses would share the Edge worker’s stderr. That means a traceback could end up only in the worker/container log, potentially interleaved with other task subprocesses and worker logs, and not attached to the failed task’s log.
The temp file is a per-task spool: it avoids subprocess.PIPE (which can deadlock if the parent does not continuously drain it), keeps stderr attributable to the specific task subprocess, and lets us push those startup diagnostics into the task log via logs_push after the process exits.
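A sketch of the read-back side under the same assumptions; `push_to_task_logs` is only a placeholder for the Edge worker's real log-upload mechanism mentioned above, not an actual edge3 function name:

import os
import subprocess


def collect_stderr_and_report(proc: subprocess.Popen, stderr_path: str, push_to_task_logs) -> None:
    returncode = proc.wait()
    try:
        with open(stderr_path, "r", errors="replace") as f:
            stderr_text = f.read()
    finally:
        os.unlink(stderr_path)  # remove the per-task spool file
    if returncode != 0 and stderr_text:
        # Attach startup/bootstrap failures (workload parsing, plugin/DAG import errors)
        # to the failed task's log instead of leaving them only in the worker log.
        push_to_task_logs(stderr_text)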
Okay, sounds reasonable.
Still, could the stderr then be sent to the message queue? Just as plain text? The Edge Worker checks for Exception but otherwise should also be able to accept text as a string (like the "OK" that is being sent?)
The Queue approach works in the fork path because the child inherits the multiprocessing state, including the Queue itself.
With subprocess.Popen(...) we start a completely fresh Python interpreter, so there is no shared Queue unless we build a separate IPC layer (pipe/socket/fd passing/etc).
We could do that, but it adds quite a bit of complexity compared to the current tempfile approach. The tempfile also avoids PIPE deadlocks and still captures early bootstrap/import failures before any IPC channel would be initialized.
Having slept on it and looked at the code... I do not actually want to insist on the queue :-D I mainly want error details from the supervisor to end up in the task logs if something failed. So the "text" content should be passed over.
For me it would also be okay to step away from the Queue in general and transport the error details via a text file in both branches. Then we have one technical backend for both execution options. The main thing I want to achieve is to have "text" transferred: instead of passing the exception to the queue, the text can also be written to a file and picked up. That would make it leaner? (A sketch of the idea follows below.)
(And if all is OK we do not need to pass an "OK" text at all; we just use the file for passing any error text?)
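A minimal sketch of that suggestion with hypothetical names: both launch paths write plain error text to a per-task file, and an absent or empty file means success.

import traceback
from pathlib import Path


def run_supervised(run_workload, error_file: Path) -> int:
    """Fork-branch supervisor body; the fresh-interpreter branch would get the same
    file by redirecting the subprocess's stderr to it."""
    try:
        run_workload()
        return 0
    except Exception:
        # No "OK" marker needed on success; the worker only reads the file when it is non-empty.
        error_file.write_text(traceback.format_exc())
        return 1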
|
@jscheffl I made the requested changes and moved some of the branching logic. Also noticed that in the fork path we are using a deprecated call. |
There are two PRs open (in parallel) to address this -> #65847 + #63498 |
What
Update the Edge worker task launch path to honor Airflow's existing [core] execute_tasks_new_python_interpreter option.

By default, Edge workers keep the existing fork-based behavior. When execute_tasks_new_python_interpreter=True is configured, or when os.fork is unavailable, the worker launches the task in a fresh Python interpreter with subprocess.Popen and the existing airflow.sdk.execution_time.execute_workload entrypoint.

Fixes #65942
Why
Some Edge worker deployments can run task execution from a multi-threaded worker process. Forking a process after threads have started can inherit unsafe parent state, including import locks and partially initialized modules. In affected deployments this can show up as intermittent task-start failures, plugin import errors, or startup-reschedule exhaustion.
Airflow already has a core setting for this tradeoff: execute_tasks_new_python_interpreter. Other execution paths can use it to choose a fresh interpreter instead of fork. This PR applies the same behavior to the Edge worker without changing the default for existing deployments.

How
The change keeps both launch modes:

- Fork path (default): when os.fork exists and execute_tasks_new_python_interpreter=False, keep using multiprocessing.Process and the existing supervisor helper.
- Fresh interpreter path: when execute_tasks_new_python_interpreter=True or there is no os.fork, use subprocess.Popen with python -m airflow.sdk.execution_time.execute_workload --json-string ...

The fresh-interpreter path also spools stderr to a temporary file instead of stderr=PIPE. The worker only needs stderr after the subprocess exits, and a pipe can deadlock if the child writes enough data before the parent reads it. Spooling to a file avoids that while still preserving root failure details in task logs.
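A simplified sketch of the mode selection described above (assumed shape, not the exact PR diff; the real code lives in worker.py and reads self.conf):

import os


def use_fresh_interpreter(conf) -> bool:
    """Return True when the task should run via subprocess.Popen instead of fork."""
    if not hasattr(os, "fork"):
        # Platforms without fork (e.g. Windows) always need a fresh interpreter.
        return True
    return conf.getboolean("core", "execute_tasks_new_python_interpreter")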
Changes

- providers/edge3/src/airflow/providers/edge3/cli/worker.py: branch on self.conf.getboolean("core", "execute_tasks_new_python_interpreter"); track subprocess stderr temp files by PID; upload subprocess stderr details on failure; preserve fork result-queue handling
- providers/edge3/tests/unit/edge3/cli/test_worker.py

Notes
- Configuration is read via self.conf, not the global config object, so team-aware Edge worker configuration is respected.

Testing
- uv run ruff format providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py
- uv run ruff check --fix providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py
- uv run --project providers/edge3 pytest providers/edge3/tests/unit/edge3/cli/test_worker.py -xvs (68 passed)
- uv run prek run --files providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py
- uv run --project providers/edge3 mypy providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py

Was generative AI tooling used to co-author this PR?
Generated-by: GitHub Copilot following the guidelines