Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
27bb264
fix(edge3): replace fork() with subprocess.Popen to prevent deadlocks…
diogosilva30 Apr 27, 2026
fbaa062
Fix test_worker.py to use multiprocessing.Process instead of subproce…
diogosilva30 May 6, 2026
b8a7ab3
Honor fresh interpreter mode in Edge worker
diogosilva30 May 6, 2026
7378a39
Clarify Edge worker task process handling
diogosilva30 May 6, 2026
1cf3899
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 6, 2026
72a25a1
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
dcb4f2b
Improve Edge worker subprocess failure handling
diogosilva30 May 7, 2026
617b713
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
3527b7d
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
29e017c
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
0bceddf
fix: rollback supervise changes & fix tests related to display_name
diogosilva30 May 8, 2026
2ca14c9
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
cff85d2
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
d0a7550
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 11, 2026
9de4110
Unify error transport for fork and subprocess paths via temp file
diogosilva30 May 11, 2026
c11a8ac
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 14, 2026
e78bf72
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 14, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 52 additions & 1 deletion providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
from __future__ import annotations

import json
import subprocess
import traceback
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.queues import Queue

from airflow.providers.edge3.models.edge_worker import EdgeWorkerState
from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

Expand Down Expand Up @@ -72,17 +76,64 @@ class Job:
"""Holds all information for a task/job to be executed as bundle."""

edge_job: EdgeJobFetched
process: Process
# Process can be either a subprocess.Popen (for the spawn path) or a
# multiprocessing.Process (for the fork path)
process: subprocess.Popen | Process
Comment thread
diogosilva30 marked this conversation as resolved.
Outdated
logfile: Path
logsize: int = 0
"""Last size of log file, point of last chunk push."""
results_queue: Queue | None = None
"""Queue for child process to push results to parent, if using fork-based execution model."""
stderr_file_path: Path | None = None
"""Path to file where stderr is being redirected, if using spawn-based execution model."""

@property
def is_running(self) -> bool:
    """Check if the job is still running."""
    proc = self.process
    # Popen reports liveness via poll() (None while the child has not exited);
    # multiprocessing.Process exposes is_alive() instead.
    popen_style = isinstance(proc, subprocess.Popen)
    return proc.poll() is None if popen_style else proc.is_alive()

@property
def is_success(self) -> bool:
    """Check if the job was successful."""
    proc = self.process
    # Exit status lives on .returncode for Popen and on .exitcode for Process.
    exit_status = proc.returncode if isinstance(proc, subprocess.Popen) else proc.exitcode
    return exit_status == 0

@property
def should_poll_logs(self) -> bool:
    """Check if logs should be pushed while waiting for job completion."""
    # Stop pushing once the child has exited. On the fork path we also stop
    # as soon as a result is waiting on the queue; the subprocess path has no
    # queue, so liveness alone decides.
    pending_queue = self.results_queue
    no_result_yet = pending_queue is None or pending_queue.empty()
    return self.is_running and no_result_yet

def drain_result(self) -> object | None:
    """Read the child result if the execution model provides one."""
    # Only the fork path has a results queue; the spawn path always yields None.
    q = self.results_queue
    if q is not None and not q.empty():
        return q.get()
    return None

def failure_details(self, result: object | None) -> str:
"""Format execution-model-specific failure details."""
if isinstance(self.process, subprocess.Popen):
stderr_output = ""
if self.stderr_file_path:
stderr_output = self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
ex_txt = f"Task subprocess exited with code {self.process.returncode}"
if stderr_output:
ex_txt = f"{ex_txt}\n{stderr_output}"
return ex_txt

return (
"\n".join(traceback.format_exception(result))
if isinstance(result, Exception)
else "(Unknown error, no exception details available)"
)

def cleanup(self) -> None:
    """Remove transient files owned by this job."""
    # Only the spawn path creates a stderr temp file; nothing to do otherwise.
    stderr_path = self.stderr_file_path
    if stderr_path:
        stderr_path.unlink(missing_ok=True)
103 changes: 84 additions & 19 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
from collections.abc import Awaitable, Callable
from contextlib import suppress
Expand Down Expand Up @@ -66,6 +67,7 @@
if TYPE_CHECKING:
from airflow.configuration import AirflowConfigParser
from airflow.executors.workloads import ExecuteTask
from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -414,6 +416,7 @@ def _get_state(self) -> EdgeWorkerState:
return EdgeWorkerState.IDLE

def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
"""Run a task by calling the supervisor directly (executes inside a forked child process)."""
_reset_parent_signal_state()

from airflow.sdk.execution_time.supervisor import supervise
Expand Down Expand Up @@ -447,17 +450,80 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
results_queue.put(e)
return 1

def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Popen, Path]:
    """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
    env = os.environ.copy()
    if self._execution_api_server_url:
        env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url

    # stderr deliberately goes to a temp file rather than a PIPE: the worker
    # only inspects stderr after the task finishes, so a chatty child could
    # fill the pipe buffer and block forever. Keeping it task-scoped (instead
    # of inheriting the worker's stderr/stdout) also lets supervisor startup
    # failures land in the task log rather than only the worker/container log.
    with tempfile.NamedTemporaryFile(
        prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
    ) as stderr_file:
        stderr_file_path = Path(stderr_file.name)
        argv = [
            sys.executable,
            "-m",
            "airflow.sdk.execution_time.execute_workload",
            "--json-string",
            workload.model_dump_json(),
        ]
        try:
            process = subprocess.Popen(
                argv,
                env=env,
                start_new_session=True,
                stderr=stderr_file,
            )
        except Exception:
            # Launch failed: nothing will ever write this file, so drop it.
            stderr_file_path.unlink(missing_ok=True)
            raise
    logger.info(
        "Launched task subprocess pid=%d for %s",
        process.pid,
        workload.ti.id,
    )
    return process, stderr_file_path

def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Queue]:
    """Launch workload by forking the current process (multiprocessing.Process)."""
    # Improvement: Use frozen GC to prevent child process from copying unnecessary memory
    # See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py
    results_queue: Queue = Queue()
    child = Process(
        target=self._run_job_via_supervisor,
        kwargs={"workload": workload, "results_queue": results_queue},
    )
    child.start()
    logger.info("Launched task fork pid=%d for %s", child.pid, workload.ti.id)
    return child, results_queue

def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, logfile: Path) -> Job:
    """
    Launch a task process.

    Uses ``subprocess.Popen`` (fresh interpreter) when
    ``core.execute_tasks_new_python_interpreter`` is ``True`` or when
    ``os.fork`` is unavailable (e.g. Windows). Falls back to
    ``multiprocessing.Process`` (fork) otherwise — preserving the
    original behaviour for existing deployments.
    """
    # Note: config is only consulted when fork is available (short-circuit).
    spawn_fresh_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
        "core",
        "execute_tasks_new_python_interpreter",
        fallback=False,
    )
    if not spawn_fresh_interpreter:
        # Fork path: clone the current process; child inherits parent memory.
        process, results_queue = self._launch_job_fork(workload)
        return Job(edge_job, process, logfile, results_queue=results_queue)
    # Fresh subprocess path: spawn a new Python interpreter with no shared
    # memory with the parent — technically safer and more robust, but with
    # more overhead.
    process, stderr_file_path = self._launch_job_subprocess(workload)
    return Job(edge_job, process, logfile, stderr_file_path=stderr_file_path)

async def _push_logs_in_chunks(self, job: Job):
aio_logfile = anyio.Path(job.logfile)
if self.push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize:
Expand Down Expand Up @@ -581,11 +647,10 @@ async def fetch_and_run_job(self) -> None:
logger.info("Received job: %s", edge_job.identifier)

workload: ExecuteTask = edge_job.command
process, results_queue = self._launch_job(workload)
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(self.base_log_folder, workload.log_path)
job = Job(edge_job, process, logfile)
job = self._launch_job(edge_job, workload, logfile)
self.jobs.append(job)
await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)

Expand All @@ -595,37 +660,37 @@ async def fetch_and_run_job(self) -> None:
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)

while job.is_running and results_queue.empty():
while job.should_poll_logs:
await self._push_logs_in_chunks(job)
for _ in range(0, self.job_poll_interval * 10):
await sleep(0.1)
if not job.is_running:
break
await self._push_logs_in_chunks(job)
supervisor_msg = (
"(Unknown error, no exception details available)"
if results_queue.empty()
else results_queue.get()
)
# Ensure that supervisor really ended after we grabbed results from queue
while True:
if not job.is_running:
break
# Fork path: drain the result queue BEFORE waiting for the child to fully exit.
# A large exception travels through multiprocessing's pipe-backed queue; reading it
# here lets the child's feeder thread flush and avoids deadlocking on process exit.
# Fresh-interpreter subprocesses do not share Python exception objects with the parent.
result = job.drain_result()
# Wait for the child process to fully exit (fork path: queue is already drained above).
while job.is_running: # noqa: ASYNC110
await sleep(0.1)

self.jobs.remove(job)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
job.cleanup()
else:
if isinstance(supervisor_msg, Exception):
supervisor_msg = "\n".join(traceback.format_exception(supervisor_msg))
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, supervisor_msg)
ex_txt = job.failure_details(result)
job.cleanup()
Comment thread
diogosilva30 marked this conversation as resolved.
Outdated
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)

# Push it upwards to logs for better diagnostic as well
await logs_push(
task=job.edge_job.key,
log_chunk_time=timezone.utcnow(),
log_chunk_data=f"Error executing job:\n{supervisor_msg}",
log_chunk_data=f"Error executing job:\n{ex_txt}",
)
await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)

Expand Down
Loading
Loading