53 changes: 52 additions & 1 deletion providers/edge3/src/airflow/providers/edge3/cli/dataclasses.py
@@ -17,12 +17,16 @@
from __future__ import annotations

import json
import subprocess
import traceback
from dataclasses import asdict, dataclass
from multiprocessing import Process
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from multiprocessing.queues import Queue

from airflow.providers.edge3.models.edge_worker import EdgeWorkerState
from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

@@ -72,17 +76,64 @@ class Job:
"""Holds all information for a task/job to be executed as bundle."""

edge_job: EdgeJobFetched
process: Process
# Process can be either a subprocess.Popen (for the spawn path) or a
# multiprocessing.Process (for the fork path)
process: subprocess.Popen | Process
logfile: Path
logsize: int = 0
"""Last size of log file, point of last chunk push."""
results_queue: Queue | None = None
"""Queue for child process to push results to parent, if using fork-based execution model."""
stderr_file_path: Path | None = None
"""Path to file where stderr is being redirected, if using spawn-based execution model."""

@property
def is_running(self) -> bool:
"""Check if the job is still running."""
if isinstance(self.process, subprocess.Popen):
return self.process.poll() is None
return self.process.is_alive()

@property
def is_success(self) -> bool:
"""Check if the job was successful."""
if isinstance(self.process, subprocess.Popen):
return self.process.returncode == 0
return self.process.exitcode == 0

@property
def should_poll_logs(self) -> bool:
"""Check if logs should be pushed while waiting for job completion."""
# Fork path: keep pushing logs while the child is running and has not sent a result yet.
# Subprocess path: keep pushing logs while the child is running; status comes from Popen.
if not self.is_running:
return False
return self.results_queue is None or self.results_queue.empty()

def drain_result(self) -> object | None:
"""Read the child result if the execution model provides one."""
if self.results_queue is None or self.results_queue.empty():
return None
return self.results_queue.get()

def failure_details(self, result: object | None) -> str:
"""Format execution-model-specific failure details."""
if isinstance(self.process, subprocess.Popen):
stderr_output = ""
if self.stderr_file_path:
stderr_output = self.stderr_file_path.read_bytes().decode(errors="backslashreplace").strip()
ex_txt = f"Task subprocess exited with code {self.process.returncode}"
if stderr_output:
ex_txt = f"{ex_txt}\n{stderr_output}"
return ex_txt

return (
"\n".join(traceback.format_exception(result))
if isinstance(result, Exception)
else "(Unknown error, no exception details available)"
)

def cleanup(self) -> None:
"""Remove transient files owned by this job."""
if self.stderr_file_path:
self.stderr_file_path.unlink(missing_ok=True)
103 changes: 84 additions & 19 deletions providers/edge3/src/airflow/providers/edge3/cli/worker.py
@@ -19,9 +19,10 @@
import logging
import os
import signal
import subprocess
import sys
import tempfile
import time
import traceback
from asyncio import Task, create_task, gather, get_running_loop, sleep
from collections.abc import Awaitable, Callable
from contextlib import suppress
@@ -66,6 +67,7 @@
if TYPE_CHECKING:
from airflow.configuration import AirflowConfigParser
from airflow.executors.workloads import ExecuteTask
from airflow.providers.edge3.worker_api.datamodels import EdgeJobFetched

logger = logging.getLogger(__name__)

@@ -414,6 +416,7 @@ def _get_state(self) -> EdgeWorkerState:
return EdgeWorkerState.IDLE

def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -> int:
"""Run a task by calling the supervisor directly (executes inside a forked child process)."""
_reset_parent_signal_state()

from airflow.sdk.execution_time.supervisor import supervise
@@ -447,17 +450,80 @@ def _run_job_via_supervisor(self, workload: ExecuteTask, results_queue: Queue) -
results_queue.put(e)
return 1

def _launch_job(self, workload: ExecuteTask) -> tuple[Process, Queue[Exception]]:
def _launch_job_subprocess(self, workload: ExecuteTask) -> tuple[subprocess.Popen, Path]:
"""Launch workload via a fresh Python interpreter (subprocess.Popen)."""
env = os.environ.copy()
if self._execution_api_server_url:
env["AIRFLOW__CORE__EXECUTION_API_SERVER_URL"] = self._execution_api_server_url

# Keep stderr off a PIPE: the worker only inspects stderr after the task finishes,
# so a verbose child could otherwise fill the pipe buffer and block forever. Also keep
# it task-scoped instead of inheriting the worker's stderr/stdout; supervisor startup
# failures should be pushed to the task log, not only the worker/container log.
with tempfile.NamedTemporaryFile(
prefix="airflow-edge-task-stderr-", suffix=".log", delete=False
) as stderr_file:
stderr_file_path = Path(stderr_file.name)
try:
process = subprocess.Popen(
[
sys.executable,
"-m",
"airflow.sdk.execution_time.execute_workload",
"--json-string",
workload.model_dump_json(),
],
env=env,
start_new_session=True,
stderr=stderr_file,
Contributor

Why not redirect stderr to the normal logger/stdout?

Author
@diogosilva30 · May 7, 2026

Good question. I used a temp file because stderr is the only parent-visible diagnostic channel for the fresh-interpreter path, and we want those diagnostics attached to the task that failed.

In the fork path, the child can return an exception object through the multiprocessing result queue. In the subprocess path, the child is a separate Python interpreter running execute_workload, so it cannot send that Python exception object back to the Edge worker. If something fails early, especially during workload parsing, supervisor startup, plugin import, or Dag import, stderr is what preserves the traceback.

We could pass sys.__stderr__ like Celery does, but then output from all concurrently running task subprocesses would share the Edge worker’s stderr. That means a traceback could end up only in the worker/container log, potentially interleaved with other task subprocesses and worker logs, and not attached to the failed task’s log.

The temp file is a per-task spool: it avoids subprocess.PIPE (which can deadlock if the parent does not continuously drain it), keeps stderr attributable to the specific task subprocess, and lets us push those startup diagnostics into the task log via logs_push after the process exits.
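
To make that trade-off concrete, here is a minimal, self-contained sketch of the per-task stderr spool pattern (stand-in command and helper names, not the provider code): stderr goes to a temp file rather than a PIPE, and the parent only reads it after the child exits.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_with_stderr_spool(cmd: list[str]) -> tuple[int, str]:
    """Run *cmd*, spooling its stderr to a per-task temp file instead of a PIPE.

    A PIPE would require the parent to drain it continuously; a verbose child
    could otherwise fill the OS pipe buffer and block. A file has no such limit
    and stays attributable to this one task.
    """
    with tempfile.NamedTemporaryFile(
        prefix="task-stderr-", suffix=".log", delete=False
    ) as stderr_file:
        stderr_path = Path(stderr_file.name)
        # The child inherits its own duplicate of the file descriptor,
        # so it keeps writing even after the parent closes the handle.
        proc = subprocess.Popen(cmd, stderr=stderr_file)

    returncode = proc.wait()
    # Only read the spool once the child has exited, then clean it up.
    stderr_text = stderr_path.read_bytes().decode(errors="backslashreplace").strip()
    stderr_path.unlink(missing_ok=True)
    return returncode, stderr_text


if __name__ == "__main__":
    # Hypothetical child that fails at "startup" and writes a traceback to stderr.
    rc, err = run_with_stderr_spool([sys.executable, "-c", "import nonexistent_module"])
    print(f"exit code: {rc}")
    print(f"captured stderr:\n{err}")
```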

Contributor

Okay, sounds reasonable.

Still, could the STDERR then be sent to the message queue? Just as plain text? The Edge Worker checks for an Exception, but otherwise it should also be able to accept text as a string (like the "OK" that is being sent?)

Author

The Queue approach works in the fork path because the child inherits the multiprocessing state, including the Queue itself.

With subprocess.Popen(...) we start a completely fresh Python interpreter, so there is no shared Queue unless we build a separate IPC layer (pipe/socket/fd passing/etc).

We could do that, but it adds quite a bit of complexity compared to the current tempfile approach. The tempfile also avoids PIPE deadlocks and still captures early bootstrap/import failures before any IPC channel would be initialized.
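
For illustration only, a minimal sketch of why the Queue works on the fork path (stand-in names, not the provider code): the forked child inherits the multiprocessing Queue and can ship a pickled exception object back to the parent, which a fresh interpreter started via subprocess.Popen could not do without an extra IPC layer.

```python
import traceback
from multiprocessing import Process, Queue


def child_task(results_queue: Queue) -> None:
    """Runs in the child; it can use the inherited Queue directly."""
    try:
        raise RuntimeError("something went wrong in the task")
    except Exception as exc:
        # The exception object is pickled and sent back over the Queue's pipe.
        results_queue.put(exc)


if __name__ == "__main__":
    results_queue: Queue = Queue()
    proc = Process(target=child_task, args=(results_queue,))
    proc.start()

    # Drain the queue before joining so the child's feeder thread can flush.
    result = results_queue.get()
    proc.join()

    if isinstance(result, Exception):
        print("".join(traceback.format_exception(result)))
```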

Contributor

Having slept on it and looked at the code again... I do not actually want to insist on the queue :-D I mainly want error details from the supervisor to be passed back into the task logs if something failed. So the "text" content should be passed over.

For me it would also be okay to step away from the Queue in general and transport the error details via a text file in both branches. Then we have one technical backend for both execution options. The main thing I want to achieve is that the "text" is transferred: instead of passing the exception to the queue, the text can also be written to a file and picked up. That would make it leaner?
(And if all is OK we do not need to pass an "OK" text; we just use the file for passing any error text?)

Author
@diogosilva30 · May 11, 2026

Thanks for the suggestion! Before I push I just wanted to confirm the approach I've taken is what you had in mind:

  • Keep the current changes in fresh sub-process path.
  • Removed the multiprocessing.Queue from the fork path entirely.
  • Both paths now write failure traceback text to a named temp file (Path) stored as Job.stderr_file_path.
  • Job.failure_details() reads from that file for both paths — no arguments needed, one code path.
  • On success the file is cleaned up via Job.cleanup().

Does that match what you had in mind, or would you like any adjustments?
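
As a rough sketch of that unified file-based transport (hypothetical helper names, not the code that was merged): both paths would leave failure text in a per-task file, and the worker reads that file regardless of execution model; an empty or missing file means success.

```python
import traceback
from pathlib import Path


def record_failure(error_file_path: Path, exc: Exception) -> None:
    """Child side (fork path): persist the traceback as plain text instead of queueing the object."""
    error_file_path.write_text("".join(traceback.format_exception(exc)))


def failure_details(error_file_path: Path) -> str:
    """Worker side: a single code path for both execution models just reads the file."""
    if error_file_path.exists() and error_file_path.stat().st_size > 0:
        return error_file_path.read_text(errors="backslashreplace").strip()
    return "(Unknown error, no exception details available)"
```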

)
except Exception:
stderr_file_path.unlink(missing_ok=True)
raise
logger.info(
"Launched task subprocess pid=%d for %s",
process.pid,
workload.ti.id,
)
return process, stderr_file_path

def _launch_job_fork(self, workload: ExecuteTask) -> tuple[Process, Queue]:
"""Launch workload by forking the current process (multiprocessing.Process)."""
# Improvement: Use frozen GC to prevent child process from copying unnecessary memory
# See _spawn_workers_with_gc_freeze() in airflow-core/src/airflow/executors/local_executor.py
results_queue: Queue[Exception] = Queue()
results_queue: Queue = Queue()
process = Process(
target=self._run_job_via_supervisor,
kwargs={"workload": workload, "results_queue": results_queue},
)
process.start()
logger.info("Launched task fork pid=%d for %s", process.pid, workload.ti.id)
return process, results_queue

def _launch_job(self, edge_job: EdgeJobFetched, workload: ExecuteTask, logfile: Path) -> Job:
"""
Launch a task process.

Uses ``subprocess.Popen`` (fresh interpreter) when
``core.execute_tasks_new_python_interpreter`` is ``True`` or when
``os.fork`` is unavailable (e.g. Windows). Falls back to
``multiprocessing.Process`` (fork) otherwise — preserving the
original behaviour for existing deployments.
"""
use_new_interpreter = not hasattr(os, "fork") or self.conf.getboolean(
"core",
"execute_tasks_new_python_interpreter",
fallback=False,
)
if use_new_interpreter:
# Fresh subprocess path: spawn a new Python interpreter; no shared memory with parent
# Technically safer and more robust, but with more overhead
subprocess_process, stderr_file_path = self._launch_job_subprocess(workload)
return Job(edge_job, subprocess_process, logfile, stderr_file_path=stderr_file_path)
# Fork path: clone the current process; child inherits parent memory
fork_process, results_queue = self._launch_job_fork(workload)
return Job(edge_job, fork_process, logfile, results_queue=results_queue)

async def _push_logs_in_chunks(self, job: Job):
aio_logfile = anyio.Path(job.logfile)
if self.push_logs and await aio_logfile.exists() and (await aio_logfile.stat()).st_size > job.logsize:
@@ -581,11 +647,10 @@ async def fetch_and_run_job(self) -> None:
logger.info("Received job: %s", edge_job.identifier)

workload: ExecuteTask = edge_job.command
process, results_queue = self._launch_job(workload)
if TYPE_CHECKING:
assert workload.log_path # We need to assume this is defined in here
logfile = Path(self.base_log_folder, workload.log_path)
job = Job(edge_job, process, logfile)
job = self._launch_job(edge_job, workload, logfile)
self.jobs.append(job)
await jobs_set_state(edge_job.key, TaskInstanceState.RUNNING)

@@ -595,37 +660,37 @@
self.background_tasks.add(task)
task.add_done_callback(self.background_tasks.discard)

while job.is_running and results_queue.empty():
while job.should_poll_logs:
await self._push_logs_in_chunks(job)
for _ in range(0, self.job_poll_interval * 10):
await sleep(0.1)
if not job.is_running:
break
await self._push_logs_in_chunks(job)
supervisor_msg = (
"(Unknown error, no exception details available)"
if results_queue.empty()
else results_queue.get()
)
# Ensure that supervisor really ended after we grabbed results from queue
while True:
if not job.is_running:
break
# Fork path: drain the result queue BEFORE waiting for the child to fully exit.
# A large exception travels through multiprocessing's pipe-backed queue; reading it
# here lets the child's feeder thread flush and avoids deadlocking on process exit.
# Fresh-interpreter subprocesses do not share Python exception objects with the parent.
result = job.drain_result()
# Wait for the child process to fully exit (fork path: queue is already drained above).
while job.is_running: # noqa: ASYNC110
await sleep(0.1)

self.jobs.remove(job)
if job.is_success:
logger.info("Job completed: %s", job.edge_job.identifier)
await jobs_set_state(job.edge_job.key, TaskInstanceState.SUCCESS)
job.cleanup()
else:
if isinstance(supervisor_msg, Exception):
supervisor_msg = "\n".join(traceback.format_exception(supervisor_msg))
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, supervisor_msg)
ex_txt = job.failure_details(result)
job.cleanup()
logger.error("Job failed: %s with:\n%s", job.edge_job.identifier, ex_txt)

# Push it upwards to logs for better diagnostic as well
await logs_push(
task=job.edge_job.key,
log_chunk_time=timezone.utcnow(),
log_chunk_data=f"Error executing job:\n{supervisor_msg}",
log_chunk_data=f"Error executing job:\n{ex_txt}",
)
await jobs_set_state(job.edge_job.key, TaskInstanceState.FAILED)
