@@ -17,6 +17,7 @@
from __future__ import annotations

import json
import subprocess
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
@@ -72,17 +73,23 @@ class Job:
"""Holds all information for a task/job to be executed as bundle."""

edge_job: EdgeJobFetched
process: Process
# Process can be either a subprocess.Popen (for the spawn path) or a
# multiprocessing.Process (for the fork path)
process: subprocess.Popen | Process
logfile: Path
logsize: int = 0
"""Last size of log file, point of last chunk push."""

@property
def is_running(self) -> bool:
"""Check if the job is still running."""
if isinstance(self.process, subprocess.Popen):
return self.process.poll() is None
return self.process.is_alive()

@property
def is_success(self) -> bool:
"""Check if the job was successful."""
if isinstance(self.process, subprocess.Popen):
return self.process.returncode == 0
return self.process.exitcode == 0
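
As a standalone illustration of that polymorphism (a minimal sketch using only the standard library, not code from this PR):

import subprocess
import sys
from multiprocessing import Process

def _noop() -> None:
    pass

if __name__ == "__main__":
    # subprocess.Popen: poll() returns None while running; returncode is set after exit.
    popen_child = subprocess.Popen([sys.executable, "-c", "pass"])
    popen_child.wait()
    assert popen_child.poll() is not None and popen_child.returncode == 0

    # multiprocessing.Process: is_alive()/exitcode are the equivalent checks.
    fork_child = Process(target=_noop)
    fork_child.start()
    fork_child.join()
    assert not fork_child.is_alive() and fork_child.exitcode == 0
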
124 changes: 108 additions & 16 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -19,7 +19,9 @@
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
@@ -133,6 +135,7 @@ def __init__(
self.concurrency = concurrency
self.daemon = daemon
self.team_name = team_name
self._subprocess_stderr_files: dict[int, Path] = {}

self.worker_start_time: datetime = datetime.now()

@@ -414,6 +417,7 @@ def _get_state(self) -> EdgeWorkerState:
return EdgeWorkerState.IDLE

def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
"""Run a task by calling the supervisor directly (executes inside a forked child process)."""
_reset_parent_signal_state()

from airflow.sdk.execution_time.supervisor import supervise
@@ -447,17 +451,77 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
results_queue.put(e)
return 1

def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
def _launch_job_subprocess(self, workload: ExecuteTask) -> subprocess.Popen:
"""Launch workload via a fresh Python interpreter (subprocess.Popen)."""
env = os.environ.copy()
if self._execution_api_server_url:
env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url

# Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
# so a verbose child could otherwise fill the pipe buffer and block forever.
with tempfile.NamedTemporaryFile(
prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
) as stderr_file:
stderr_file_path = Path(stderr_file.name)
try:
process = subprocess.Popen(
[
sys.executable,
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-string",
workload.model_dump_json(),
],
env=env,
start_new_session=True,
stderr=stderr_file,
Contributor:
Why not redirect stderr to the normal logger/stdout?

Author (@diogosilva30, May 7, 2026):
Good question. I used a temp file because stderr is the only parent-visible diagnostic channel for the fresh-interpreter path, and we want those diagnostics attached to the task that failed.

In the fork path, the child can return an exception object through the multiprocessing result queue. In the subprocess path, the child is a separate Python interpreter running execute_workload, so it cannot send that Python exception object back to the Edge worker. If something fails early, especially during workload parsing, supervisor startup, plugin import, or Dag import, stderr is what preserves the traceback.

We could pass sys.__stderr__ like Celery does, but then output from all concurrently running task subprocesses would share the Edge worker’s stderr. That means a traceback could end up only in the worker/container log, potentially interleaved with other task subprocesses and worker logs, and not attached to the failed task’s log.

The temp file is a per-task spool: it avoids subprocess.PIPE (which can deadlock if the parent does not continuously drain it), keeps stderr attributable to the specific task subprocess, and lets us push those startup diagnostics into the task log via logs_push after the process exits.
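
A minimal sketch of the hazard described above, with a hypothetical noisy child (illustrative only, not code from this PR):

import os
import subprocess
import sys
import tempfile

# Hypothetical noisy child: writes far more to stderr than an OS pipe buffer holds.
noisy = "import sys; sys.stderr.write('x' * 1_000_000)"

# Risky: with stderr=PIPE the child blocks once the pipe buffer (~64 KiB on Linux)
# is full, and a parent that only calls wait() never drains it:
#   proc = subprocess.Popen([sys.executable, "-c", noisy], stderr=subprocess.PIPE)
#   proc.wait()  # can deadlock
# Safer: spool stderr to a file; the kernel writes it with no parent involvement.
with tempfile.NamedTemporaryFile(suffix=".log", delete=False) as spool:
    proc = subprocess.Popen([sys.executable, "-c", noisy], stderr=spool)
proc.wait()  # cannot block on stderr
print(open(spool.name).read(12), "...")  # inspect diagnostics only after exit
os.unlink(spool.name)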

Contributor:
Okay, sounds reasonable.

Still, couldn't the stderr then be sent to the message queue, just as plain text? The Edge Worker checks for Exception but otherwise should also be able to accept text as a string (like the "OK" that is being sent?)

Author:
The Queue approach works in the fork path because the child inherits the multiprocessing state, including the Queue itself.

With subprocess.Popen(...) we start a completely fresh Python interpreter, so there is no shared Queue unless we build a separate IPC layer (pipe/socket/fd passing/etc).

We could do that, but it adds quite a bit of complexity compared to the current tempfile approach. The tempfile also avoids PIPE deadlocks and still captures early bootstrap/import failures before any IPC channel would be initialized.
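
A minimal sketch of that difference, assuming a fork-capable platform (illustrative only):

import multiprocessing as mp
import subprocess
import sys

def child(q) -> None:
    # Works only because fork handed the child the parent's Queue machinery.
    q.put(ValueError("picklable exception object"))

if __name__ == "__main__":
    ctx = mp.get_context("fork")  # assumption: not Windows
    q = ctx.Queue()
    p = ctx.Process(target=child, args=(q,))
    p.start()
    print(type(q.get()))  # <class 'ValueError'>: the object crossed the fork boundary
    p.join()

    # A fresh interpreter holds no reference to q; only bytes on stdio, files,
    # or an explicitly built IPC channel can cross this boundary.
    subprocess.Popen([sys.executable, "-c", "print('no shared Queue here')"]).wait()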

Contributor:
Having slept on it and looked at the code... I do not actually want to insist on the queue :-D I mainly want error details from the supervisor to be passed back into the task logs if something failed. So the "text" content should be passed over.

For me it would also be okay to step away from the Queue in general and transport the error details via a text file in both branches. Then we have one technical backend for both execution options. The main thing I want to achieve is to have "text" transferred: instead of passing the exception to the queue, the text can also be written to a file and picked up. That would make it leaner?
(Also, if all is OK we do not need to pass an "OK" text; we just use the file for passing any error text?)

Author (@diogosilva30, May 11, 2026):
Thanks for the suggestion! Before I push I just wanted to confirm the approach I've taken is what you had in mind:

  • Kept the current changes in the fresh sub-process path.
  • Removed the multiprocessing.Queue from the fork path entirely.
  • Both paths now write failure traceback text to a named temp file (Path) stored as Job.stderr_file_path.
  • Job.failure_details() reads from that file for both paths — no arguments needed, one code path.
  • On success the file is cleaned up via Job.cleanup().

Does that match what you had in mind, or would you like any adjustments?
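
A rough sketch of that proposed shape, with hypothetical method bodies (not code from this PR):

from dataclasses import dataclass
from pathlib import Path

@dataclass
class Job:  # trimmed to the parts relevant to the proposal
    stderr_file_path: Path

    def failure_details(self) -> str:
        """Read whatever error text the child spooled, identically for both launch paths."""
        if self.stderr_file_path.exists():
            return self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
        return "(Unknown error, no exception details available)"

    def cleanup(self) -> None:
        """Remove the spool file once the job outcome has been reported."""
        self.stderr_file_path.unlink(missing_ok=True)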

)
except Exception:
stderr_file_path.unlink(missing_ok=True)
raise
self._subprocess_stderr_files[process.pid] = stderr_file_path
logger.debug(
"Launched task subprocess pid=%d for %s",
process.pid,
workload.display_name,
)
return process

def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Queue]:
"""Launch workload by forking the current process (multiprocessing.Process)."""
# Improvement: Use frozen GC to prevent child process from copying unnecessary memory
# See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py
results_queue: Queue[Exception] = Queue()
results_queue: Queue = Queue()
process = Process(
target=self._run_job_via_supervisor,
kwargs={"workload": workload, "results_queue": results_queue},
)
process.start()
logger.debug("Launched task fork pid=%d for %s", process.pid, workload.display_name)
return process, results_queue

def _launch_job(self, workload: ExecuteTask) -> tuple[subprocess.Popen | Process, Queue | None]:
"""
Launch a task process.

Uses ``subprocess.Popen`` (fresh interpreter) when
``core.execute_tasks_new_python_interpreter`` is ``True`` or when
``os.fork`` is unavailable (e.g. Windows). Falls back to
``multiprocessing.Process`` (fork) otherwise — preserving the
original behaviour for existing deployments.
"""
use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
"core",
"execute_tasks_new_python_interpreter",
fallback=False,
)
if use_new_interpreter:
# Fresh subprocess path: spawn a new Python interpreter; no shared memory with parent
# Technically safer and more robust, but with more overhead
return self._launch_job_subprocess(workload), None
# Fork path: clone the current process; child inherits parent memory
return self._launch_job_fork(workload)

async def _push_logs_in_chunks(self, job: Job):
aio_logfile = anyio.Path(job.logfile)
if self.push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize:
@@ -595,37 +659,65 @@ async def fetch_and_run_job(self) -> None:
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)

while job.is_running and results_queue.empty():
# Fork path: keep pushing logs while the child is running and has not sent a result yet.
# Subprocess path: keep pushing logs while the child is running; status comes from Popen.
while job.is_running and (results_queue is None or results_queue.empty()):
await self._push_logs_in_chunks(job)
for _ in range(0, self.job_poll_interval * 10):
await sleep(0.1)
if not job.is_running:
break
await self._push_logs_in_chunks(job)
supervisor_msg = (
"(Unknown error, no exception details available)"
if results_queue.empty()
else results_queue.get()
)
# Ensure that supervisor really ended after we grabbed results from queue
while True:
if not job.is_running:
break
# Fork path: drain the result queue BEFORE waiting for the child to fully exit.
# A large exception travels through multiprocessing's pipe-backed queue; reading it
# here lets the child's feeder thread flush and avoids deadlocking on process exit.
# Fresh-interpreter subprocesses do not share Python exception objects with the parent.
fork_result = None if (results_queue is None or results_queue.empty()) else results_queue.get()
# Wait for the child process to fully exit (fork path: queue is already drained above).
while job.is_running: # noqa: ASYNC110
await sleep(0.1)

self.jobs.remove(job)
# Subprocess stderr is keyed by PID because Job intentionally stores only the process
# object. Pop it once the process is done so every completion path owns cleanup.
stderr_file_path = (
self._subprocess_stderr_files.pop(job.process.pid, None)
if isinstance(job.process, subprocess.Popen)
else None
)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
if stderr_file_path:
stderr_file_path.unlink(missing_ok=True)
else:
if isinstance(supervisor_msg, Exception):
supervisor_msg = "\n".join(traceback.format_exception(supervisor_msg))
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, supervisor_msg)
if isinstance(job.process, subprocess.Popen):
# The subprocess cannot send a Python exception object back to this process;
# stderr is the diagnostic channel that preserves errors and tracebacks.
stderr_output = ""
if stderr_file_path:
try:
stderr_output = (
stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
)
finally:
stderr_file_path.unlink(missing_ok=True)
ex_txt = f"Task subprocess exited with code {job.process.returncode}"
if stderr_output:
ex_txt = f"{ex_txt}\n{stderr_output}"
else:
# Fork path: use the result already drained from the queue above.
ex_txt = (
"\n".join(traceback.format_exception(fork_result))
if isinstance(fork_result, Exception)
else "(Unknown error, no exception details available)"
)
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)
# Push it upwards to logs for better diagnostic as well
await logs_push(
task=job.edge_job.key,
log_chunk_time=timezone.utcnow(),
log_chunk_data=f"Error executing job:\n{supervisor_msg}",
log_chunk_data=f"Error executing job:\n{ex_txt}",
)
await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
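
For reference: _launch_job reads a standard core option, so on fork-capable hosts the fresh-interpreter path can also be opted into by setting execute_tasks_new_python_interpreter = True in the [core] section of airflow.cfg or, assuming the usual Airflow env-var mapping, by exporting AIRFLOW__CORE__EXECUTE_TASKS_NEW_PYTHON_INTERPRETER=True.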

140 changes: 140 additions & 0 deletions providers/edge3/tests/unit/edge3/cli/test_worker.py
@@ -22,7 +22,10 @@
import json
import logging
import multiprocessing
import os
import signal
import subprocess
import sys
from datetime import datetime
from io import StringIO
from multiprocessing import Process, Queue
@@ -52,6 +55,7 @@
WorkerRegistrationReturn,
WorkerSetStateReturn,
)
from airflow.utils.state import TaskInstanceState

from tests_common.test_utils.config import conf_vars
from tests_common.test_utils.version_compat import AIRFLOW_V_3_2_PLUS
@@ -118,6 +122,15 @@ def join(self, timeout=None):
pass


class _MockPopen(subprocess.Popen):
def __init__(self, returncode: int | None = None, pid: int = 1234):
self.returncode = returncode
self.pid = pid

def poll(self):
return self.returncode


class TestEdgeWorker:
@pytest.fixture(autouse=True)
def setup_parser(self):
@@ -234,6 +247,93 @@ def test_execution_api_server_url(
url = test_worker._execution_api_server_url
assert url == expected_url

@pytest.mark.parametrize(
("has_fork", "use_new_interpreter", "expected_launch_method"),
[
pytest.param(True, False, "fork", id="fork_available_config_false"),
pytest.param(True, True, "subprocess", id="fork_available_config_true"),
pytest.param(False, False, "subprocess", id="fork_unavailable_config_false"),
pytest.param(False, True, "subprocess", id="fork_unavailable_config_true"),
],
)
def test_launch_job_honors_execute_tasks_new_python_interpreter(
self,
has_fork,
use_new_interpreter,
expected_launch_method,
monkeypatch,
worker_with_job: EdgeWorker,
):
if not has_fork:
monkeypatch.delattr(os, "fork", raising=False)
worker_with_job.conf = mock.MagicMock()
worker_with_job.conf.getboolean.return_value = use_new_interpreter
workload = worker_with_job.jobs[0].edge_job.command
subprocess_process = _MockPopen(returncode=None)
fork_process = _MockProcess()
results_queue = mock.MagicMock()

with (
patch.object(
worker_with_job, "_launch_job_subprocess", return_value=subprocess_process
) as mock_launch_subprocess,
patch.object(
worker_with_job, "_launch_job_fork", return_value=(fork_process, results_queue)
) as mock_launch_fork,
):
process, queue = worker_with_job._launch_job(workload)

if has_fork:
worker_with_job.conf.getboolean.assert_called_once_with(
"core", "execute_tasks_new_python_interpreter", fallback=False
)
else:
worker_with_job.conf.getboolean.assert_not_called()
if expected_launch_method == "subprocess":
assert process is subprocess_process
assert queue is None
mock_launch_subprocess.assert_called_once_with(workload)
mock_launch_fork.assert_not_called()
else:
assert process is fork_process
assert queue is results_queue
mock_launch_fork.assert_called_once_with(workload)
mock_launch_subprocess.assert_not_called()

@patch("airflow.providers.edge3.cli.worker.subprocess.Popen")
def test_launch_job_subprocess_uses_fresh_interpreter_and_spools_stderr(
self,
mock_popen,
worker_with_job: EdgeWorker,
):
process = _MockPopen(returncode=None, pid=4321)
mock_popen.return_value = process
worker_with_job.__dict__["_execution_api_server_url"] = "https://mock-server/execution"
workload = worker_with_job.jobs[0].edge_job.command

try:
assert worker_with_job._launch_job_subprocess(workload) is process

popen_args, popen_kwargs = mock_popen.call_args
assert popen_args[0] == [
sys.executable,
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-string",
workload.model_dump_json(),
]
assert (
popen_kwargs["env"]["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"]
== "https://mock-server/execution"
)
assert popen_kwargs["start_new_session"] is True
assert popen_kwargs["stderr"] is not subprocess.PIPE
assert Path(popen_kwargs["stderr"].name) == worker_with_job._subprocess_stderr_files[process.pid]
finally:
stderr_file_path = worker_with_job._subprocess_stderr_files.pop(process.pid, None)
if stderr_file_path:
stderr_file_path.unlink(missing_ok=True)

@patch("airflow.sdk.execution_time.supervisor.supervise")
@pytest.mark.asyncio
async def test_supervise_launch(
@@ -433,6 +533,46 @@ def test_fetch_and_run_job_one_job_fail(
assert len(worker_with_job.jobs) == 1 # no new job added (was removed at the end...)
mock_logs_push.assert_called_once()

@patch("airflow.providers.edge3.cli.worker.jobs_fetch")
@patch("airflow.providers.edge3.cli.worker.jobs_set_state")
@patch("airflow.providers.edge3.cli.worker.EdgeWorker._push_logs_in_chunks")
@patch("airflow.providers.edge3.cli.worker.logs_push")
@pytest.mark.asyncio
async def test_fetch_and_run_job_subprocess_failure_pushes_stderr_to_logs(
self,
mock_logs_push,
mock_push_log_chunks,
mock_jobs_set_state,
mock_jobs_fetch,
tmp_path: Path,
worker_with_job: EdgeWorker,
):
mock_jobs_fetch.return_value = EdgeJobFetched(
dag_id="test",
task_id="test",
run_id="test",
map_index=-1,
try_number=1,
concurrency_slots=1,
command=MOCK_COMMAND, # type: ignore[arg-type]
)
worker_with_job.concurrency = 1
process = _MockPopen(returncode=1, pid=5678)
stderr_file_path = tmp_path / "subprocess-stderr.log"
stderr_file_path.write_text("ModuleNotFoundError: No module named 'common'\n")
worker_with_job._subprocess_stderr_files[process.pid] = stderr_file_path

with patch.object(worker_with_job, "_launch_job", return_value=(process, None)):
await worker_with_job.fetch_and_run_job()

mock_jobs_fetch.assert_called_once()
mock_push_log_chunks.assert_called_once()
assert mock_jobs_set_state.call_args_list[-1].args[1] == TaskInstanceState.FAILED
log_chunk_data = mock_logs_push.call_args.kwargs["log_chunk_data"]
assert "Task subprocess exited with code 1" in log_chunk_data
assert "ModuleNotFoundError: No module named 'common'" in log_chunk_data
assert not stderr_file_path.exists()

@time_machine.travel(datetime.now(), tick=False)
@patch("airflow.providers.edge3.cli.worker.logs_push")
@pytest.mark.asyncio