Merged
17 commits
27bb264
fix(edge3): replace fork() with subprocess.Popen to prevent deadlocks…
diogosilva30 Apr 27, 2026
fbaa062
Fix test_worker.py to use multiprocessing.Process instead of subproce…
diogosilva30 May 6, 2026
b8a7ab3
Honor fresh interpreter mode in Edge worker
diogosilva30 May 6, 2026
7378a39
Clarify Edge worker task process handling
diogosilva30 May 6, 2026
1cf3899
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 6, 2026
72a25a1
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
dcb4f2b
Improve Edge worker subprocess failure handling
diogosilva30 May 7, 2026
617b713
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
3527b7d
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 7, 2026
29e017c
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
0bceddf
fix: rollback supervise changes & fix tests related to display_name
diogosilva30 May 8, 2026
2ca14c9
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
cff85d2
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 8, 2026
d0a7550
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 11, 2026
9de4110
Unify error transport for fork and subprocess paths via temp file
diogosilva30 May 11, 2026
c11a8ac
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 14, 2026
e78bf72
Merge branch 'main' into fix/edge3-fork-deadlock-subprocess
diogosilva30 May 14, 2026
28 changes: 27 additions & 1 deletion providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
@@ -17,6 +17,7 @@
from __future__ import annotations

import json
import subprocess
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
@@ -72,17 +73,42 @@ class Job:
    """Holds all information for a task/job to be executed as bundle."""

    edge_job: EdgeJobFetched
-    process: Process
+    process: subprocess.Popen | Process
    """Can be subprocess.Popen (for the spawn path) or multiprocessing.Process (for the fork path)."""
    logfile: Path
    logsize: int = 0
    """Last size of log file, point of last chunk push."""
    stderr_file_path: Path | None = None
    """Path to file where error details are written on failure (stderr for subprocess path, traceback text for fork path)."""

    @property
    def is_running(self) -> bool:
        """Check if the job is still running."""
        if isinstance(self.process, subprocess.Popen):
            return self.process.poll() is None
        return self.process.is_alive()

    @property
    def is_success(self) -> bool:
        """Check if the job was successful."""
        if isinstance(self.process, subprocess.Popen):
            return self.process.returncode == 0
        return self.process.exitcode == 0

    def failure_details(self) -> str:
        """Format failure details, reading error text from the error file if available."""
        error_output = ""
        if self.stderr_file_path and self.stderr_file_path.exists():
            error_output = self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
        if isinstance(self.process, subprocess.Popen):
            ex_txt = f"Task subprocess exited with code {self.process.returncode}"
        else:
            ex_txt = f"Task fork exited with code {self.process.exitcode}"
        if error_output:
            ex_txt = f"{ex_txt}\n{error_output}"
        return ex_txt

    def cleanup(self) -> None:
        """Remove transient files owned by this job."""
        if self.stderr_file_path:
            self.stderr_file_path.unlink(missing_ok=True)
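
Reviewer note: since `process` is now dual-typed, every status check has to dispatch on the concrete type, because `Popen` reports liveness via `poll()`/`returncode` while `Process` uses `is_alive()`/`exitcode`. Below is a minimal standalone sketch of that contract (illustration only, not part of this PR; helper names are hypothetical):

from __future__ import annotations

import subprocess
import sys
import time
from multiprocessing import Process

def is_running(process: subprocess.Popen | Process) -> bool:
    # Popen.poll() returns None while the child is still alive;
    # multiprocessing exposes the same fact via is_alive().
    if isinstance(process, subprocess.Popen):
        return process.poll() is None
    return process.is_alive()

def exit_code(process: subprocess.Popen | Process) -> int | None:
    # Popen stores the exit status in .returncode, Process in .exitcode.
    return process.returncode if isinstance(process, subprocess.Popen) else process.exitcode

if __name__ == "__main__":
    child = subprocess.Popen([sys.executable, "-c", "print('hello')"])
    while is_running(child):
        time.sleep(0.1)
    print(exit_code(child) == 0)  # True on success, mirroring Job.is_success
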
121 changes: 94 additions & 27 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -19,7 +19,9 @@
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
@@ -28,9 +30,9 @@
from datetime import datetime
from functools import cached_property
from http import HTTPStatus
-from multiprocessing import Process, Queue
+from multiprocessing import Process
from pathlib import Path
-from typing import TYPE_CHECKING
+from typing import IO, TYPE_CHECKING

import anyio
from aiofiles import open as aio_open
@@ -66,6 +68,7 @@
if TYPE_CHECKING:
    from airflow.configuration import AirflowConfigParser
    from airflow.executors.workloads import ExecuteTask
    from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

logger = logging.getLogger(__name__)

@@ -75,6 +78,12 @@
from setproctitle import setproctitle


def _make_task_temp_file(prefix: str) -> tuple[IO[bytes], Path]:
"""Create a named temporary file for task output capture and return the open file and its path."""
f = tempfile.NamedTemporaryFile(prefix=prefix, suffix=".log", delete=False)
return f, Path(f.name)


def _edge_hostname() -> str:
"""Get the hostname of the edge worker that should be reported by tasks."""
return os.environ.get("HOSTNAME", getfqdn())
@@ -413,7 +422,8 @@ def _get_state(self) -> EdgeWorkerState:
            return EdgeWorkerState.MAINTENANCE_MODE
        return EdgeWorkerState.IDLE

-    def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
+    def _run_job_via_supervisor(self, workload: ExecuteTask, error_file_path: Path) -> int:
        """Run a task by calling the supervisor directly (executes inside a forked child process)."""
        _reset_parent_signal_state()

        # Ignore ctrl-c in this process -- we don't want to kill _this_ one. we let tasks run to completion
@@ -448,23 +458,89 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
                server=self._execution_api_server_url,
                log_path=workload.log_path,
            )
-            results_queue.put("OK")
            return 0
-        except Exception as e:
+        except Exception:
            logger.exception("Task execution failed")
-            results_queue.put(e)
+            with suppress(Exception):
+                error_file_path.write_text(traceback.format_exc())
            return 1
-    def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
+    def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Popen, Path]:
        """Launch workload via a fresh Python interpreter (subprocess.Popen)."""
        env = os.environ.copy()
        if self._execution_api_server_url:
            env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url

        # Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
        # so a verbose child could otherwise fill the pipe buffer and block forever. Also keep
        # it task-scoped instead of inheriting the worker's stderr/stdout; supervisor startup
        # failures should be pushed to the task log, not only the worker/container log.
        stderr_file, stderr_file_path = _make_task_temp_file("airflow-edge-task-stderr-")
        try:
            process = subprocess.Popen(
                [
                    sys.executable,
                    "-m",
                    "airflow.sdk.execution_time.execute_workload",
                    "--json-string",
                    workload.model_dump_json(),
                ],
                env=env,
                start_new_session=True,
                stderr=stderr_file,
            )
        except Exception:
            stderr_file_path.unlink(missing_ok=True)
            raise
        finally:
            # Close the parent's copy of the fd. Popen already dup2()'d it into the child,
            # so the child's stderr remains open and writable. The parent reads the output
            # later via stderr_file_path (the Path) once the child has exited.
            stderr_file.close()
        logger.info(
            "Launched task subprocess pid=%d for %s",
            process.pid,
            workload.ti.id,
        )
        return process, stderr_file_path
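
Reviewer note: the comment block above is the crux of the spawn-path deadlock this PR avoids. A self-contained sketch of the failure mode and the remedy (hypothetical demo, not part of this PR): with `stderr=subprocess.PIPE` and no concurrent reader, a child writing more than the OS pipe buffer (commonly about 64 KiB on Linux) blocks in `write()`, so `wait()` never returns; redirecting stderr to a file lets the child always make progress.

import subprocess
import sys
import tempfile

# Child that writes ~1 MiB to stderr, far beyond a typical 64 KiB pipe buffer.
NOISY = "import sys; sys.stderr.write('x' * (1024 * 1024))"

# Deadlock-prone pattern (do not run): nothing drains the pipe while waiting,
# so the child stalls in write() and wait() blocks forever.
#   p = subprocess.Popen([sys.executable, "-c", NOISY], stderr=subprocess.PIPE)
#   p.wait()

# Safe pattern, as in the method above: the kernel writes straight to the file.
f = tempfile.NamedTemporaryFile(prefix="stderr-demo-", suffix=".log", delete=False)
p = subprocess.Popen([sys.executable, "-c", NOISY], stderr=f)
f.close()  # parent's handle only; the child keeps its own duplicated fd
print(p.wait())  # returns promptly; inspect f.name afterwards for the output
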

    def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Path]:
        """Launch workload by forking the current process (multiprocessing.Process)."""
        # Improvement: Use frozen GC to prevent child process from copying unnecessary memory
        # See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py
-        results_queue: Queue[Exception] = Queue()
+        error_file, error_file_path = _make_task_temp_file("airflow-edge-task-error-")
+        error_file.close()  # child writes to the file by path; parent only reads it after exit
        process = Process(
            target=self._run_job_via_supervisor,
-            kwargs={"workload": workload, "results_queue": results_queue},
+            kwargs={"workload": workload, "error_file_path": error_file_path},
        )
        process.start()
-        return process, results_queue
+        logger.info("Launched task fork pid=%d for %s", process.pid, workload.ti.id)
+        return process, error_file_path
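
Reviewer note on the PR title: `fork()` copies only the calling thread, so a lock held by any other thread at fork time stays locked forever in the child, which is the classic deadlock the subprocess path sidesteps. A minimal illustrative repro (not from this PR; it hangs only where the fork start method is in effect, e.g. Linux):

import threading
import time
from multiprocessing import Process

lock = threading.Lock()

def holder() -> None:
    # Stands in for a worker-side thread (logging, HTTP, ...) that happens
    # to hold a lock at the moment the task process is forked.
    with lock:
        time.sleep(60)

def task() -> None:
    with lock:  # deadlock: the holding thread was not copied into the forked child
        print("unreachable while the parent thread holds the lock")

if __name__ == "__main__":
    threading.Thread(target=holder, daemon=True).start()
    time.sleep(0.1)  # let the holder acquire the lock before forking
    Process(target=task).start()  # with the fork start method, the child hangs
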

    def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, logfile: Path) -> Job:
        """
        Launch a task process.

        Uses ``subprocess.Popen`` (fresh interpreter) when
        ``core.execute_tasks_new_python_interpreter`` is ``True`` or when
        ``os.fork`` is unavailable (e.g. Windows). Falls back to
        ``multiprocessing.Process`` (fork) otherwise — preserving the
        original behaviour for existing deployments.
        """
        use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
            "core",
            "execute_tasks_new_python_interpreter",
            fallback=False,
        )
        if use_new_interpreter:
            # Fresh subprocess path: spawn a new Python interpreter; no shared memory with parent
            # Technically safer and more robust, but with more overhead
            subprocess_process, stderr_file_path = self._launch_job_subprocess(workload)
            return Job(edge_job, subprocess_process, logfile, stderr_file_path=stderr_file_path)
        # Fork path: clone the current process; child inherits parent memory
        fork_process, error_file_path = self._launch_job_fork(workload)
        return Job(edge_job, fork_process, logfile, stderr_file_path=error_file_path)
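
Operator note: the dispatch above reuses the existing `core.execute_tasks_new_python_interpreter` option instead of introducing a new knob. Presumably it is enabled the usual way, e.g. in `airflow.cfg` or through Airflow's standard environment-variable mapping:

[core]
# Opt Edge workers into the fresh-interpreter (subprocess.Popen) path.
# Equivalent env var: AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True
execute_tasks_new_python_interpreter = True
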

    async def _push_logs_in_chunks(self, job: Job):
        aio_logfile = anyio.Path(job.logfile)
@@ -589,11 +665,10 @@ async def fetch_and_run_job(self) -> None:
logger.info("Received job: %s", edge_job.identifier)

workload: ExecuteTask = edge_job.command
process, results_queue = self._launch_job(workload)
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(self.base_log_folder, workload.log_path)
job = Job(edge_job, process, logfile)
job = self._launch_job(edge_job, workload, logfile)
self.jobs.append(job)
await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)

@@ -603,39 +678,31 @@
        self.background_tasks.add(task)
        task.add_done_callback(self.background_tasks.discard)

-        while job.is_running and results_queue.empty():
+        while job.is_running:
            await self._push_logs_in_chunks(job)
            for _ in range(0, self.job_poll_interval * 10):
                await sleep(0.1)
                if not job.is_running:
                    break
        await self._push_logs_in_chunks(job)
-        supervisor_msg = (
-            "(Unknown error, no exception details available)"
-            if results_queue.empty()
-            else results_queue.get()
-        )
-        # Ensure that supervisor really ended after we grabbed results from queue
-        while True:
-            if not job.is_running:
-                break
-            await sleep(0.1)

        self.jobs.remove(job)
        if job.is_success:
            logger.info("Job completed: %s", job.edge_job.identifier)
            await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
        else:
-            if isinstance(supervisor_msg, Exception):
-                supervisor_msg = "\n".join(traceback.format_exception(supervisor_msg))
-            logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, supervisor_msg)
+            ex_txt = job.failure_details()
+            logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)

            # Push it upwards to logs for better diagnostic as well
            await logs_push(
                task=job.edge_job.key,
                log_chunk_time=timezone.utcnow(),
-                log_chunk_data=f"Error executing job:\n{supervisor_msg}",
+                log_chunk_data=f"Error executing job:\n{ex_txt}",
            )
            await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
        # Cleanup temp files used for the job
        job.cleanup()

    async def heartbeat(self, new_maintenance_comments: str | None = None) -> bool:
        """Report liveness state of worker to central site with stats."""