Skip to content
Open
Show file tree
Hide file tree
Changes from 23 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
4987c26
Fix Git bundle submodule fetch not using SSH settings from connection
shunsuke-sugita Apr 8, 2026
46f66a4
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 9, 2026
fd68b54
Rename cm to ssh_env_cm in _fetch_bare_repo and _fetch_submodules.
shunsuke-sugita Apr 12, 2026
7e67b4c
Test GitDagBundle._fetch_submodules applies GIT_SSH_COMMAND via custo…
shunsuke-sugita Apr 12, 2026
ffb5e5a
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 12, 2026
799e084
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 15, 2026
21098f9
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 17, 2026
6346cb2
Use autospec for Git/Repo mocks in submodule fetch unit tests
shunsuke-sugita Apr 17, 2026
2dcfd1d
Drop redundant mock_calls assertions in submodule fetch test
shunsuke-sugita Apr 20, 2026
fe31907
GitDagBundle: submodule custom_environment uses full hook.env
shunsuke-sugita Apr 20, 2026
62d9068
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 20, 2026
6d27243
Rename submodule test for empty hook.env (custom_environment skip)
shunsuke-sugita Apr 20, 2026
4f0d7fc
Update providers/git/tests/unit/git/bundles/test_git.py
shunsuke-sugita Apr 20, 2026
9b6ca63
rm airflow
shunsuke-sugita Apr 20, 2026
76fd8ca
Add missing blank line between submodule fetch tests
shunsuke-sugita Apr 20, 2026
51f14ca
Fix submodule test call-order assertion (use mock_git.mock_calls, not…
shunsuke-sugita Apr 20, 2026
fa3f022
fix test
shunsuke-sugita Apr 22, 2026
3ccbaf5
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
b8084ed
Fix git bundle and Celery executor unit tests failing in provider CI
shunsuke-sugita Apr 22, 2026
33def75
Fix git bundle and Celery executor unit tests for provider CI
shunsuke-sugita Apr 22, 2026
ce418f4
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
76cfd3a
Revert celery executor test changes (keep Git bundle PR scoped)
shunsuke-sugita Apr 22, 2026
996de12
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
cb599d8
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita May 7, 2026
6b1cc87
undo test_celery_executor fix
shunsuke-sugita May 7, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 2 additions & 136 deletions providers/celery/tests/unit/celery/executors/test_celery_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
from kombu.asynchronous import set_event_loop

from airflow.exceptions import AirflowProviderDeprecationWarning
from airflow.executors.base_executor import BaseExecutor
from airflow.models.dag import DAG
from airflow.models.taskinstance import TaskInstance, TaskInstanceKey
from airflow.providers.celery.executors import celery_executor, celery_executor_utils, default_celery
Expand Down Expand Up @@ -774,10 +773,6 @@ def test_celery_tasks_registered_on_import():


@pytest.mark.skipif(not AIRFLOW_V_3_1_9_PLUS, reason="TaskAlreadyRunningError requires Airflow 3.1.9+")
@pytest.mark.skipif(
not hasattr(BaseExecutor, "run_workload"),
reason="BaseExecutor.run_workload not available in this Airflow version",
)
def test_execute_workload_ignores_already_running_task():
"""Test that execute_workload raises Celery Ignore when task is already running."""
import importlib
Expand All @@ -795,10 +790,10 @@ def test_execute_workload_ignores_already_running_task():
mock_app.current_task = mock_current_task

with (
mock.patch("airflow.executors.base_executor.BaseExecutor.run_workload") as mock_run_workload,
mock.patch("airflow.sdk.execution_time.supervisor.supervise") as mock_supervise,
mock.patch.object(celery_executor_utils, "app", mock_app),
):
mock_run_workload.side_effect = TaskAlreadyRunningError("Task already running")
mock_supervise.side_effect = TaskAlreadyRunningError("Task already running")

workload_json = """
{
Expand Down Expand Up @@ -877,64 +872,6 @@ def test_amqp_broker_url_still_builds_ssl_config(self):
assert broker_ssl["keyfile"] == "/path/to/key.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED

Comment thread
shunsuke-sugita marked this conversation as resolved.
@conf_vars(
{
("celery", "BROKER_URL"): "rediss://redis:6380//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_KEY"): "/path/to/key.pem",
("celery", "SSL_CERT"): "/path/to/cert.pem",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_redis_mutual_tls_builds_ssl_config(self):
"""Test mutual TLS: all three SSL keys produce correct broker_use_ssl for Redis."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ssl_keyfile"] == "/path/to/key.pem"
assert broker_ssl["ssl_certfile"] == "/path/to/cert.pem"
assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_amqps_mutual_tls_missing_key_cert_raises(self):
"""Test that mutual TLS (default) raises error when SSL_KEY/SSL_CERT are missing."""
import importlib

with pytest.raises(ValueError, match="SSL_MUTUAL_TLS is True.*but SSL_KEY and/or SSL_CERT"):
importlib.reload(default_celery)

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_KEY"): "/path/to/key",
("celery", "SSL_CERT"): "/path/to/cert",
("celery", "SSL_CACERT"): "",
}
)
def test_ssl_active_without_cacert_uses_system_cas(self):
"""Test that empty SSL_CACERT falls back to system CAs (ca_certs omitted from config)."""
import importlib
import ssl

importlib.reload(default_celery)
broker_ssl = default_celery.DEFAULT_CELERY_CONFIG["broker_use_ssl"]

assert "ca_certs" not in broker_ssl
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
Expand All @@ -950,77 +887,6 @@ def test_amqps_broker_url_no_ssl_when_inactive(self):
config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" not in config

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_amqps_one_way_tls(self):
"""Test one-way TLS for AMQP: only ca_certs, no keyfile/certfile."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
assert "keyfile" not in broker_ssl
assert "certfile" not in broker_ssl

@conf_vars(
{
("celery", "BROKER_URL"): "rediss://redis:6380//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_redis_one_way_tls(self):
"""Test one-way TLS for Redis: only ssl_ca_certs, no ssl_keyfile/ssl_certfile."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ssl_ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["ssl_cert_reqs"] == ssl.CERT_REQUIRED
assert "ssl_keyfile" not in broker_ssl
assert "ssl_certfile" not in broker_ssl

@conf_vars(
{
("celery", "BROKER_URL"): "amqps://guest:guest@rabbitmq:5671//",
("celery", "SSL_ACTIVE"): "True",
("celery", "SSL_MUTUAL_TLS"): "False",
("celery", "SSL_KEY"): "/path/to/key.pem",
("celery", "SSL_CERT"): "/path/to/cert.pem",
("celery", "SSL_CACERT"): "/path/to/ca.pem",
}
)
def test_one_way_tls_ignores_key_cert(self):
"""Test that SSL_KEY/SSL_CERT are ignored when SSL_MUTUAL_TLS is False."""
import importlib
import ssl

importlib.reload(default_celery)

config = default_celery.DEFAULT_CELERY_CONFIG
assert "broker_use_ssl" in config
broker_ssl = config["broker_use_ssl"]
assert broker_ssl["ca_certs"] == "/path/to/ca.pem"
assert broker_ssl["cert_reqs"] == ssl.CERT_REQUIRED
assert "keyfile" not in broker_ssl
assert "certfile" not in broker_ssl


class TestCreateCeleryAppTeamIsolation:
"""Tests for create_celery_app() multi-team config isolation."""
Expand Down
19 changes: 13 additions & 6 deletions providers/git/src/airflow/providers/git/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ def _has_version(repo: Repo, version: str) -> bool:

def _fetch_bare_repo(self):
refspecs = ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]
cm = nullcontext()
ssh_env_cm = nullcontext()
if self.hook and (cmd := self.hook.env.get("GIT_SSH_COMMAND")):
cm = self.bare_repo.git.custom_environment(GIT_SSH_COMMAND=cmd)
with cm:
ssh_env_cm = self.bare_repo.git.custom_environment(GIT_SSH_COMMAND=cmd)
with ssh_env_cm:
self.bare_repo.remotes.origin.fetch(refspecs)
self.bare_repo.close()

Expand All @@ -300,9 +300,16 @@ def _fetch_bare_repo(self):
reraise=True,
)
def _fetch_submodules(self) -> None:
self._log.info("Initializing and updating submodules", repo_path=self.repo_path)
self.repo.git.submodule("sync", "--recursive")
self.repo.git.submodule("update", "--init", "--recursive", "--jobs", "1")
# Forward full hook.env: submodule subprocesses need SSH_ASKPASS/DISPLAY/etc., not only
# GIT_SSH_COMMAND (passphrase keys); do not rely on the outer configure_hook_env os.environ alone.
hook_env = getattr(self.hook, "env", None) if self.hook else None
ssh_env_cm = nullcontext()
if isinstance(hook_env, dict) and hook_env:
ssh_env_cm = self.repo.git.custom_environment(**hook_env)
with ssh_env_cm:
self._log.info("Initializing and updating submodules", repo_path=self.repo_path)
self.repo.git.submodule("sync", "--recursive")
self.repo.git.submodule("update", "--init", "--recursive", "--jobs", "1")

def refresh(self) -> None:
if self.version:
Expand Down
125 changes: 125 additions & 0 deletions providers/git/tests/unit/git/bundles/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,47 @@ def bundle_temp_dir(tmp_path):

GIT_DEFAULT_BRANCH = "main"


class _GitCustomEnvContextManager:
"""Minimal context manager surface for Git.custom_environment() (autospec target)."""

def __enter__(self) -> None:
pass

def __exit__(self, exc_type, exc_val, exc_tb) -> None:
pass


def _stub_git_custom_environment(**_kwargs: object):
"""Stand-in for ``git.cmd.Git.custom_environment`` (``create_autospec`` target only)."""
... # Calls are intercepted by autospec mock; implementation is unused.


def _stub_git_submodule(*_args: object, **_kwargs: object) -> None:
"""Stand-in for GitPython submodule subcommand calls (``create_autospec`` target only)."""
...


# ``_fetch_submodules`` only uses ``repo.git.custom_environment`` and ``repo.git.submodule``.
# Use ``types.SimpleNamespace`` for ``bundle.repo`` / ``repo.git`` so ``.git`` / ``.submodule``
# are plain instance attributes — **not** MagicMock children (nested ``Repo``/``Git`` mocks
# in unittest can lack ``submodule`` on lowest-dep / CI runs).
# ``custom_environment`` / ``submodule`` use ``create_autospec`` on small stubs so typos drift
# from the GitPython-like surface are caught (Airflow test convention).


def _repo_and_git_stubs_for_submodule_tests(ssh_ctx_cm):
"""Return (repo_stub, git_cmd_stub) for ``GitDagBundle._fetch_submodules`` unit tests."""
custom_environment = mock.create_autospec(_stub_git_custom_environment, spec_set=True)
custom_environment.return_value = ssh_ctx_cm
submodule = mock.create_autospec(_stub_git_submodule, spec_set=True)
git_cmd = types.SimpleNamespace(
custom_environment=custom_environment,
submodule=submodule,
)
Comment thread
shunsuke-sugita marked this conversation as resolved.
return types.SimpleNamespace(git=git_cmd), git_cmd


AIRFLOW_HTTPS_URL = "https://github.com/apache/airflow.git"
ACCESS_TOKEN = "my_access_token"
CONN_HTTPS = "my_git_conn"
Expand Down Expand Up @@ -920,6 +961,90 @@ def test_clone_bare_repo_invalid_repository_error_retry_fails(
# Verify Repo was called twice (failed attempt + failed retry)
assert mock_repo_class.call_count == 2

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_uses_custom_environment_when_git_ssh_command_set(self, mock_githook_class):
"""Non-empty hook.env is passed to custom_environment; submodule runs inside that context."""
expected_ssh_cmd = "ssh -i /path/key -o IdentitiesOnly=yes -o UserKnownHostsFile=/path/known_hosts"
mock_hook = mock_githook_class.return_value
mock_hook.repo_url = "[email protected]:apache/airflow.git"
mock_hook.env = {"GIT_SSH_COMMAND": expected_ssh_cmd}

ssh_ctx = mock.create_autospec(_GitCustomEnvContextManager, instance=True)

bundle = GitDagBundle(
name="test",
git_conn_id="git_default",
tracking_ref="main",
version="123456",
submodules=True,
)
bundle.repo, stub_git = _repo_and_git_stubs_for_submodule_tests(ssh_ctx)

bundle._fetch_submodules()

stub_git.custom_environment.assert_called_once_with(GIT_SSH_COMMAND=expected_ssh_cmd)
ssh_ctx.__enter__.assert_called_once_with()
ssh_ctx.__exit__.assert_called_once_with(None, None, None)
stub_git.submodule.assert_has_calls(
[
mock.call("sync", "--recursive"),
mock.call("update", "--init", "--recursive", "--jobs", "1"),
]
)

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_forwards_full_hook_env_including_passphrase_vars(self, mock_githook_class):
"""SSH_ASKPASS / DISPLAY etc. must reach git subprocesses, not only GIT_SSH_COMMAND."""
full_hook_env = {
"GIT_SSH_COMMAND": "ssh -i /path/key -o IdentitiesOnly=yes",
"SSH_ASKPASS": "/tmp/airflow-git-askpass.sh",
"SSH_ASKPASS_REQUIRE": "force",
"DISPLAY": ":0",
}
mock_hook = mock_githook_class.return_value
mock_hook.repo_url = "[email protected]:apache/airflow.git"
mock_hook.env = full_hook_env

ssh_ctx = mock.create_autospec(_GitCustomEnvContextManager, instance=True)

bundle = GitDagBundle(
name="test",
git_conn_id="git_default",
tracking_ref="main",
version="123456",
submodules=True,
)
bundle.repo, stub_git = _repo_and_git_stubs_for_submodule_tests(ssh_ctx)

bundle._fetch_submodules()

stub_git.custom_environment.assert_called_once_with(**full_hook_env)

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_skips_custom_environment_when_hook_env_empty(self, mock_githook_class):
"""When hook.env is empty, submodule update does not use custom_environment."""
mock_hook = mock_githook_class.return_value
mock_hook.repo_url = "[email protected]:apache/airflow.git"
mock_hook.env = {}

ssh_ctx = mock.create_autospec(_GitCustomEnvContextManager, instance=True)

bundle = GitDagBundle(
name="test",
git_conn_id="git_default",
tracking_ref="main",
version="123456",
submodules=True,
)
bundle.repo, stub_git = _repo_and_git_stubs_for_submodule_tests(ssh_ctx)

bundle._fetch_submodules()

stub_git.custom_environment.assert_not_called()
stub_git.submodule.assert_has_calls(
[mock.call("sync", "--recursive"), mock.call("update", "--init", "--recursive", "--jobs", "1")]
)

@mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
@mock.patch("airflow.providers.git.bundles.git.os.path.exists")
@mock.patch("airflow.providers.git.bundles.git.GitHook")
Expand Down
Loading