Skip to content

Commit

Permalink
Fix side effect of not tearing down mongo hook in tests (#46291)
Browse files Browse the repository at this point in the history
MongoClient apparently starts a separate thread that runs an
active loop, sleeping every 0.5 seconds. The MongoClient
is stored in the hook, and hooks/test_mongo.py stored the hook
as an attribute of the test class, which means the thread was never
torn down and continued to run after the mongo tests completed.

Subsequently, any test that mocked time.sleep (to skip waiting)
was spammed by the mongo client thread calling sleep() -
potentially many times.

Earlier attempts to fix this did not succeed, so they are reverted
as part of this PR, since the original patching of sleep was much
nicer.

Revert "Make  racy test test_start_pod_startup_interval_seconds less racy (#46282)"

This reverts commit 60f0abf.

Revert "Make azure test less flaky/racy (#46281)"

This reverts commit ae55e22.
  • Loading branch information
potiuk authored Jan 30, 2025
1 parent f8453ba commit a50d2e5
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 36 deletions.
3 changes: 3 additions & 0 deletions providers/mongo/src/airflow/providers/mongo/hooks/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ def get_conn(self) -> MongoClient:
self.client = MongoClient(self.uri, **options)
return self.client

def close(self) -> None:
    """
    Close the MongoDB client held by this hook, if there is one.

    Closing the client lets it shut down; callers such as test teardown
    rely on this so the client does not keep running after the hook is
    no longer needed.

    The call is a no-op when no client has been created yet (for example
    when ``get_conn`` was never called), so it is always safe to invoke
    from cleanup code.
    """
    # ``client`` is only assigned once ``get_conn`` has built a MongoClient,
    # so tolerate both a missing attribute and an explicit ``None``.
    client = getattr(self, "client", None)
    if client is not None:
        client.close()

def _create_uri(self) -> str:
"""
Create URI string from the given credentials.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ def setup_method(self):
self.hook = MongoHookTest(mongo_conn_id="mongo_default")
self.conn = self.hook.get_conn()

def teardown_method(self):
# Close the client obtained in setup_method after every test. Per the commit
# description, MongoClient apparently runs a background thread that keeps
# calling sleep(); leaving it open lets that thread outlive the mongo tests
# and interfere with later tests that mock time.sleep.
self.conn.close()

def test_mongo_conn_id(self):
# Use default "mongo_default"
assert MongoHook().mongo_conn_id == "mongo_default"
Expand Down
27 changes: 11 additions & 16 deletions providers/tests/cncf/kubernetes/utils/test_pod_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,8 @@ def test_start_pod_raises_informative_error_on_timeout(self):
startup_timeout=0,
)

def test_start_pod_startup_interval_seconds(self):
@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep")
def test_start_pod_startup_interval_seconds(self, mock_time_sleep):
pod_info_pending = mock.MagicMock(**{"status.phase": PodPhase.PENDING})
pod_info_succeeded = mock.MagicMock(**{"status.phase": PodPhase.SUCCEEDED})

Expand All @@ -414,21 +415,15 @@ def pod_state_gen():
while True:
yield pod_info_succeeded

import time

# Avoid race condition when we can run to a lot of sleeps when mock takes no time at all
original_time_sleep = time.sleep
with mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.time.sleep") as mock_time_sleep:
mock_time_sleep.side_effect = lambda _: original_time_sleep(0.2)
self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
startup_check_interval = 10 # Any value is fine, as time.sleep is mocked to do almost nothing
mock_pod = MagicMock()
self.pod_manager.await_pod_start(
pod=mock_pod,
startup_timeout=60, # Never hit, any value is fine, as time.sleep is mocked to do nothing
startup_check_interval=startup_check_interval,
)
mock_time_sleep.assert_called_with(startup_check_interval)
self.mock_kube_client.read_namespaced_pod.side_effect = pod_state_gen()
startup_check_interval = 10 # Any value is fine, as time.sleep is mocked to do nothing
mock_pod = MagicMock()
self.pod_manager.await_pod_start(
pod=mock_pod,
startup_timeout=60, # Never hit, any value is fine, as time.sleep is mocked to do nothing
startup_check_interval=startup_check_interval,
)
mock_time_sleep.assert_called_with(startup_check_interval)
assert mock_time_sleep.call_count == 2

@mock.patch("airflow.providers.cncf.kubernetes.utils.pod_manager.container_is_running")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,30 +453,24 @@ def test_execute_fails_with_incorrect_restart_policy(self, aci_mock):
)

@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.AzureContainerInstanceHook")
def test_execute_correct_sleep_cycle(self, aci_mock):
@mock.patch("airflow.providers.microsoft.azure.operators.container_instances.time.sleep")
def test_execute_correct_sleep_cycle(self, sleep_mock, aci_mock):
expected_cg1 = make_mock_container(state="Running", exit_code=0, detail_status="test")
expected_cg2 = make_mock_container(state="Terminated", exit_code=0, detail_status="test")

import time

original_time_sleep = time.sleep
with mock.patch(
"airflow.providers.microsoft.azure.operators.container_instances.time.sleep"
) as sleep_mock:
sleep_mock.side_effect = lambda _: original_time_sleep(0.1)
aci_mock.return_value.get_state.side_effect = [expected_cg1, expected_cg1, expected_cg2]
aci_mock.return_value.exists.return_value = False
aci_mock.return_value.get_state.side_effect = [expected_cg1, expected_cg1, expected_cg2]
aci_mock.return_value.exists.return_value = False

aci = AzureContainerInstancesOperator(
ci_conn_id=None,
registry_conn_id=None,
resource_group="resource-group",
name="container-name",
image="container-image",
region="region",
task_id="task",
)
aci.execute(None)
aci = AzureContainerInstancesOperator(
ci_conn_id=None,
registry_conn_id=None,
resource_group="resource-group",
name="container-name",
image="container-image",
region="region",
task_id="task",
)
aci.execute(None)

# sleep is called at the end of cycles. Thus, the Terminated call does not trigger sleep
assert sleep_mock.call_count == 2
Expand Down

0 comments on commit a50d2e5

Please sign in to comment.