Skip to content

Commit ec26937

Browse files
committed
use event type
1 parent 6bb05d2 commit ec26937

File tree

6 files changed

+54
-41
lines changed

6 files changed

+54
-41
lines changed

ddtrace/internal/telemetry/constants.py

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,20 @@ class TELEMETRY_NAMESPACE(Enum):
1111
PROFILER = "profiler"
1212

1313

14-
TELEMETRY_TYPE_GENERATE_METRICS = "generate-metrics"
15-
TELEMETRY_TYPE_DISTRIBUTION = "distributions"
16-
TELEMETRY_TYPE_LOGS = "logs"
14+
class TELEMETRY_EVENT_TYPE(Enum):
15+
STARTED = "app-started"
16+
SHUTDOWN = "app-shutdown"
17+
HEARTBEAT = "app-heartbeat"
18+
EXTENDED_HEARTBEAT = "app-extended-heartbeat"
19+
DEPENDENCIES_LOADED = "app-dependencies-loaded"
20+
PRODUCT_CHANGE = "app-product-change"
21+
INTEGRATIONS_CHANGE = "app-integrations-change"
22+
ENDPOINTS = "app-endpoints"
23+
CLIENT_CONFIGURATION_CHANGE = "app-client-configuration-change"
24+
LOGS = "logs"
25+
METRICS = "generate-metrics"
26+
DISTRIBUTIONS = "distributions"
27+
MESSAGE_BATCH = "message-batch"
1728

1829

1930
class TELEMETRY_LOG_LEVEL(Enum):

ddtrace/internal/telemetry/metrics_namespaces.pyx

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ from typing import Tuple
66

77
from ddtrace.internal import forksafe
88
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
9-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
10-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
9+
from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
1110

1211

1312
MetricTagType = Optional[Tuple[Tuple[str, str], ...]]
@@ -46,14 +45,14 @@ cdef class MetricNamespace:
4645

4746
now = int(time.time())
4847
data = {
49-
TELEMETRY_TYPE_GENERATE_METRICS: {},
50-
TELEMETRY_TYPE_DISTRIBUTION: {},
48+
TELEMETRY_EVENT_TYPE.METRICS: {},
49+
TELEMETRY_EVENT_TYPE.DISTRIBUTIONS: {},
5150
}
5251
for metric_id, value in namespace_metrics.items():
5352
name, namespace, _tags, metric_type = metric_id
5453
tags = ["{}:{}".format(k, v).lower() for k, v in _tags] if _tags else []
5554
if metric_type is MetricType.DISTRIBUTION:
56-
data[TELEMETRY_TYPE_DISTRIBUTION].setdefault(namespace, []).append({
55+
data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS].setdefault(namespace, []).append({
5756
"metric": name,
5857
"points": value,
5958
"tags": tags,
@@ -70,7 +69,7 @@ cdef class MetricNamespace:
7069
}
7170
if metric_type in (MetricType.RATE, MetricType.GAUGE):
7271
metric["interval"] = _interval
73-
data[TELEMETRY_TYPE_GENERATE_METRICS].setdefault(namespace, []).append(metric)
72+
data[TELEMETRY_EVENT_TYPE.METRICS].setdefault(namespace, []).append(metric)
7473

7574
return data
7675

ddtrace/internal/telemetry/writer.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,9 @@
3131
from ..utils.version import version as tracer_version
3232
from . import modules
3333
from .constants import TELEMETRY_APM_PRODUCT
34+
from .constants import TELEMETRY_EVENT_TYPE
3435
from .constants import TELEMETRY_LOG_LEVEL
3536
from .constants import TELEMETRY_NAMESPACE
36-
from .constants import TELEMETRY_TYPE_LOGS
3737
from .data import get_application
3838
from .data import get_host_info
3939
from .data import get_python_config_vars
@@ -216,7 +216,7 @@ def __init__(self, is_periodic=True, agentless=None):
216216
self.enable()
217217
# Force app started for unit tests
218218
if config.FORCE_START and (app_started := self._app_started_payload()):
219-
self._events_queue.append({"payload": app_started, "request_type": "app-started"})
219+
self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED})
220220
get_logger("ddtrace").addHandler(DDTelemetryErrorHandler(self))
221221

222222
def enable(self):
@@ -635,7 +635,7 @@ def periodic(self, force_flush=False, shutting_down=False):
635635
)
636636

637637
if logs := self._report_logs():
638-
events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS})
638+
events.append({"payload": {"logs": list(logs)}, "request_type": TELEMETRY_EVENT_TYPE.LOGS})
639639

640640
# Queue metrics if not at heartbeat interval
641641
if self._is_periodic and force_flush is False:
@@ -649,34 +649,39 @@ def periodic(self, force_flush=False, shutting_down=False):
649649
# At heartbeat interval, collect and send all telemetry data
650650
if app_started_payload := self._app_started_payload():
651651
# app-started should be the first event in the batch
652-
events = [{"payload": app_started_payload, "request_type": "app-started"}] + events
652+
events = [{"payload": app_started_payload, "request_type": TELEMETRY_EVENT_TYPE.STARTED}] + events
653653

654654
if products := self._report_app_products():
655-
events.append({"payload": {"products": products}, "request_type": "app-product-change"})
655+
events.append({"payload": {"products": products}, "request_type": TELEMETRY_EVENT_TYPE.PRODUCT_CHANGE})
656656

657657
if ints := self._report_integrations():
658-
events.append({"payload": {"integrations": ints}, "request_type": "app-integrations-change"})
658+
events.append({"payload": {"integrations": ints}, "request_type": TELEMETRY_EVENT_TYPE.INTEGRATIONS_CHANGE})
659659

660660
if endpoints := self._report_app_endpoints():
661-
events.append({"payload": endpoints, "request_type": "app-endpoints"})
661+
events.append({"payload": endpoints, "request_type": TELEMETRY_EVENT_TYPE.ENDPOINTS})
662662

663663
if configs := self._report_configuration_queue():
664-
events.append({"payload": {"configuration": configs}, "request_type": "app-client-configuration-change"})
664+
events.append(
665+
{
666+
"payload": {"configuration": configs},
667+
"request_type": TELEMETRY_EVENT_TYPE.CLIENT_CONFIGURATION_CHANGE,
668+
}
669+
)
665670

666671
if deps := self._report_app_dependencies():
667-
events.append({"payload": {"dependencies": deps}, "request_type": "app-dependencies-loaded"})
672+
events.append({"payload": {"dependencies": deps}, "request_type": TELEMETRY_EVENT_TYPE.DEPENDENCIES_LOADED})
668673

669674
if shutting_down and not self._forked:
670-
events.append({"payload": {}, "request_type": "app-closing"})
675+
events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.SHUTDOWN})
671676

672677
# Always include a heartbeat to keep RC connections alive
673678
# Extended heartbeat should be queued after app-dependencies-loaded event. This
674679
# ensures that that imported dependencies are accurately reported.
675680
if heartbeat_payload := self._app_heartbeat_payload():
676681
# Extended heartbeat report dependencies while regular heartbeats report empty payloads
677-
events.append({"payload": heartbeat_payload, "request_type": "app-extended-heartbeat"})
682+
events.append({"payload": heartbeat_payload, "request_type": TELEMETRY_EVENT_TYPE.EXTENDED_HEARTBEAT})
678683
else:
679-
events.append({"payload": {}, "request_type": "app-heartbeat"})
684+
events.append({"payload": {}, "request_type": TELEMETRY_EVENT_TYPE.HEARTBEAT})
680685

681686
# Get any queued events and combine with current batch
682687
if queued_events := self._flush_events_queue():
@@ -781,7 +786,7 @@ def _telemetry_excepthook(self, tp, value, root_traceback):
781786
self.add_integration(integration_name, True, error_msg=error_msg)
782787

783788
if app_started := self._app_started_payload(False):
784-
self._events_queue.append({"payload": app_started, "request_type": "app-started"})
789+
self._events_queue.append({"payload": app_started, "request_type": TELEMETRY_EVENT_TYPE.STARTED})
785790

786791
self.app_shutdown()
787792

tests/appsec/appsec/test_telemetry.py

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
from ddtrace.constants import APPSEC_ENV
1616
from ddtrace.contrib.internal.trace_utils import set_http_meta
1717
from ddtrace.ext import SpanTypes
18+
from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
1819
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
19-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
20-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
2120
from ddtrace.trace import tracer
2221
import tests.appsec.rules as rules
2322
from tests.appsec.utils import asm_context
@@ -38,7 +37,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked
3837
# this function and make the tests flaky. That's why we exclude the "enabled" metric from this assert
3938
generate_metrics = [
4039
m
41-
for m in metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.APPSEC.value]
40+
for m in metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.APPSEC.value]
4241
if m["metric"] != "enabled"
4342
]
4443
assert (
@@ -76,7 +75,7 @@ def _assert_generate_metrics(metrics_result, is_rule_triggered=False, is_blocked
7675

7776

7877
def _assert_distributions_metrics(metrics_result, is_rule_triggered=False, is_blocked_request=False):
79-
distributions_metrics = metrics_result[TELEMETRY_TYPE_DISTRIBUTION][TELEMETRY_NAMESPACE.APPSEC.value]
78+
distributions_metrics = metrics_result[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS][TELEMETRY_NAMESPACE.APPSEC.value]
8079

8180
assert len(distributions_metrics) == 2, "Expected 2 distributions_metrics"
8281
for metric in distributions_metrics:
@@ -101,8 +100,8 @@ def test_metrics_when_appsec_doesnt_runs(telemetry_writer, tracer):
101100
rules.Config(),
102101
)
103102
metrics_data = telemetry_writer._namespace.flush()
104-
assert len(metrics_data[TELEMETRY_TYPE_GENERATE_METRICS]) == 0
105-
assert len(metrics_data[TELEMETRY_TYPE_DISTRIBUTION]) == 0
103+
assert len(metrics_data[TELEMETRY_EVENT_TYPE.METRICS]) == 0
104+
assert len(metrics_data[TELEMETRY_EVENT_TYPE.DISTRIBUTIONS]) == 0
106105

107106

108107
def test_metrics_when_appsec_runs(telemetry_writer, tracer):
@@ -185,7 +184,7 @@ def test_log_metric_error_ddwaf_timeout(telemetry_writer, tracer):
185184
list_metrics_logs = list(telemetry_writer._logs)
186185
assert len(list_metrics_logs) == 0
187186

188-
generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_TYPE_GENERATE_METRICS][
187+
generate_metrics = telemetry_writer._namespace.flush()[TELEMETRY_EVENT_TYPE.METRICS][
189188
TELEMETRY_NAMESPACE.APPSEC.value
190189
]
191190

@@ -228,7 +227,7 @@ def test_log_metric_error_ddwaf_internal_error(telemetry_writer):
228227
assert len(list_telemetry_logs) == 0
229228
assert span.get_tag("_dd.appsec.waf.error") == "-3"
230229
metrics_result = telemetry_writer._namespace.flush()
231-
list_telemetry_metrics = metrics_result.get(TELEMETRY_TYPE_GENERATE_METRICS, {}).get(
230+
list_telemetry_metrics = metrics_result.get(TELEMETRY_EVENT_TYPE.METRICS, {}).get(
232231
TELEMETRY_NAMESPACE.APPSEC.value, {}
233232
)
234233
error_metrics = [m for m in list_telemetry_metrics if m["metric"] == "waf.error"]

tests/appsec/iast/test_telemetry.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@
2121
from ddtrace.appsec._iast.taint_sinks.header_injection import patch as header_injection_patch
2222
from ddtrace.appsec._iast.taint_sinks.weak_hash import patch as weak_hash_patch
2323
from ddtrace.ext import SpanTypes
24+
from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
2425
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
25-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
2626
from tests.appsec.iast.iast_utils import _iast_patched_module
2727
from tests.appsec.utils import asm_context
2828
from tests.utils import DummyTracer
@@ -31,7 +31,7 @@
3131

3232
def _assert_instrumented_sink(telemetry_writer, vuln_type):
3333
metrics_result = telemetry_writer._namespace.flush()
34-
generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
34+
generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
3535
assert len(generate_metrics) == 1, "Expected 1 generate_metrics"
3636
assert [metric["metric"] for metric in generate_metrics] == ["instrumented.sink"]
3737
assert [metric["tags"] for metric in generate_metrics] == [[f"vulnerability_type:{vuln_type.lower()}"]]
@@ -97,7 +97,7 @@ def test_metric_executed_sink(
9797
metrics_result = telemetry_writer._namespace.flush()
9898
_testing_unpatch_iast()
9999

100-
generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
100+
generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
101101
assert len(generate_metrics) == 1
102102
# Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
103103
# the agent)
@@ -142,7 +142,7 @@ def test_metric_instrumented_propagation(no_request_sampling, telemetry_writer):
142142
_iast_patched_module("benchmarks.bm.iast_fixtures.str_methods")
143143

144144
metrics_result = telemetry_writer._namespace.flush()
145-
generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
145+
generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
146146
# Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
147147
# the agent)
148148
filtered_metrics = [
@@ -170,7 +170,7 @@ def test_metric_request_tainted(no_request_sampling, telemetry_writer):
170170

171171
metrics_result = telemetry_writer._namespace.flush()
172172

173-
generate_metrics = metrics_result[TELEMETRY_TYPE_GENERATE_METRICS][TELEMETRY_NAMESPACE.IAST.value]
173+
generate_metrics = metrics_result[TELEMETRY_EVENT_TYPE.METRICS][TELEMETRY_NAMESPACE.IAST.value]
174174
# Remove potential sinks from internal usage of the lib (like http.client, used to communicate with
175175
# the agent)
176176
filtered_metrics = [metric["metric"] for metric in generate_metrics if metric["metric"] != "executed.sink"]

tests/telemetry/test_telemetry_metrics.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,18 +3,17 @@
33

44
from mock.mock import ANY
55

6+
from ddtrace.internal.telemetry.constants import TELEMETRY_EVENT_TYPE
67
from ddtrace.internal.telemetry.constants import TELEMETRY_LOG_LEVEL
78
from ddtrace.internal.telemetry.constants import TELEMETRY_NAMESPACE
8-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_DISTRIBUTION
9-
from ddtrace.internal.telemetry.constants import TELEMETRY_TYPE_GENERATE_METRICS
109
from tests.utils import override_global_config
1110

1211

1312
def _assert_metric(
1413
test_agent,
1514
expected_metrics,
1615
namespace=TELEMETRY_NAMESPACE.TRACERS,
17-
type_paypload=TELEMETRY_TYPE_GENERATE_METRICS,
16+
type_paypload=TELEMETRY_EVENT_TYPE.METRICS,
1817
):
1918
assert len(expected_metrics) > 0, "expected_metrics should not be empty"
2019
test_agent.telemetry_writer.periodic(force_flush=True)
@@ -291,7 +290,7 @@ def test_send_appsec_distributions_metric(telemetry_writer, test_agent_session,
291290
test_agent_session,
292291
expected_series,
293292
namespace=TELEMETRY_NAMESPACE.APPSEC,
294-
type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
293+
type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
295294
)
296295

297296

@@ -312,7 +311,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write
312311
test_agent_session,
313312
expected_series,
314313
namespace=TELEMETRY_NAMESPACE.APPSEC,
315-
type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
314+
type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
316315
)
317316

318317
expected_series = [
@@ -329,7 +328,7 @@ def test_send_metric_flush_and_distributions_series_is_restarted(telemetry_write
329328
test_agent_session,
330329
expected_series,
331330
namespace=TELEMETRY_NAMESPACE.APPSEC,
332-
type_paypload=TELEMETRY_TYPE_DISTRIBUTION,
331+
type_paypload=TELEMETRY_EVENT_TYPE.DISTRIBUTIONS,
333332
)
334333

335334

0 commit comments

Comments
 (0)