Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion airflow-core/src/airflow/dag_processing/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
from airflow.dag_processing.dagbag import BundleDagBag, DagBag
from airflow.models.dag import DagModel
from airflow.sdk.exceptions import TaskNotFound
from airflow.sdk.execution_time import supervisor
from airflow.sdk.execution_time.comms import (
ConnectionResult,
DeleteVariable,
Expand Down Expand Up @@ -586,13 +587,24 @@ def start( # type: ignore[override]
) -> Self:
logger = kwargs["logger"]

_pre_import_airflow_modules(os.fspath(path), logger)
# Parsing DAG files runs user code that can trigger macOS-unsafe ObjC
# initialization (secret backends, connection/variable lookups, HTTP
# clients). Fork+exec a clean interpreter there. Tests override `target`
# with a stub to exercise the base infrastructure; keep bare fork for those.
use_exec = target is _parse_file_entrypoint and supervisor._should_use_exec()

# Pre-importing only helps the bare-fork child (it inherits the imports via
# copy-on-write). An exec'd child re-imports from scratch, so skip it there
# to avoid leaking user modules into the long-lived processor manager.
if not use_exec:
_pre_import_airflow_modules(os.fspath(path), logger)

proc: Self = super().start(
target=target,
client=client,
bundle_name=bundle_name,
dag_file_rel_path=dag_file_rel_path,
use_exec=use_exec,
**kwargs,
)
proc.had_callbacks = bool(callbacks) # Track if this process had callbacks
Expand Down
13 changes: 12 additions & 1 deletion airflow-core/src/airflow/jobs/triggerer_job_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
from airflow.observability.metrics import stats_utils
from airflow.sdk.api.datamodels._generated import HITLDetailResponse
from airflow.sdk.definitions.asset import Asset
from airflow.sdk.execution_time import supervisor
from airflow.sdk.execution_time.comms import (
AssetStateStoreResult,
ClearAssetStateStoreByName,
Expand Down Expand Up @@ -539,7 +540,17 @@ def start( # type: ignore[override]
**kwargs,
):
proc_id = job.id if job is not None else uuid4()
proc = super().start(id=proc_id, job=job, target=cls.run_in_process, logger=logger, **kwargs)
# Triggers run user code that polls APIs / watches queues / hits HTTP
# endpoints -- almost always network calls that trigger macOS-unsafe ObjC
# initialization. Fork+exec a clean interpreter for the runner child.
proc = super().start(
id=proc_id,
job=job,
target=cls.run_in_process,
logger=logger,
use_exec=supervisor._should_use_exec(),
**kwargs,
)

msg = messages.StartTriggerer()
proc.send_msg(msg, request_id=0)
Expand Down
46 changes: 45 additions & 1 deletion airflow-core/tests/unit/dag_processing/test_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,14 @@
_execute_email_callbacks,
_execute_task_callbacks,
_parse_file,
_parse_file_entrypoint,
_pre_import_airflow_modules,
)
from airflow.models import DagRun
from airflow.sdk import DAG, BaseOperator
from airflow.sdk.api.client import Client
from airflow.sdk.api.datamodels._generated import ConnectionResponse, DagRunState, VariableResponse
from airflow.sdk.execution_time import comms
from airflow.sdk.execution_time import comms, supervisor
from airflow.sdk.execution_time.comms import (
GetConnection,
GetTaskStates,
Expand All @@ -94,6 +95,19 @@

pytestmark = pytest.mark.db_test


@pytest.fixture(autouse=True)
def _force_bare_fork(monkeypatch):
    """
    Pin the DAG-parsing child to a plain ``os.fork`` regardless of platform.

    ``DagFileProcessorProcess.start`` consults ``supervisor._should_use_exec`` to
    decide whether to fork+exec a clean interpreter (the macOS fork-safety path).
    Stubbing that check to answer ``False`` for every test in this module keeps
    local macOS runs aligned with Linux/CI behavior.
    """

    def _never_exec() -> bool:
        return False

    # monkeypatch restores the real implementation automatically after each test.
    monkeypatch.setattr(supervisor, "_should_use_exec", _never_exec)


DEFAULT_DATE = timezone.datetime(2016, 1, 1)

# Filename to be used for dags that are created in an ad-hoc manner and can be removed/
Expand Down Expand Up @@ -468,6 +482,36 @@ def test__pre_import_airflow_modules_partial_success_and_warning(self):
assert logger.warning.call_count == 1


@pytest.mark.parametrize(
    ("platform_uses_exec", "target", "expected_use_exec"),
    [
        # Fork-unsafe platform + the real parse entry point -> fork+exec.
        (True, _parse_file_entrypoint, True),
        # Fork-safe platform -> bare fork even for the real entry point.
        (False, _parse_file_entrypoint, False),
        # Any substitute target (as tests use) -> bare fork, whatever the platform says.
        (True, lambda: None, False),
    ],
)
def test_start_opts_into_fork_exec(monkeypatch, mocker, platform_uses_exec, target, expected_use_exec):
    """
    start() forks+execs only for the real parse entry point on fork-unsafe platforms.

    Also checks the companion behavior: modules are pre-imported in the parent
    only when the child will be a bare fork, and skipped on the exec path.
    """
    # Overrides the module-wide autouse stub so each case controls the platform answer.
    monkeypatch.setattr(supervisor, "_should_use_exec", lambda: platform_uses_exec)
    base_start = mocker.patch(
        "airflow.sdk.execution_time.supervisor.WatchedSubprocess.start", return_value=MagicMock()
    )
    # Keep a handle on the mock so the pre-import decision can be asserted below.
    pre_import = mocker.patch("airflow.dag_processing.processor._pre_import_airflow_modules")

    DagFileProcessorProcess.start(
        path="some_dag.py",
        bundle_path=pathlib.Path("/tmp/bundle"),
        bundle_name="testing",
        dag_file_rel_path="some_dag.py",
        callbacks=[],
        client=MagicMock(spec=Client),
        target=target,
        logger=MagicMock(),
    )

    assert base_start.call_args.kwargs["use_exec"] is expected_use_exec
    # start() only pre-imports when it is NOT going to exec a fresh interpreter.
    assert pre_import.called is (not expected_use_exec)


def write_dag_in_a_fn_to_file(fn: Callable[[], None], folder: pathlib.Path) -> pathlib.Path:
# Create the dag in a fn, and use inspect.getsource to write it to a file so that
# a) the test dag is directly viewable here in the tests
Expand Down
28 changes: 28 additions & 0 deletions airflow-core/tests/unit/jobs/test_triggerer_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@
from airflow.sdk.api.client import Client
from airflow.sdk.api.datamodels._generated import AssetStateStoreResponse
from airflow.sdk.exceptions import ErrorType
from airflow.sdk.execution_time import supervisor
from airflow.sdk.execution_time.comms import (
AssetStateStoreResult,
ClearAssetStateStoreByName,
Expand Down Expand Up @@ -123,6 +124,19 @@
pytestmark = pytest.mark.db_test


@pytest.fixture(autouse=True)
def _force_bare_fork(monkeypatch):
    """
    Pin the trigger-runner child to a plain ``os.fork`` regardless of platform.

    On macOS the triggerer would otherwise fork+exec a clean interpreter for
    fork-safety (see ``TriggerRunnerSupervisor.start``). These tests fork the real
    runner with trigger classes defined in this module, which a freshly exec'd
    interpreter cannot import, so the platform check is stubbed to answer
    ``False``. That makes local macOS runs match Linux/CI.
    """

    def _never_exec() -> bool:
        return False

    # monkeypatch restores the real implementation automatically after each test.
    monkeypatch.setattr(supervisor, "_should_use_exec", _never_exec)


@pytest.fixture(autouse=True)
def clean_database():
"""Fixture that cleans the database before and after every test."""
Expand Down Expand Up @@ -232,6 +246,20 @@ def test_triggerer_job_runner_stores_team_name(team_name):
assert runner.team_name == team_name


@pytest.mark.parametrize("platform_uses_exec", [True, False])
def test_start_opts_into_fork_exec(monkeypatch, mocker, platform_uses_exec):
    """The triggerer's start() forwards the platform's fork+exec decision to the base class."""

    def _platform_answer() -> bool:
        return platform_uses_exec

    # Overrides the module-wide autouse stub so each case controls the platform answer.
    monkeypatch.setattr(supervisor, "_should_use_exec", _platform_answer)
    base_start = mocker.patch(
        "airflow.sdk.execution_time.supervisor.WatchedSubprocess.start", return_value=MagicMock()
    )

    TriggerRunnerSupervisor.start(job=Job(id=999), capacity=10)

    forwarded = base_start.call_args.kwargs
    assert forwarded["use_exec"] is platform_uses_exec
    assert forwarded["target"] == TriggerRunnerSupervisor.run_in_process


@pytest.fixture
def supervisor_builder(mocker, session):
def builder(job=None):
Expand Down
87 changes: 58 additions & 29 deletions task-sdk/src/airflow/sdk/execution_time/supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io
import logging
import os
import pkgutil
import selectors
import signal
import sys
Expand Down Expand Up @@ -498,42 +499,63 @@ def exit(n: int) -> NoReturn:

Calling ``os.execv`` immediately after ``os.fork`` replaces the child's address
space, giving it clean ObjC state. Before exec, the supervisor ``dup2``s the
socketpairs onto FDs 0 (requests/stdin), 1 (stdout), 2 (stderr). The duplicated
FDs survive the upcoming exec because ``os.dup2(inheritable=True)`` (the default)
clears ``FD_CLOEXEC`` on the destination FDs. The log channel is obtained after
startup via the existing ``ResendLoggingFD`` mechanism.
socketpairs onto fixed FDs the exec'd child reconstructs: 0 (requests/stdin),
1 (stdout), 2 (stderr), 3 (structured logs). ``os.set_inheritable`` clears
``FD_CLOEXEC`` on those FDs so they survive the upcoming exec.

Currently only task execution opts in (via ``ActivitySubprocess.start``). DAG
processor and triggerer can also hit this crash and will need the same treatment
as a follow-up (see https://github.com/apache/airflow/issues/65691).
Task execution (``ActivitySubprocess``), the DAG processor
(``DagFileProcessorProcess``) and the triggerer (``TriggerRunnerSupervisor``)
all opt in -- each runs a different child entry point, named by ``module:qualname``
in the ``_AIRFLOW_CHILD_TARGET`` env var and rehydrated by :func:`_child_exec_main`.

See: https://github.com/python/cpython/issues/105912
https://github.com/apache/airflow/discussions/24463
https://github.com/apache/airflow/issues/65691
"""


def _should_use_exec() -> bool:
    """Report whether this platform needs forked children to ``exec`` a fresh interpreter."""
    current_platform = sys.platform
    return current_platform in _FORK_EXEC_PLATFORMS


def _resolve_child_target(dotted: str) -> Callable[[], None]:
    """
    Turn a ``module:qualname`` string into the callable the exec'd child runs.

    An empty string selects :func:`_subprocess_main` (task execution). Otherwise
    the name is resolved with :func:`pkgutil.resolve_name`; the part after the
    colon may be a dotted attribute path (e.g. ``ClassName.classmethod``), which
    is how classmethod entry points such as
    ``TriggerRunnerSupervisor.run_in_process`` are recovered across ``execv``.
    """
    return pkgutil.resolve_name(dotted) if dotted else _subprocess_main


def _child_exec_main():
    """
    Entry point for the child process when using fork+exec (macOS).

    After exec, FDs 0/1/2/3 are the requests/stdout/stderr/log sockets the parent
    placed there via dup2. The target to run is named in ``_AIRFLOW_CHILD_TARGET``
    (``module:qualname``); it is rehydrated and handed to :func:`_fork_main`, which
    sets up the structured log channel from FD 3 exactly as the bare-fork path does.
    """
    # FDs 0, 1, 2 were dup2'd onto the socketpairs before exec.
    child_requests = socket(fileno=0)
    child_stdout = socket(fileno=1)
    child_stderr = socket(fileno=2)

    # pop() consumes the variable, removing it from os.environ once it has been
    # read; a missing/empty value resolves to the task-execution entry point.
    target = _resolve_child_target(os.environ.pop("_AIRFLOW_CHILD_TARGET", ""))

    # _fork_main always exits via os._exit(), so the socket objects above are
    # never GC'd (which would close their underlying FDs). This is safe but
    # depends on that invariant -- do not refactor _fork_main to return.
    #
    # FD 3 is the inherited structured-log socket; _fork_main configures logging
    # over it just as it would with the FD inherited across a bare fork.
    _fork_main(child_requests, child_stdout, child_stderr, 3, target)


class NeverRaised(Exception):
Expand Down Expand Up @@ -669,15 +691,14 @@ def start(
:param use_exec: If True, on platforms that need it (currently macOS),
immediately ``os.execv`` a fresh Python interpreter after ``os.fork``.
This avoids macOS fork-safety issues with Objective-C frameworks.
Task execution opts in; DAG processor and triggerer do not.

The exec'd child always runs ``_subprocess_main``, so ``use_exec=True``
is only valid when ``target is _subprocess_main``.
``target`` is rehydrated in the exec'd child from its ``module:qualname``,
so any importable entry point (task execution, DAG processor, triggerer)
is supported.
"""
if use_exec and target is not _subprocess_main:
raise ValueError(
f"use_exec=True is only supported with target=_subprocess_main; got target={target!r}"
)
if use_exec and "<" in getattr(target, "__qualname__", "<"):
# Closures/lambdas (``<locals>`` / ``<lambda>`` in the qualname) and
# objects without a qualname can't be named for the exec'd child.
raise ValueError(f"use_exec=True requires a top-level importable target, got {target!r}")
# Create socketpairs/"pipes" to connect to the stdin and out from the subprocess
child_stdout, read_stdout = socketpair()
child_stderr, read_stderr = socketpair()
Expand All @@ -700,14 +721,22 @@ def start(

try:
if use_exec:
# macOS: exec a fresh Python interpreter to replace the
# inherited ObjC/CoreFoundation state that is not fork-safe.
# dup2 copies the socketpairs onto FDs 0/1/2; os.dup2 clears
# FD_CLOEXEC on the destination FDs, so they survive exec.
# The log channel is requested later via ResendLoggingFD.
# macOS: exec a fresh Python interpreter to drop the inherited
# ObjC/CoreFoundation state that is not fork-safe. Redirect the
# socketpairs onto the fixed FDs the exec'd child reconstructs:
# 0 (requests/stdin), 1 (stdout), 2 (stderr), 3 (structured logs).
# The source fds are always >= 3 (0/1/2 stay open in every launch
# path), so no dup2 clobbers a not-yet-placed source. set_inheritable
# guarantees FD_CLOEXEC is clear on all four (dup2 leaves it set when
# a source already equals its target), so they survive execv. The
# entry point is passed to the child by name.
os.environ["_AIRFLOW_CHILD_TARGET"] = f"{target.__module__}:{target.__qualname__}"
os.dup2(child_requests.fileno(), 0)
os.dup2(child_stdout.fileno(), 1)
os.dup2(child_stderr.fileno(), 2)
os.dup2(child_logs.fileno(), 3)
for fd in (0, 1, 2, 3):
os.set_inheritable(fd, True)
os.execv(
sys.executable,
[
Expand Down Expand Up @@ -1329,7 +1358,7 @@ def start( # type: ignore[override]
# Opt in to fork+exec on platforms that need it (currently macOS).
# Tests override `target` with a local stub to exercise the base
# infrastructure; keep bare fork for those.
use_exec = target is _subprocess_main and sys.platform in _FORK_EXEC_PLATFORMS
use_exec = target is _subprocess_main and _should_use_exec()
proc: Self = super().start(
id=what.id, client=client, target=target, logger=logger, use_exec=use_exec, **kwargs
)
Expand Down
7 changes: 0 additions & 7 deletions task-sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -2304,13 +2304,6 @@ def main():
log.info("::group::Pre Execute")
startup_details = get_startup_details()

# On macOS fork+exec path, the structured log channel wasn't
# inherited (exec replaces the address space). Request it from
# the supervisor using the existing ResendLoggingFD mechanism.
# Must happen after get_startup_details() so we don't read the
# startup message as a ResendLoggingFD response.
if os.environ.pop("_AIRFLOW_FORK_EXEC", None) == "1":
reinit_supervisor_comms()
span_ctx_mgr = _make_task_span(msg=startup_details)
span = stack.enter_context(span_ctx_mgr)
ti, context, log = startup(msg=startup_details)
Expand Down
Loading
Loading