Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
container_is_completed,
container_is_running,
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodNotFoundException
from airflow.providers.common.compat.connection import get_async_connection
from airflow.providers.common.compat.sdk import AirflowException, AirflowNotFoundException, BaseHook
from airflow.utils import yaml
Expand Down Expand Up @@ -1065,25 +1066,51 @@ async def get_conn(self) -> AsyncGenerator[async_client.ApiClient, None]:
await kube_client.close()

@generic_api_retry
async def get_pod(self, name: str, namespace: str) -> V1Pod:
async def get_pod(self, name: str, namespace: str, *, pod: V1Pod | None = None) -> V1Pod:
"""
Get pod's object.

:param name: Name of the pod.
:param namespace: Name of the pod's namespace.
:param pod: The last known pod object (optional), used to check if the pod was running.
"""
async with self.get_conn() as connection:
try:
v1_api = async_client.CoreV1Api(connection)
pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return pod
except HTTPError as e:
if hasattr(e, "status") and e.status == 403:
raise KubernetesApiPermissionError("Permission denied (403) from Kubernetes API.") from e
raise KubernetesApiError from e
v1_api = async_client.CoreV1Api(connection)
retries = 3
delay = 2
for attempt in range(retries + 1):
try:
current_pod: V1Pod = await v1_api.read_namespaced_pod(
name=name,
namespace=namespace,
)
return current_pod
except async_client.ApiException as e:
if e.status == 404:
was_running = (
pod and pod.status and pod.status.phase and pod.status.phase != "Pending"
)
if attempt < retries and not was_running:
self.log.info(
"Pod '%s' not found in namespace '%s'. Retrying in %s seconds...",
name,
namespace,
delay,
)
await asyncio.sleep(delay)
delay *= 2
continue
raise PodNotFoundException(
f"Pod '{name}' not found in namespace '{namespace}'. "
f"This may be caused by pod preemption (e.g., by higher-priority daemonset pods)."
) from e
raise
except HTTPError as e:
if hasattr(e, "status") and e.status == 403:
raise KubernetesApiPermissionError(
"Permission denied (403) from Kubernetes API."
) from e
raise KubernetesApiError from e

@generic_api_retry
async def delete_pod(self, name: str, namespace: str, grace_period_seconds: int | None = None):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ def __init__(
self.trigger_kwargs = trigger_kwargs or {}
self._fired_event = False
self._since_time = None
self.last_pod: V1Pod | None = None

def serialize(self) -> tuple[str, dict[str, Any]]:
"""Serialize KubernetesCreatePodTrigger arguments and classpath."""
Expand Down Expand Up @@ -416,7 +417,8 @@ async def _wait_for_container_completion(self) -> TriggerEvent:
@tenacity.retry(stop=tenacity.stop_after_attempt(3), wait=tenacity.wait_exponential(), reraise=True)
async def _get_pod(self) -> V1Pod:
"""Get the pod from Kubernetes with retries."""
pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace)
pod = await self.hook.get_pod(name=self.pod_name, namespace=self.pod_namespace, pod=self.last_pod)
self.last_pod = pod
# Due to AsyncKubernetesHook overriding get_pod, we need to cast the return
# value to kubernetes_asyncio.V1Pod, because it's perceived as different type
return cast("V1Pod", pod)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -941,10 +941,31 @@ def read_pod_events(self, pod: V1Pod, resource_version: str | None = None) -> Co
@generic_api_retry
def read_pod(self, pod: V1Pod) -> V1Pod:
"""Read POD information."""
try:
return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
except HTTPError as e:
raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")
retries = 3
delay = 2
for attempt in range(retries + 1):
try:
return self._client.read_namespaced_pod(pod.metadata.name, pod.metadata.namespace)
except ApiException as e:
if e.status == 404:
was_running = pod.status and pod.status.phase and pod.status.phase != "Pending"
if attempt < retries and not was_running:
self.log.info(
"Pod '%s' not found in namespace '%s'. Retrying in %s seconds...",
pod.metadata.name,
pod.metadata.namespace,
delay,
)
time.sleep(delay)
delay *= 2
continue
raise PodNotFoundException(
f"Pod '{pod.metadata.name}' not found in namespace '{pod.metadata.namespace}'. "
f"This may be caused by pod preemption (e.g., by higher-priority daemonset pods)."
) from e
raise
except HTTPError as e:
raise KubernetesApiException(f"There was an error reading the kubernetes API: {e}")

def await_xcom_sidecar_container_start(
self, pod: V1Pod, timeout: int = 900, log_interval: int = 30
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
API_TIMEOUT,
API_TIMEOUT_OFFSET_SERVER_SIDE,
)
from airflow.providers.cncf.kubernetes.utils.pod_manager import PodNotFoundException
from airflow.providers.common.compat.sdk import AirflowException, AirflowNotFoundException

from tests_common.test_utils.db import clear_test_connections
Expand Down Expand Up @@ -1741,6 +1742,60 @@ async def test_get_pod(self, lib_method, kube_config_loader):
namespace=NAMESPACE,
)

@pytest.mark.asyncio
@mock.patch("asyncio.sleep")
@mock.patch(KUBE_API.format("read_namespaced_pod"))
async def test_get_pod_raises_pod_not_found_on_404(self, lib_method, mock_sleep, kube_config_loader):
"""When the K8s API returns 404 (pod preempted/deleted), raise PodNotFoundException after retries."""
lib_method.side_effect = async_client.ApiException(status=404, reason="Not Found")

hook = AsyncKubernetesHook(
conn_id=None,
in_cluster=False,
config_file=None,
cluster_context=None,
)

with pytest.raises(PodNotFoundException, match="not found"):
await hook.get_pod(
name=POD_NAME,
namespace=NAMESPACE,
)

# Verify 3 retries (4 calls total)
assert lib_method.call_count == 4
# Verify sleep times: 2s, 4s, 8s
mock_sleep.assert_has_calls([mock.call(2), mock.call(4), mock.call(8)])

@pytest.mark.asyncio
@mock.patch("asyncio.sleep")
@mock.patch(KUBE_API.format("read_namespaced_pod"))
async def test_get_pod_raises_pod_not_found_on_404_no_retry_if_running(
self, lib_method, mock_sleep, kube_config_loader
):
"""When the pod was previously running, raise PodNotFoundException immediately on 404 without retries."""
lib_method.side_effect = async_client.ApiException(status=404, reason="Not Found")

hook = AsyncKubernetesHook(
conn_id=None,
in_cluster=False,
config_file=None,
cluster_context=None,
)

mock_pod = mock.MagicMock()
mock_pod.status.phase = "Running"

with pytest.raises(PodNotFoundException, match="not found"):
await hook.get_pod(
name=POD_NAME,
namespace=NAMESPACE,
pod=mock_pod,
)

assert lib_method.call_count == 1
mock_sleep.assert_not_called()

@pytest.mark.asyncio
@mock.patch(KUBE_API.format("delete_namespaced_pod"))
async def test_delete_pod(self, lib_method, kube_config_loader):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
AsyncPodManager,
PodLogsConsumer,
PodManager,
PodNotFoundException,
PodPhase,
XComRetrievalError,
_parse_log_level,
Expand Down Expand Up @@ -709,6 +710,39 @@ def test_read_pod_retries_fails(self):
with pytest.raises(AirflowException):
self.pod_manager.read_pod(mock.sentinel)

@mock.patch("time.sleep")
def test_read_pod_raises_pod_not_found_on_404(self, mock_sleep):
"""When the K8s API returns 404 (pod preempted/deleted), raise PodNotFoundException."""
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found")
with pytest.raises(PodNotFoundException, match="not found"):
self.pod_manager.read_pod(mock.sentinel)

# Verify 3 retries (4 calls total)
assert self.mock_kube_client.read_namespaced_pod.call_count == 4
# Verify sleep times: 2s, 4s, 8s
mock_sleep.assert_has_calls([mock.call(2), mock.call(4), mock.call(8)])

@mock.patch("time.sleep")
def test_read_pod_raises_pod_not_found_on_404_no_retry_if_running(self, mock_sleep):
"""When the K8s API returns 404 but pod was previously running, raise PodNotFoundException immediately."""
mock.sentinel.metadata = mock.MagicMock()
mock.sentinel.status = mock.MagicMock(phase="Running")
self.mock_kube_client.read_namespaced_pod.side_effect = ApiException(status=404, reason="Not Found")

with pytest.raises(PodNotFoundException, match="not found"):
self.pod_manager.read_pod(mock.sentinel)

assert self.mock_kube_client.read_namespaced_pod.call_count == 1
mock_sleep.assert_not_called()

def test_read_pod_reraises_non_404_api_exception(self):
"""Non-404 ApiException errors that are not transient should still propagate."""
mock.sentinel.metadata = mock.MagicMock()
self.mock_kube_client.read_namespaced_pod.side_effect = ApiException(status=403, reason="Forbidden")
with pytest.raises(ApiException):
self.pod_manager.read_pod(mock.sentinel)

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.container_is_running")
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.PodManager.read_pod_logs")
def test_fetch_container_logs_returning_last_timestamp(
Expand Down Expand Up @@ -1451,6 +1485,20 @@ async def test_read_pod_events_without_resource_version(self):
"test-pod", "test-namespace", resource_version=None
)

@pytest.mark.asyncio
async def test_read_pod_raises_pod_not_found_on_404(self):
"""When the hook raises PodNotFoundException (404), it propagates from read_pod."""
mock_pod = mock.Mock()
mock_pod.metadata.namespace = "test-namespace"
mock_pod.metadata.name = "test-pod"

self.mock_async_hook.get_pod.side_effect = PodNotFoundException(
"Pod 'test-pod' not found in namespace 'test-namespace'."
)

with pytest.raises(PodNotFoundException, match="not found"):
await self.async_pod_manager.read_pod(mock_pod)

@pytest.mark.asyncio
async def test_watch_pod_events_uses_hook_watch(self):
"""Test that watch_pod_events uses hook's watch_pod_events method."""
Expand Down
Loading
Loading