Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions spark/changelog.d/21922.fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Debounce false-positive connection errors
87 changes: 83 additions & 4 deletions spark/datadog_checks/spark/spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,12 @@ def __init__(self, name, init_config, instances):
raise ConfigurationError('The cluster_name must be specified in the instance configuration')

self.master_address = self._get_master_address()
self._connection_error_seen = False
self._debounced_this_run = False

def check(self, _):
self._debounced_this_run = False

tags = list(self.tags)

tags.append('spark_cluster:%s' % self.cluster_name)
Expand Down Expand Up @@ -192,6 +196,8 @@ def _get_running_apps(self):
def _collect_version(self, base_url, tags):
try:
version_json = self._rest_request_to_json(base_url, SPARK_VERSION_PATH, SPARK_SERVICE_CHECK, tags)
if version_json is None:
return False
version = version_json['spark']
except Exception as e:
self.log.debug("Failed to collect version information: %s", e)
Expand All @@ -206,10 +212,18 @@ def _driver_init(self, tags):
"""
self._collect_version(self.master_address, tags)
running_apps = {}

# A request earlier in this check run already hit a debounced connection failure.
# Skip the remaining driver queries so we only retry on the next scheduled run.
if self._debounced_this_run:
return running_apps
metrics_json = self._rest_request_to_json(
self.master_address, SPARK_APPS_PATH, SPARK_DRIVER_SERVICE_CHECK, tags
)

if metrics_json is None:
return running_apps

for app_json in metrics_json:
app_id = app_json.get('id')
app_name = app_json.get('name')
Expand All @@ -231,6 +245,9 @@ def _standalone_init(self, pre_20_mode, tags):
self.master_address, SPARK_MASTER_STATE_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
)

if metrics_json is None:
return {}

running_apps = {}
version_set = False

Expand All @@ -251,10 +268,11 @@ def _standalone_init(self, pre_20_mode, tags):
applist = self._rest_request_to_json(
app_url, SPARK_APPS_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags
)
for appl in applist:
aid = appl.get('id')
aname = appl.get('name')
running_apps[aid] = (aname, app_url)
if applist:
for appl in applist:
aid = appl.get('id')
aname = appl.get('name')
running_apps[aid] = (aname, app_url)
else:
running_apps[app_id] = (app_name, app_url)
except Exception:
Expand All @@ -279,6 +297,9 @@ def _mesos_init(self, tags):

metrics_json = self._rest_request_to_json(self.master_address, MESOS_MASTER_APP_PATH, MESOS_SERVICE_CHECK, tags)

if metrics_json is None:
return running_apps

if metrics_json.get('frameworks'):
for app_json in metrics_json.get('frameworks'):
app_id = app_json.get('id')
Expand Down Expand Up @@ -330,6 +351,9 @@ def _get_standalone_app_url(self, app_id, tags):
self.master_address, SPARK_MASTER_APP_PATH, SPARK_STANDALONE_SERVICE_CHECK, tags, appId=app_id
)

if app_page is None:
return None

dom = BeautifulSoup(app_page.text, 'html.parser')
app_detail_ui_links = dom.find_all('a', string='Application Detail UI')

Expand All @@ -352,6 +376,9 @@ def _yarn_get_running_spark_apps(self, tags):
applicationTypes=YARN_APPLICATION_TYPES,
)

if metrics_json is None:
return {}

running_apps = {}

if metrics_json.get('apps'):
Expand Down Expand Up @@ -379,6 +406,8 @@ def _get_spark_app_ids(self, running_apps, tags):
if not version_set:
version_set = self._collect_version(tracking_url, tags)
response = self._rest_request_to_json(tracking_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, tags)
if response is None:
continue
except Exception as e:
self.log.warning("Exception happened when fetching app ids for %s: %s", tracking_url, e)
continue
Expand All @@ -405,6 +434,8 @@ def _describe_app(self, property, running_apps, addl_tags):
response = self._rest_request(
base_url, SPARK_APPS_PATH, SPARK_SERVICE_CHECK, addl_tags, app_id, property
)
if response is None:
continue
except HTTPError:
self.log.debug("Got an error collecting %s", property, exc_info=True)
continue
Expand Down Expand Up @@ -512,6 +543,8 @@ def _spark_structured_streams_metrics(self, running_apps, addl_tags):
response = self._rest_request_to_json(
base_url, self.metricsservlet_path, SPARK_SERVICE_CHECK, addl_tags
)
if response is None:
continue
self.log.debug('Structured streaming metrics: %s', response)
response = {
metric_name: v['value']
Expand Down Expand Up @@ -611,6 +644,10 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
self.log.debug('Spark check URL: %s', url)
response = self.http.get(url, cookies=self.proxy_redirect_cookies)
response.raise_for_status()

# Reset connection errors on success
self._connection_error_seen = False

content = response.text
proxy_redirect_url = self._parse_proxy_redirect_url(content)
if proxy_redirect_url:
Expand All @@ -633,6 +670,9 @@ def _rest_request(self, url, object_path, service_name, tags, *args, **kwargs):
raise

except (HTTPError, InvalidURL, ConnectionError) as e:
if isinstance(e, ConnectionError) and self._should_suppress_connection_error(e, tags):
return None

self.service_check(
service_name,
AgentCheck.CRITICAL,
Expand All @@ -654,6 +694,9 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,
"""
response = self._rest_request(address, object_path, service_name, tags, *args, **kwargs)

if response is None:
return None

try:
response_json = response.json()

Expand All @@ -668,6 +711,42 @@ def _rest_request_to_json(self, address, object_path, service_name, tags, *args,

return response_json

def _should_suppress_connection_error(self, exception, tags):
    """Decide whether a connection failure should be suppressed (kubernetes only).

    Two suppression cases, both gated on the presence of a ``pod_phase:<phase>``
    tag (so non-kubernetes deployments are never debounced):

    1. The pod is in a terminal phase (failed/succeeded/unknown) — the driver is
       gone, so a connection error is expected noise, always suppressed.
    2. The pod is running but the connection was refused/unroutable — suppressed
       exactly once (tracked via ``_connection_error_seen`` across runs and
       ``_debounced_this_run`` within a run) to debounce false positives while
       the driver starts up. A subsequent failure surfaces normally.

    :param exception: the ConnectionError raised by the HTTP request
    :param tags: instance tags, scanned for a ``pod_phase:`` tag
    :return: True if the error should be swallowed, False to handle it normally
    """
    pod_phase = self._get_pod_phase(tags)
    # No pod_phase tag means we are not on kubernetes: never suppress.
    if pod_phase is None:
        return False

    if pod_phase in ('failed', 'succeeded', 'unknown'):
        self.log.debug("Pod phase is terminal, suppressing request error: %s", exception)
        return True

    if (
        not self._connection_error_seen
        and not self._debounced_this_run
        and ("Connection refused" in str(exception) or "No route to host" in str(exception))
    ):
        # First refusal seen: remember it so the next failure is NOT suppressed,
        # and mark this run so remaining requests in the same run are skipped.
        self._connection_error_seen = True
        self._debounced_this_run = True
        self.log.warning(
            "Connection failed. Suppressing error once to ensure driver is running. Error: %s",
            exception,
        )
        return True

    return False

def _is_pod_in_terminal_state(self, tags):
    """Return True when the ``pod_phase`` tag marks the pod as no longer running."""
    phase = self._get_pod_phase(tags)
    if phase is None:
        return False
    return phase in ('failed', 'succeeded', 'unknown')

@staticmethod
def _get_pod_phase(tags):
for tag in tags or []:
if tag.startswith('pod_phase:'):
return tag.split(':', 1)[1].strip().lower()
return None

@classmethod
def _join_url_dir(cls, url, *args):
"""
Expand Down
127 changes: 126 additions & 1 deletion spark/tests/test_spark.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import mock
import pytest
import urllib3
from requests import RequestException
from requests import ConnectionError, RequestException

from datadog_checks.dev.http import MockResponse
from datadog_checks.dev.utils import get_metadata_metrics
Expand Down Expand Up @@ -1465,3 +1465,128 @@ def test_integration_driver_2(aggregator, dd_run_check):
)
aggregator.assert_all_metrics_covered()
aggregator.assert_metrics_using_metadata(get_metadata_metrics())


@pytest.mark.unit
def test_debounce_connection_failure(aggregator, dd_run_check, caplog):
    """On a Running pod, the first connection failure is debounced with a warning;
    the second run surfaces it as a CRITICAL service check."""

    def refuse_connection(*args, **kwargs):
        raise ConnectionError("Connection refused")

    instance = DRIVER_CONFIG.copy()
    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Running']

    with mock.patch('requests.Session.get', side_effect=refuse_connection):
        check = SparkCheck('spark', {}, [instance])

        # Run 1: failure is suppressed — warning logged, no CRITICAL check emitted.
        with caplog.at_level(logging.WARNING):
            dd_run_check(check)

        assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
        assert len(aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)) == 0

        # Run 2: the debounce is spent, so the error propagates
        # (dd_run_check wraps it in a generic Exception).
        with pytest.raises(Exception) as excinfo:
            dd_run_check(check)

        assert "Connection refused" in str(excinfo.value)

        driver_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
        assert len(driver_checks) == 1
        assert driver_checks[0].status == SparkCheck.CRITICAL


@pytest.mark.unit
def test_connection_failure_non_k8s(aggregator, dd_run_check):
    """Without a pod_phase tag (non-kubernetes), connection failures are never
    debounced: the very first run raises and emits a CRITICAL service check."""

    def refuse_connection(*args, **kwargs):
        raise ConnectionError("Connection refused")

    instance = DRIVER_CONFIG.copy()
    instance['tags'] = list(instance.get('tags', []))

    with mock.patch('requests.Session.get', side_effect=refuse_connection):
        check = SparkCheck('spark', {}, [instance])

        with pytest.raises(Exception) as excinfo:
            dd_run_check(check)

        assert "Connection refused" in str(excinfo.value)

        driver_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
        assert len(driver_checks) == 1
        assert driver_checks[0].status == SparkCheck.CRITICAL


@pytest.mark.unit
def test_debounce_connection_failure_terminal_phase(aggregator, dd_run_check, caplog):
    """When the pod is in a terminal phase (Failed), connection errors are
    suppressed entirely: no exception and no service check at all."""

    def refuse_connection(*args, **kwargs):
        raise ConnectionError("Connection refused")

    instance = DRIVER_CONFIG.copy()
    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Failed']

    with mock.patch('requests.Session.get', side_effect=refuse_connection):
        check = SparkCheck('spark', {}, [instance])

        with caplog.at_level(logging.DEBUG):
            dd_run_check(check)

        assert "Pod phase is terminal, suppressing request error" in caplog.text

        # No service check of any status: errors from failed pods are pure noise.
        assert len(aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)) == 0


@pytest.mark.unit
def test_debounce_connection_recovery(aggregator, dd_run_check, caplog):
    """A successful run resets the debounce state, so a later failure is
    debounced again rather than immediately going CRITICAL."""

    def refuse_connection(*args, **kwargs):
        raise ConnectionError("Connection refused")

    instance = DRIVER_CONFIG.copy()
    instance['tags'] = list(instance.get('tags', [])) + ['pod_phase:Running']

    check = SparkCheck('spark', {}, [instance])

    # Phase 1: failure is debounced — warning only, no CRITICAL check.
    with mock.patch('requests.Session.get', side_effect=refuse_connection):
        with caplog.at_level(logging.WARNING):
            dd_run_check(check)

    assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
    assert len(aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)) == 0

    caplog.clear()
    aggregator.reset()

    # Phase 2: a successful request resets the internal debounce flag.
    with mock.patch('requests.Session.get', driver_requests_get_mock):
        dd_run_check(check)

    ok_checks = aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)
    assert len(ok_checks) > 0
    assert ok_checks[0].status == SparkCheck.OK
    assert check._connection_error_seen is False

    caplog.clear()
    aggregator.reset()

    # Phase 3: after recovery, a new failure is debounced once more.
    with mock.patch('requests.Session.get', side_effect=refuse_connection):
        with caplog.at_level(logging.WARNING):
            dd_run_check(check)

    assert "Connection failed. Suppressing error once to ensure driver is running" in caplog.text
    assert len(aggregator.service_checks(SPARK_DRIVER_SERVICE_CHECK)) == 0
Loading