
Commit cc833ad

fix(spark): [O11YINFRA-46] retry connection error to account for autodiscovery race condition
1 parent 52b2493 commit cc833ad

File tree

3 files changed: +117 −5 lines changed

spark/CHANGELOG.md

Lines changed: 6 additions & 0 deletions

@@ -2,6 +2,12 @@
 
 <!-- towncrier release notes start -->
 
+## Unreleased
+
+***Fixed***:
+
+* Reduce false-positive Spark connection errors from the autodiscovery race condition by retrying a refused connection once before reporting CRITICAL; the retry is skipped and the check alerts immediately when the `pod_phase` tag is `failed` or `unknown` ([#21922](https://github.com/DataDog/integrations-core/pull/21922))
+
 ## 7.2.1 / 2025-10-31
 
 ***Fixed***:
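
For reference, the retry policy the entry describes reduces to a single predicate over the Kubernetes pod phase. A minimal standalone sketch mirroring the `_should_debounce_connection_error` helper added below (the sketch itself is not part of the commit):

    # Phases that alert immediately on a refused connection; any other phase
    # (running, pending, succeeded, or no pod_phase tag at all) gets one
    # debounced retry before the service check goes CRITICAL.
    NO_RETRY_PHASES = ('failed', 'unknown')


    def should_debounce(pod_phase):
        """Return True when a first connection failure should only warn."""
        return pod_phase.strip().lower() not in NO_RETRY_PHASES


    assert should_debounce('Running')      # startup race: retry next run
    assert should_debounce('Succeeded')    # pod finished; autodiscovery will catch up
    assert not should_debounce('Failed')   # genuine problem: alert immediately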

spark/datadog_checks/spark/spark.py

Lines changed: 59 additions & 4 deletions

@@ -83,6 +83,7 @@ def __init__(self, name, init_config, instances):
             raise ConfigurationError('The cluster_name must be specified in the instance configuration')
 
         self.master_address = self._get_master_address()
+        self._connection_errors = 0
 
     def check(self, _):
         tags = list(self.tags)

@@ -192,6 +193,8 @@ def _get_running_apps(self):
     def _collect_version(self, base_url, tags):
         try:
             version_json = self._rest_request_to_json(base_url, SPARK_VERSION_PATH, SPARK_SERVICE_CHECK, tags)
+            if version_json is None:
+                return False
             version = version_json['spark']
         except Exception as e:
             self.log.debug("Failed to collect version information: %s", e)

@@ -210,6 +213,9 @@ def _driver_init(self, tags):
             self.master_address, SPARK_APPS_PATH, SPARK_DRIVER_SERVICE_CHECK, tags
         )
 
+        if metrics_json is None:
+            return running_apps
+
         for app_json in metrics_json:
             app_id = app_json.get('id')
             app_name = app_json.get('name')

@@ -231,6 +237,9 @@ def _standalone_init(self, pre_20_mode, tags):
             self.master_address, SPARK_MASTER_STATE_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
         )
 
+        if metrics_json is None:
+            return {}
+
         running_apps = {}
         version_set = False
 
@@ -251,10 +260,11 @@ def _standalone_init(self, pre_20_mode, tags):
                         applist = self._rest_request_to_json(
                             app_url, SPARK_APPS_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
                         )
-                        for appl in applist:
-                            aid = appl.get('id')
-                            aname = appl.get('name')
-                            running_apps[aid] = (aname, app_url)
+                        if applist:
+                            for appl in applist:
+                                aid = appl.get('id')
+                                aname = appl.get('name')
+                                running_apps[aid] = (aname, app_url)
                     else:
                         running_apps[app_id] = (app_name, app_url)
             except Exception:

@@ -279,6 +289,9 @@ def _mesos_init(self, tags):
 
         metrics_json = self._rest_request_to_json(self.master_address, MESOS_MASTER_APP_PATH, MESOS_SERVICE_CHECK, tags)
 
+        if metrics_json is None:
+            return running_apps
+
         if metrics_json.get('frameworks'):
             for app_json in metrics_json.get('frameworks'):
                 app_id = app_json.get('id')

@@ -330,6 +343,9 @@ def _get_standalone_app_url(self, app_id, tags):
             self.master_address, SPARK_MASTER_APP_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags, appId=app_id
         )
 
+        if app_page is None:
+            return None
+
         dom = BeautifulSoup(app_page.text, 'html.parser')
         app_detail_ui_links = dom.find_all('a', string='Application Detail UI')
 
@@ -352,6 +368,9 @@ def _yarn_get_running_spark_apps(self, tags):
             applicationTypes=YARN_APPLICATION_TYPES,
         )
 
+        if metrics_json is None:
+            return {}
+
         running_apps = {}
 
         if metrics_json.get('apps'):

@@ -379,6 +398,8 @@ def _get_spark_app_ids(self, running_apps, tags):
                 if not version_set:
                     version_set = self._collect_version(tracking_url, tags)
                 response = self._rest_request_to_json(tracking_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, tags)
+                if response is None:
+                    continue
             except Exception as e:
                 self.log.warning("Exception happened when fetching app ids for %s: %s", tracking_url, e)
                 continue

@@ -405,6 +426,8 @@ def _describe_app(self, property, running_apps, addl_tags):
                 response = self._rest_request(
                     base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property
                 )
+                if response is None:
+                    continue
             except HTTPError:
                 self.log.debug("Got an error collecting %s", property, exc_info=True)
                 continue

@@ -512,6 +535,8 @@ def _spark_structured_streams_metrics(self, running_apps, addl_tags):
                 response = self._rest_request_to_json(
                     base_url, self.metricsservlet_path, SPARK_SERVICE_CHECK, addl_tags
                 )
+                if response is None:
+                    continue
                 self.log.debug('Structured streaming metrics: %s', response)
                 response = {
                     metric_name: v['value']

@@ -611,6 +636,10 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
            self.log.debug('Spark check URL: %s', url)
            response = self.http.get(url, cookies=self.proxy_redirect_cookies)
            response.raise_for_status()
+
+            # Reset connection errors on success
+            self._connection_errors = 0
+
            content = response.text
            proxy_redirect_url = self._parse_proxy_redirect_url(content)
            if proxy_redirect_url:

@@ -633,6 +662,21 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
                raise
 
        except (HTTPError, InvalidURL, ConnectionError) as e:
+            debounce_allowed = (
+                isinstance(e, ConnectionError)
+                and ("Connection refused" in str(e) or "No route to host" in str(e))
+                and self._should_debounce_connection_error(tags)
+            )
+            if debounce_allowed:
+                self._connection_errors += 1
+                if self._connection_errors <= 1:
+                    self.log.warning(
+                        "Connection failed. Retrying next run to confirm if it's a transient error or shutdown. "
+                        "Error: %s",
+                        e,
+                    )
+                    return None
+
            self.service_check(
                service_name,
                AgentCheck.CRITICAL,

@@ -654,6 +698,9 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
        """
        response = self._rest_request(address, object_path, service_name, tags, *args, **kwargs)
 
+        if response is None:
+            return None
+
        try:
            response_json = response.json()
 
@@ -668,6 +715,14 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args, **kwargs):
 
        return response_json
 
+    def _should_debounce_connection_error(self, tags):
+        """Allow one debounce unless the pod phase tag says failed/unknown."""
+        for tag in tags or []:
+            if tag.startswith('pod_phase:'):
+                phase = tag.split(':', 1)[1].strip().lower()
+                return phase not in ('failed', 'unknown')
+        return True
+
    @classmethod
    def _join_url_dir(cls, url, *args):
        """

spark/tests/test_spark.py

Lines changed: 52 additions & 1 deletion

@@ -12,7 +12,7 @@
 import mock
 import pytest
 import urllib3
-from requests import RequestException
+from requests import ConnectionError, RequestException
 
 from datadog_checks.dev.http import MockResponse
 from datadog_checks.dev.utils import get_metadata_metrics

@@ -1465,3 +1465,54 @@ def test_integration_driver_2(aggregator, dd_run_check):
     )
     aggregator.assert_all_metrics_covered()
     aggregator.assert_metrics_using_metadata(get_metadata_metrics())
+
+
+@pytest.mark.unit
+def test_debounce_connection_failure(aggregator, dd_run_check, caplog):
+    # Mock a connection failure
+    def connection_failure_mock(session, url, *args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Running']
+
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        c = SparkCheck('spark', {}, [instance])
+
+        # First run: expect a warning, no CRITICAL service check
+        with caplog.at_level(logging.WARNING):
+            dd_run_check(c)
+
+        assert "Connection failed. Retrying next run" in caplog.text
+
+        # Verify no CRITICAL check was sent for spark.driver.can_connect
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 0
+
+        # Second run: expect CRITICAL.
+        # The check re-raises the exception, so we expect it here.
+        with pytest.raises(ConnectionError):
+            dd_run_check(c)
+
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 1
+        assert service_checks[0].status == SparkCheck.CRITICAL
+
+
+@pytest.mark.unit
+def test_no_debounce_connection_failure_failed_phase(aggregator, dd_run_check, caplog):
+    def connection_failure_mock(session, url, *args, **kwargs):
+        raise ConnectionError("Connection refused")
+
+    instance = DRIVER_CONFIG.copy()
+    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Failed']
+
+    with mock.patch('requests.Session.get', side_effect=connection_failure_mock):
+        c = SparkCheck('spark', {}, [instance])
+
+        with pytest.raises(ConnectionError):
+            dd_run_check(c)
+
+        service_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
+        assert len(service_checks) == 1
+        assert service_checks[0].status == SparkCheck.CRITICAL
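
Read together, the two tests pin down the intended run-to-run behavior. A compact summary of the expectations (a sketch, not part of the commit), assuming the same refused connection on every run:

    # pod_phase in (running, pending, succeeded) -- or tag absent:
    #     run 1 -> warning logged, no service check, no exception
    #     run 2 -> CRITICAL service check, ConnectionError re-raised
    #
    # pod_phase in (failed, unknown):
    #     run 1 -> CRITICAL service check, ConnectionError re-raised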
