Skip to content
Draft
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4987c26
Fix Git bundle submodule fetch not using SSH settings from connection
shunsuke-sugita Apr 8, 2026
46f66a4
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 9, 2026
fd68b54
Rename cm to ssh_env_cm in _fetch_bare_repo and _fetch_submodules.
shunsuke-sugita Apr 12, 2026
7e67b4c
Test GitDagBundle._fetch_submodules applies GIT_SSH_COMMAND via custo…
shunsuke-sugita Apr 12, 2026
ffb5e5a
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 12, 2026
799e084
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 15, 2026
21098f9
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 17, 2026
6346cb2
Use autospec for Git/Repo mocks in submodule fetch unit tests
shunsuke-sugita Apr 17, 2026
2dcfd1d
Drop redundant mock_calls assertions in submodule fetch test
shunsuke-sugita Apr 20, 2026
fe31907
GitDagBundle: submodule custom_environment uses full hook.env
shunsuke-sugita Apr 20, 2026
62d9068
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 20, 2026
6d27243
Rename submodule test for empty hook.env (custom_environment skip)
shunsuke-sugita Apr 20, 2026
4f0d7fc
Update providers/git/tests/unit/git/bundles/test_git.py
shunsuke-sugita Apr 20, 2026
9b6ca63
rm airflow
shunsuke-sugita Apr 20, 2026
76fd8ca
Add missing blank line between submodule fetch tests
shunsuke-sugita Apr 20, 2026
51f14ca
Fix submodule test call-order assertion (use mock_git.mock_calls, not…
shunsuke-sugita Apr 20, 2026
fa3f022
fix test
shunsuke-sugita Apr 22, 2026
3ccbaf5
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
b8084ed
Fix git bundle and Celery executor unit tests failing in provider CI
shunsuke-sugita Apr 22, 2026
33def75
Fix git bundle and Celery executor unit tests for provider CI
shunsuke-sugita Apr 22, 2026
ce418f4
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
76cfd3a
Revert celery executor test changes (keep Git bundle PR scoped)
shunsuke-sugita Apr 22, 2026
996de12
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita Apr 22, 2026
cb599d8
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita May 7, 2026
6b1cc87
undo test_celery_executor fix
shunsuke-sugita May 7, 2026
0b0c77a
Merge branch 'main' into provider-git-fix-fetch-submodul
shunsuke-sugita May 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -392,6 +392,10 @@ class MockWorkload:
here because it's not picklable.
"""

def model_dump_json(self) -> str:
    """Return the JSON text standing in for this mock workload.

    ``send_workload_to_executor`` serializes v3+ workloads with Pydantic (``BaseWorkload``),
    so the mock provides the same method and always yields an empty JSON object.
    """
    serialized = "{}"
    return serialized

def apply_async(self, *args, **kwargs):
    """Accept any positional/keyword arguments and return the constant ``1``."""
    fake_result = 1
    return fake_result

Expand Down Expand Up @@ -430,13 +434,22 @@ def test_send_workloads_to_celery_hang(register_signals):
executor = celery_executor.CeleryExecutor()

workload = MockWorkload()
workload_tuples_to_send = [(None, None, None, workload) for _ in range(26)]
# ``WorkloadInCelery`` is (key, workload, queue, team_name); the 4th field must be a
# str | None team name, not the workload (regression after team / ExecutorConf support).
workload_tuples_to_send = [(None, workload, None, None) for _ in range(26)]

for _ in range(250):
# This loop can hang on Linux if celery_executor does something wrong with
# multiprocessing.
results = executor._send_workloads_to_celery(workload_tuples_to_send)
assert results == [(None, None, 1) for _ in workload_tuples_to_send]
assert len(results) == len(workload_tuples_to_send)
for key, sent_args, result in results:
assert key is None
if AIRFLOW_V_3_0_PLUS:
assert sent_args == (workload.model_dump_json(),)
else:
assert sent_args == [workload]
assert not isinstance(result, celery_executor_utils.ExceptionWithTraceback)
Comment thread
shunsuke-sugita marked this conversation as resolved.
Outdated


@conf_vars({("celery", "result_backend"): "rediss://test_user:test_password@localhost:6379/0"})
Expand Down
19 changes: 13 additions & 6 deletions providers/git/src/airflow/providers/git/bundles/git.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,10 +287,10 @@ def _has_version(repo: Repo, version: str) -> bool:

def _fetch_bare_repo(self):
    """Fetch every branch and tag from ``origin`` into the bare repository.

    When a hook is configured and its ``env`` mapping contains ``GIT_SSH_COMMAND``,
    the fetch runs inside ``bare_repo.git.custom_environment`` so that command is
    applied to the fetch. Otherwise a no-op context is used.

    The bare repo handle is closed after the fetch, including when the fetch raises,
    so a failed attempt does not leave the handle open.
    """
    # Both refspecs carry a leading "+", i.e. local heads/tags are force-updated.
    refspecs = ["+refs/heads/*:refs/heads/*", "+refs/tags/*:refs/tags/*"]
    ssh_env_cm = nullcontext()
    if self.hook and (cmd := self.hook.env.get("GIT_SSH_COMMAND")):
        ssh_env_cm = self.bare_repo.git.custom_environment(GIT_SSH_COMMAND=cmd)
    try:
        with ssh_env_cm:
            self.bare_repo.remotes.origin.fetch(refspecs)
    finally:
        self.bare_repo.close()

Expand All @@ -300,9 +300,16 @@ def _fetch_bare_repo(self):
reraise=True,
)
def _fetch_submodules(self) -> None:
    """Sync and then update all submodules recursively, using the hook's environment.

    If a hook is configured and its ``env`` attribute is a non-empty ``dict``, every
    entry is passed to ``repo.git.custom_environment`` and the submodule commands run
    inside that context. With no hook, or an empty / non-dict ``env``, the commands
    run in a no-op context.
    """
    # Forward full hook.env: submodule subprocesses need SSH_ASKPASS/DISPLAY/etc., not only
    # GIT_SSH_COMMAND (passphrase keys); do not rely on the outer configure_hook_env os.environ alone.
    hook_env = getattr(self.hook, "env", None) if self.hook else None
    ssh_env_cm = nullcontext()
    # The isinstance check keeps non-dict ``env`` values from being unpacked as keyword arguments.
    if isinstance(hook_env, dict) and hook_env:
        ssh_env_cm = self.repo.git.custom_environment(**hook_env)
    with ssh_env_cm:
        self._log.info("Initializing and updating submodules", repo_path=self.repo_path)
        self.repo.git.submodule("sync", "--recursive")
        self.repo.git.submodule("update", "--init", "--recursive", "--jobs", "1")

def refresh(self) -> None:
if self.version:
Expand Down
110 changes: 110 additions & 0 deletions providers/git/tests/unit/git/bundles/test_git.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,32 @@ def bundle_temp_dir(tmp_path):

GIT_DEFAULT_BRANCH = "main"


class _GitCustomEnvContextManager:
    """Bare-bones context-manager shape used as the autospec target for ``Git.custom_environment()``.

    Entering yields nothing and exiting never suppresses an exception; the class exists
    only so tests can build a spec'd mock exposing ``__enter__`` and ``__exit__``.
    """

    def __enter__(self) -> None:
        return None

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        return None


# ``_fetch_submodules`` only uses ``repo.git.custom_environment`` and ``repo.git.submodule``.
# The stubs therefore use ``types.SimpleNamespace`` for ``bundle.repo`` and ``repo.git``, so that
# ``git`` and ``submodule`` are plain instance attributes rather than MagicMock children
# (nested ``Repo``/``Git`` mocks can lack ``submodule`` on lowest-dependency / CI runs).


def _repo_and_git_stubs_for_submodule_tests(
    ssh_ctx_cm,
) -> tuple[types.SimpleNamespace, types.SimpleNamespace]:
    """Return (repo_stub, git_cmd_stub) for ``GitDagBundle._fetch_submodules`` unit tests.

    :param ssh_ctx_cm: object handed back by every ``custom_environment(...)`` call on the
        git stub; tests pass a context manager here.
    :return: a pair ``(repo_stub, git_cmd)`` where ``repo_stub.git is git_cmd`` and
        ``git_cmd`` exposes ``custom_environment`` and ``submodule`` as ``MagicMock`` attributes.
    """
    git_cmd = types.SimpleNamespace(
        custom_environment=mock.MagicMock(return_value=ssh_ctx_cm),
        submodule=mock.MagicMock(),
    )
    return types.SimpleNamespace(git=git_cmd), git_cmd


AIRFLOW_HTTPS_URL = "https://github.com/apache/airflow.git"
ACCESS_TOKEN = "my_access_token"
CONN_HTTPS = "my_git_conn"
Expand Down Expand Up @@ -920,6 +946,90 @@ def test_clone_bare_repo_invalid_repository_error_retry_fails(
# Verify Repo was called twice (failed attempt + failed retry)
assert mock_repo_class.call_count == 2

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_uses_custom_environment_when_git_ssh_command_set(self, mock_githook_class):
    """Non-empty hook.env is passed to custom_environment; submodule runs inside that context."""
    expected_ssh_cmd = "ssh -i /path/key -o IdentitiesOnly=yes -o UserKnownHostsFile=/path/known_hosts"
    mock_hook = mock_githook_class.return_value
    mock_hook.repo_url = "git@github.com:apache/airflow.git"
    mock_hook.env = {"GIT_SSH_COMMAND": expected_ssh_cmd}

    ssh_ctx = mock.create_autospec(_GitCustomEnvContextManager, instance=True)

    bundle = GitDagBundle(
        name="test",
        git_conn_id="git_default",
        tracking_ref="main",
        version="123456",
        submodules=True,
    )
    bundle.repo, stub_git = _repo_and_git_stubs_for_submodule_tests(ssh_ctx)

    # Record context entry/exit and git invocations in one list so their relative order
    # can be asserted; per-mock assert_has_calls cannot show the commands ran inside the context.
    events = []
    ssh_ctx.__enter__.side_effect = lambda: events.append("enter")
    ssh_ctx.__exit__.side_effect = lambda *exc_info: events.append("exit")
    stub_git.submodule.side_effect = lambda *args: events.append(args)

    bundle._fetch_submodules()

    stub_git.custom_environment.assert_called_once_with(GIT_SSH_COMMAND=expected_ssh_cmd)
    ssh_ctx.__enter__.assert_called_once_with()
    ssh_ctx.__exit__.assert_called_once_with(None, None, None)
    stub_git.submodule.assert_has_calls(
        [
            mock.call("sync", "--recursive"),
            mock.call("update", "--init", "--recursive", "--jobs", "1"),
        ]
    )
    assert events == [
        "enter",
        ("sync", "--recursive"),
        ("update", "--init", "--recursive", "--jobs", "1"),
        "exit",
    ]

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_forwards_full_hook_env_including_passphrase_vars(self, mock_githook_class):
    """Every hook.env entry (SSH_ASKPASS, DISPLAY, ...) is forwarded, not just GIT_SSH_COMMAND."""
    githook = mock_githook_class.return_value
    githook.repo_url = "git@github.com:apache/airflow.git"
    githook.env = {
        "GIT_SSH_COMMAND": "ssh -i /path/key -o IdentitiesOnly=yes",
        "SSH_ASKPASS": "/tmp/airflow-git-askpass.sh",
        "SSH_ASKPASS_REQUIRE": "force",
        "DISPLAY": ":0",
    }
    # Keep a handle on the exact mapping so the final assertion compares against it.
    forwarded_env = githook.env

    env_cm = mock.create_autospec(_GitCustomEnvContextManager, instance=True)
    bundle_kwargs = {
        "name": "test",
        "git_conn_id": "git_default",
        "tracking_ref": "main",
        "version": "123456",
        "submodules": True,
    }
    dag_bundle = GitDagBundle(**bundle_kwargs)
    repo_stub, git_stub = _repo_and_git_stubs_for_submodule_tests(env_cm)
    dag_bundle.repo = repo_stub

    dag_bundle._fetch_submodules()

    git_stub.custom_environment.assert_called_once_with(**forwarded_env)

@mock.patch("airflow.providers.git.bundles.git.GitHook")
def test_fetch_submodules_skips_custom_environment_when_hook_env_empty(self, mock_githook_class):
    """An empty hook.env means the submodule commands run without custom_environment."""
    githook = mock_githook_class.return_value
    githook.repo_url = "git@github.com:apache/airflow.git"
    githook.env = {}

    env_cm = mock.create_autospec(_GitCustomEnvContextManager, instance=True)
    bundle_kwargs = {
        "name": "test",
        "git_conn_id": "git_default",
        "tracking_ref": "main",
        "version": "123456",
        "submodules": True,
    }
    dag_bundle = GitDagBundle(**bundle_kwargs)
    repo_stub, git_stub = _repo_and_git_stubs_for_submodule_tests(env_cm)
    dag_bundle.repo = repo_stub

    dag_bundle._fetch_submodules()

    git_stub.custom_environment.assert_not_called()
    expected_submodule_calls = [
        mock.call("sync", "--recursive"),
        mock.call("update", "--init", "--recursive", "--jobs", "1"),
    ]
    git_stub.submodule.assert_has_calls(expected_submodule_calls)

@mock.patch("airflow.providers.git.bundles.git.shutil.rmtree")
@mock.patch("airflow.providers.git.bundles.git.os.path.exists")
@mock.patch("airflow.providers.git.bundles.git.GitHook")
Expand Down
Loading