Allow using fresh interpreter besides fork() in Edge Worker #65943
Conversation
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide.
That bug report sounds interesting, and we have been running Edge Worker in production for more than a year, with more than one job in concurrency. We never had any of the reported problems, so I wonder why it hits in your environment. os.fork() is also used in CeleryExecutor and LocalExecutor, which have been the other main workhorses in Airflow for years. I need to admit I am not a Unix/signalling/fork() expert, but I am a bit curious how this problem appears in your env. The implementation in EdgeWorker was also "just" inherited from Celery and LocalExecutor.
I thought a moment (but am not final) about the PR. What puzzles me a bit is that you say 22 threads are being started; the Edge worker uses asyncio with tasks living in one thread in an event loop. There might be a background thread started by plugins in the environment, but I wonder how you get to 22. Can you share more information on this? I would have expected 1 thread. Nevertheless, the process-spawn penalty has been much larger in my environments, so I'd not really favor fully switching. I would much rather have a configuration option to define how to run a separate process.
@jscheffl it just happened again in our prod environment. After Sunday some tasks started randomly failing on the edge worker. The full anonymized logs were attached as a collapsed block (not reproduced here); the anonymized pod spec is below.

Pod spec (anonymized):

spec:
  containers:
    - name: edge-worker
      image: internal-registry.example.net/monitoring/airflow:3.1.8
      args: [edge, worker, '-q', stg__general__region]
      env:
        - name: AIRFLOW__CORE__PLUGINS_FOLDER
          value: /opt/airflow/dags/repo/plugins
        - name: AIRFLOW__EDGE__API_URL
          value: https://airflow.stg.monitoring.example.com/edge_worker/v1/rpcapi
      volumeMounts:
        - mountPath: /opt/airflow/dags
          name: dags
    - name: git-sync
      image: internal-registry.example.net/monitoring/git-sync:4.3.0
      args: [--repo=..., --root=/dags, --link=repo, --period=60s]
      volumeMounts:
        - mountPath: /dags
          name: dags
  volumes:
    - name: dags
      emptyDir: {}

This connects directly to your question about the thread count. The edge worker process does not run just one thread in practice; in our pods it runs 22+ threads (the asyncio event loop plus ThreadPoolExecutor and HTTP-client worker threads).
You can verify from inside a running pod:

import os
from collections import Counter
task_dir = '/proc/1/task'
tids = os.listdir(task_dir)
wchans = [open(f'{task_dir}/{tid}/wchan').read().strip() for tid in tids]
print(f'Total threads: {len(tids)}')
print(Counter(wchans).most_common())
# → [('futex_wait_queue_me', 20), ('ep_poll', 1), ...]When Are you seeing the Proposal: hook onto Rather than a hard switch, Airflow already has Happy to update the PR to implement it that way if you're on board. |
|
@diogosilva30 Converting to draft — this PR doesn't yet meet our Pull Request quality criteria.
See the linked criteria for how to fix each item, then mark the PR "Ready for review". This is not a rejection — just an invitation to bring the PR up to standard. No rush. Note: This comment was drafted by an AI-assisted triage tool and may contain mistakes. Once you have addressed the points above, an Apache Airflow maintainer — a real person — will take the next look at your PR. We use this two-stage triage process so that our maintainers' limited time is spent where it matters most: the conversation with you.
Interesting, is it actually intended on your end to load plugins on the worker? I did not realize that after adding anyio the IO calls run in a background thread; I thought this lib just makes async IO calls at the OS level.
I still wonder about the case: it is not happening on our side, I have heard no reports previously, and the code has been in production for 12+ months. Anyway, I'd accept a switch with this flag, as it also exists in CeleryExecutor as an optional switch/flag. But the exception serialization that was originally removed should stay, as it is uploaded back to the task logs so that a user can see more details about the root of the failure there.
@jscheffl yes, this is intentional. We ship DAG factories and reusable operators as modules inside the plugins folder (the AIRFLOW__CORE__PLUGINS_FOLDER path in the pod spec above). Pattern overview. The shared factory module (imported below as common.operators.example_dag_factory):
"""Reusable DAG factory for fetching and exporting metrics."""
from datetime import timedelta
from airflow.sdk import task
@task
def fetch_data(source: str) -> list[dict]:
"""Fetch records from a data source."""
# ... implementation ...
return []
@task
def export_metrics(data: list[dict], conn_id: str) -> None:
"""Export metrics via an external connection."""
# ... implementation ...
def metrics_dag_definition(source: str, conn_id: str) -> None:
"""Wire up the DAG tasks."""
data = fetch_data(source=source)
export_metrics(data=data, conn_id=conn_id)
metrics_dag_kwargs = {
"schedule": timedelta(minutes=5),
"tags": ["metrics"],
}
"""DAG for exporting prod metrics."""
import functools
from common import create_dag
from common.operators.example_dag_factory import metrics_dag_definition, metrics_dag_kwargs
create_dag(
dag_name="prod_metrics",
dag_definition=functools.partial(
metrics_dag_definition,
source="prod",
conn_id="metrics_conn_prod",
),
dag_file=__file__,
**metrics_dag_kwargs,
)

The DAG file itself is essentially a one-liner; all the task logic lives in the shared plugin module.
(force-pushed: be9ce01 → 9987618)
… in multi-threaded workers

The edge worker process runs 22+ threads (asyncio event loop, ThreadPoolExecutor, HTTP clients). When `_launch_job()` used `multiprocessing.Process` (fork start method), `os.fork()` copied locked import locks from other threads into the child. Since only the forking thread survives, those locks are never released — causing permanent deadlocks on any subsequent import in the child process. A non-deadlock variant also occurs where the child inherits corrupted `sys.modules` state, causing `ModuleNotFoundError` cascades for all plugin and DAG imports.

This commit replaces the `multiprocessing.Process` fork with `subprocess.Popen` launching a fresh Python interpreter via the existing `airflow.sdk.execution_time.execute_workload` CLI entrypoint. The `ExecuteTask` workload is already a Pydantic model with `model_dump_json()` — the same serialization path used by the ECS executor and the edge executor's own DB storage.

Changes:
- `worker.py`: Replace `_launch_job` to use `subprocess.Popen` with `execute_workload --json-string`. Remove `_run_job_via_supervisor`, `_reset_parent_signal_state`, `multiprocessing` imports, and the `results_queue` plumbing.
- `dataclasses.py`: Change `Job.process` type from `multiprocessing.Process` to `subprocess.Popen`. Update `is_running` to use `poll()` and `is_success` to check `returncode`.
- `test_worker.py`: Update mocks and assertions to match the new subprocess-based approach.

Fixes: apache#65942
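A rough sketch of the dataclasses.py part of that change, keeping only the members named in the commit message (the real Job dataclass carries more state):

import subprocess
from dataclasses import dataclass


@dataclass
class Job:
    # Sketch: only the process-related members from the commit message.
    process: subprocess.Popen

    @property
    def is_running(self) -> bool:
        # poll() returns None while the child is still alive
        return self.process.poll() is None

    @property
    def is_success(self) -> bool:
        return self.process.returncode == 0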
(force-pushed: 9987618 → 27bb264)
(force-pushed: d169be1 → 132e2bb)
@diogosilva30 I have a question — how often does this error occur? Does the user's code change frequently?

If I understand correctly, the issue is that among the multiple threads running in the edge worker, if a fork is performed while another thread (one not performing the fork) is in the middle of an import, it can cause problems in the import system. If that's the case, the problem would arise when a new module is being imported in another thread. Even with lazy loading, since the edge worker has a fixed footprint, I'm curious whether new module loading happens frequently. Since a module that has already been imported once should no longer be a source of the problem, I would expect the frequency to gradually decrease over time.

Applying the same approach that exists in Celery seems like a good idea. However, the trade-offs should be carefully understood. With Airflow, simply loading the airflow module alone pulls in about 100 MB of libraries. The existing fork approach significantly reduces PSS through COW, but this approach makes memory increase linearly with the number of concurrent executions. And slow loading is a bonus downside. Below is a check of the PSS usage when just importing airflow (measurement not reproduced here).

As Jens mentioned, the problem is clear enough that it could have been reported by now, so it's also a bit curious that it hasn't been.
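For reference, one way to take that kind of PSS reading on Linux; this is an illustration, not necessarily how the numbers above were produced:

import os


def pss_kib(pid: int = 0) -> int:
    """Return the PSS of a process in KiB, read from /proc/<pid>/smaps_rollup."""
    pid = pid or os.getpid()
    with open(f"/proc/{pid}/smaps_rollup") as fh:
        for line in fh:
            if line.startswith("Pss:"):
                return int(line.split()[1])
    return 0


baseline = pss_kib()
import airflow  # noqa: E402  - pulls in the heavy dependency tree
print(f"PSS delta after importing airflow: {pss_kib() - baseline} KiB")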
What do you think about changing the edge worker to handle tasks the same way as LocalExecutor? At initial startup, pre-create a persistent process pool (like a fork pool) sized to concurrency and dispatch workloads through a queue. If we set up the pool before the asyncio loop starts, this issue shouldn't occur either.
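A toy illustration of that shape; the worker function, pool size, and payload here are placeholders, and the real change would live in the edge worker CLI before the event loop starts:

import multiprocessing as mp


def run_workload(workload_json: str) -> int:
    # Placeholder for the real supervise/execute call in the child.
    return 0


def main() -> None:
    # Create the pool before importing asyncio or starting any thread,
    # so every pool worker is forked from a single-threaded parent.
    pool = mp.Pool(processes=4)

    # ... start the asyncio event loop afterwards and dispatch jobs here ...
    result = pool.apply_async(run_workload, args=("{}",))
    print(result.get(timeout=30))

    pool.close()
    pool.join()


if __name__ == "__main__":
    main()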
Hey, just checked our staging pods to answer this properly. On one of our workers today the same failure showed up again; the child process that was supposed to run the task exited in under 2 seconds. Under 2 seconds probably means the child never actually ran. It deadlocked on an import lock right at startup and got aborted.

On the "gradually decreases" question, I see where you're coming from, but the problem here isn't plugin imports in user code. The warning fires on every single fork throughout the worker's lifetime, because by the time the worker forks a task the asyncio loop and its helper threads are already running.

On memory, fair point; the numbers are real. That's exactly why we made it opt-in rather than changing the default. People who aren't hitting this keep the existing behaviour.
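Assuming the warning referred to here is CPython 3.12+'s DeprecationWarning about calling fork() in a multi-threaded process, it can be reproduced in isolation like this:

import os
import threading
import warnings

# Any live non-main thread makes the process "multi-threaded" as far as fork() is concerned.
threading.Thread(target=threading.Event().wait, daemon=True).start()

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    pid = os.fork()  # CPython 3.12+ warns that forking here may deadlock the child
    if pid == 0:
        os._exit(0)  # child exits immediately
    os.waitpid(pid, 0)

print([str(w.message) for w in caught])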
@wjddn279 yeah, a pre-forked pool would sidestep the race (fork before asyncio means single-threaded children), but I'm not sure it's the right shape for the edge worker. My hesitations with a persistent pool are mostly the extra plumbing it needs (a Queue for IPC, liveness checks) and what happens when a pool worker dies and has to be restarted.
What about forkserver? The stdlib already ships the right primitive here. One forkserver process is started at startup, before asyncio, stays single-threaded for the worker's lifetime, and is the only thing that ever calls fork(). Call-site diff is basically nothing:

# before
p = Process(target=supervise, args=(job, child_conn))
p.start() # fork() from multi-threaded asyncio
# after
# ctx = multiprocessing.get_context("forkserver"), created once at startup
p = ctx.Process(target=supervise, args=(job, child_conn))
p.start() # fork() happens inside the forkserver

The real constraint is ordering: the forkserver context has to exist before any asyncio import or thread start. That's a small CLI entry point change, not a redesign. Memory at 5 concurrent tasks stays near COW cost: same safety as subprocess, without the per-task RSS hit you called out.

Potential order: land this PR as the opt-in escape hatch first, then follow up with a forkserver change as the longer-term fix. Happy to draft that follow-up if there's appetite for it. @jscheffl curious what you think.
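A minimal sketch of that ordering constraint, with a hypothetical CLI entry point; the warm-up process only exists to force the forkserver to start while the parent is still single-threaded:

import multiprocessing


def _warm_up() -> None:
    """No-op target, used only to start the forkserver process early."""


def main() -> None:
    # Must run before any asyncio import or thread start, so the forkserver
    # process is forked from a still single-threaded parent.
    ctx = multiprocessing.get_context("forkserver")
    warm = ctx.Process(target=_warm_up)
    warm.start()
    warm.join()

    # Only now bring up the async worker loop; later ctx.Process(...) calls
    # are serviced by the single-threaded forkserver, not by this process.
    import asyncio  # noqa: F401
    # asyncio.run(worker_main(ctx))  # hypothetical entry point


if __name__ == "__main__":
    main()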
@diogosilva30
Fair point. I was only thinking about the import-lock case, but it sounds like the locking issues span much more broadly across threads than just imports.
After looking into how forkserver actually works, it looks like a better fit than a fork pool for a multi-threaded environment like the edge worker. There isn't much code needed to apply it. Spinning up the server before the async cycle starts and forking from there seems like the right shape.
The forked process doesn't do anything beyond running supervise_task. Any connections or fds it creates are explicitly closed. (There were a few leaks in practice, but I've fixed most of them by now.)
Same here — by design the worker doesn't import user code directly. All user code is loaded only inside the short-lived process forked by supervise_task.
Agreed. There's reference code we could lean on, but the diff is still substantial — you'd need a Queue for IPC and liveness checks layered on top.
This was actually my biggest concern with the pool approach. If a worker dies you have to restart it, but there's no guarantee the restarted process is lock-safe either. That's exactly where forkserver looks like the stronger option.
@wjddn279 @diogosilva30 Thanks for all the valid discussion, really cool. Improvements are in general welcome.

@wjddn279 The optimization that was added to LocalExecutor is still on my (long) bucket list. I even left a comment in https://github.com/apache/airflow/blob/main/providers/edge3/src/airflow/providers/edge3/cli/worker.py#L460 to remind me whenever I see the code again. Feel free to raise a PR (and be faster than me)!

In general I am mostly wondering why this error is happening often for @diogosilva30 while in our environment we do not see it at all. It must be some environmental side effect. So for the moment I'd accept it as an option, but because of the memory overhead and time, only as optional. The main line should be kept as is until more errors are reported.

I am not sure a pre-fork makes sense for many environments. It might be another (tuning) option. I assume the typical pattern for Celery is different than for Edge... at least I think so. In general I'd favor simplicity over complexity, and a pre-fork pool might add a level of complexity that makes it harder to maintain or adds bugs.

My actual "dream" would be that we could make the supervisor mostly async, such that we do not need to fork() or spawn another process at all just to run another process, but instead have a set of in-process supervisor instances which themselves just fork the execution for the task. Because today we fork worker -> supervisor -> task execution.
Remove the multiprocessing.Queue from the fork execution path and use a plain temp file for both fork and subprocess paths. Both paths now write failure text to a NamedTemporaryFile (Path stored as Job.stderr_file_path); the parent reads it after the child exits via Job.failure_details() and pushes the content to the task log via logs_push.

Benefits over the Queue approach:
- No risk of buffer deadlock (the Queue deadlock was the original issue)
- Works identically for both fork and subprocess children
- Simpler: no IPC setup, no draining loop, no Queue import
- Error file is only created/filled on failure; task logs cover the success path

Also extract _make_task_temp_file() helper to avoid duplicating the NamedTemporaryFile creation pattern across _launch_job_subprocess and _launch_job_fork.
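A rough sketch of the temp-file pattern that commit describes; the helper and attribute names follow the commit message, but the bodies are illustrative:

import tempfile
from pathlib import Path


def _make_task_temp_file() -> Path:
    # Create the file the child can write failure details into.
    tmp = tempfile.NamedTemporaryFile(
        mode="w", prefix="edge_task_", suffix=".err", delete=False
    )
    tmp.close()
    return Path(tmp.name)


def failure_details(stderr_file_path: Path) -> str:
    # Read whatever the child wrote before exiting, then clean up the file.
    try:
        return stderr_file_path.read_text()
    finally:
        stderr_file_path.unlink(missing_ok=True)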
(force-pushed: 132e2bb → 9de4110)
@jscheffl applied all your suggestions!
(force-pushed: 7a9fb22 → c11a8ac)
@jscheffl saw the test CI failures on compat tests. Should be good now ✅
wjddn279 left a comment:
Thanks for the good investigation and suggestion! LGTM
I agree that applying forkserver would need more reports of this issue and further discussion first!
Awesome work, congrats on your first merged pull request! You are invited to check our Issue Tracker for additional contributions.
What
Update the Edge worker task launch path to honor Airflow's existing
`[core] execute_tasks_new_python_interpreter` option.

By default, Edge workers keep the existing fork-based behavior. When `execute_tasks_new_python_interpreter=True` is configured, or when `os.fork` is unavailable, the worker launches the task in a fresh Python interpreter with `subprocess.Popen` and the existing `airflow.sdk.execution_time.execute_workload` entrypoint.

Fixes #65942
Why
Some Edge worker deployments can run task execution from a multi-threaded worker process. Forking a process after threads have started can inherit unsafe parent state, including import locks and partially initialized modules. In affected deployments this can show up as intermittent task-start failures, plugin import errors, or startup-reschedule exhaustion.
Airflow already has a core setting for this tradeoff:
`execute_tasks_new_python_interpreter`. Other execution paths can use it to choose a fresh interpreter instead of fork. This PR applies the same behavior to the Edge worker without changing the default for existing deployments.

How
The change keeps both launch modes:
- Fork path (default): when `os.fork` exists and `execute_tasks_new_python_interpreter=False`, keep using `multiprocessing.Process` and the existing supervisor helper.
- Fresh-interpreter path: when `execute_tasks_new_python_interpreter=True` or no `os.fork`, launch `subprocess.Popen` with `python -m airflow.sdk.execution_time.execute_workload --json-string ...`

The fresh-interpreter path also spools stderr to a temporary file instead of `stderr=PIPE`. The worker only needs stderr after the subprocess exits, and a pipe can deadlock if the child writes enough data before the parent reads it. Spooling to a file avoids that while still preserving root failure details in task logs.

Changes

- `providers/edge3/src/airflow/providers/edge3/cli/worker.py`: read `self.conf.getboolean("core", "execute_tasks_new_python_interpreter")`; track subprocess stderr temp files by PID; upload subprocess stderr details on failure; preserve fork result-queue handling
- `providers/edge3/tests/unit/edge3/cli/test_worker.py`: update mocks and assertions

Notes
- The option is read via `self.conf`, not the global config object, so team-aware Edge worker configuration is respected.

Testing
- `uv run ruff format providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py`
- `uv run ruff check --fix providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py`
- `uv run --project providers/edge3 pytest providers/edge3/tests/unit/edge3/cli/test_worker.py -xvs` (68 passed)
- `uv run prek run --files providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py`
- `uv run --project providers/edge3 mypy providers/edge3/src/airflow/providers/edge3/cli/worker.py providers/edge3/tests/unit/edge3/cli/test_worker.py`

Was generative AI tooling used to co-author this PR?
Generated-by: GitHub Copilot following the guidelines