Skip to content

Commit d1d672d

Browse files
committed
avoid unnessary add_event calls
1 parent af76074 commit d1d672d

File tree

3 files changed

+78
-62
lines changed

3 files changed

+78
-62
lines changed

ddtrace/internal/telemetry/writer.py

Lines changed: 74 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,7 @@
2929
from ..utils.version import version as tracer_version
3030
from . import modules
3131
from .constants import TELEMETRY_APM_PRODUCT
32-
from .constants import TELEMETRY_LOG_LEVEL # noqa:F401
3332
from .constants import TELEMETRY_NAMESPACE
34-
from .constants import TELEMETRY_TYPE_DISTRIBUTION
35-
from .constants import TELEMETRY_TYPE_GENERATE_METRICS
3633
from .constants import TELEMETRY_TYPE_LOGS
3734
from .data import get_application
3835
from .data import get_host_info
@@ -202,8 +199,8 @@ def __init__(self, is_periodic=True, agentless=None):
202199
# This will occur when the agent writer starts.
203200
self.enable()
204201
# Force app started for unit tests
205-
if config.FORCE_START:
206-
self._app_started()
202+
if config.FORCE_START and (app_started := self._app_started()):
203+
self._events_queue.append(app_started)
207204
if config.LOG_COLLECTION_ENABLED:
208205
get_logger("ddtrace").addHandler(DDTelemetryLogHandler(self))
209206

@@ -259,7 +256,17 @@ def add_event(self, payload, payload_type):
259256
Payload types accepted by telemetry/proxy v2: app-started, app-closing, app-integrations-change
260257
"""
261258
if self.enable():
262-
self._events_queue.append({"payload": payload, "request_type": payload_type})
259+
with self._service_lock:
260+
self._events_queue.append({"payload": payload, "request_type": payload_type})
261+
262+
def add_events(self, events):
263+
# type: (List[Dict[str, Any]]) -> None
264+
"""
265+
Adds a list of Telemetry events to the TelemetryWriter event buffer
266+
"""
267+
if self.enable():
268+
with self._service_lock:
269+
self._events_queue.extend(events)
263270

264271
def add_integration(self, integration_name, patched, auto_patched=None, error_msg=None, version=""):
265272
# type: (str, bool, Optional[bool], Optional[str], Optional[str]) -> None
@@ -269,6 +276,9 @@ def add_integration(self, integration_name, patched, auto_patched=None, error_ms
269276
:param str integration_name: name of patched module
270277
:param bool auto_enabled: True if module is enabled in _monkey.PATCH_MODULES
271278
"""
279+
if not self.enable():
280+
return
281+
272282
# Integrations can be patched before the telemetry writer is enabled.
273283
with self._service_lock:
274284
if integration_name not in self._integrations_queue:
@@ -294,11 +304,11 @@ def add_error(self, code, msg, filename, line_number):
294304
self._error = (code, msg)
295305

296306
def _app_started(self, register_app_shutdown=True):
297-
# type: (bool) -> None
307+
# type: (bool) -> Optional[Dict[str, Any]]
298308
"""Sent when TelemetryWriter is enabled or forks"""
299309
if self._forked or self.started:
300310
# app-started events should only be sent by the main process
301-
return
311+
return None
302312
# List of configurations to be collected
303313

304314
self.started = True
@@ -329,10 +339,10 @@ def _app_started(self, register_app_shutdown=True):
329339

330340
# Reset the error after it has been reported.
331341
self._error = (0, "")
332-
self.add_event(payload, "app-started")
342+
return {"payload": payload, "request_type": "app-started"}
333343

334344
def _app_heartbeat_event(self):
335-
# type: () -> None
345+
# type: () -> Dict[str, Any]
336346
if config.DEPENDENCY_COLLECTION and time.monotonic() - self._extended_time > self._extended_heartbeat_interval:
337347
self._extended_time += self._extended_heartbeat_interval
338348
self._app_dependencies_loaded_event()
@@ -341,26 +351,27 @@ def _app_heartbeat_event(self):
341351
{"name": name, "version": version} for name, version in self._imported_dependencies.items()
342352
]
343353
}
344-
self.add_event(payload, "app-extended-heartbeat")
354+
request_type = "app-extended-heartbeat"
345355
else:
346-
self.add_event({}, "app-heartbeat")
356+
payload = {}
357+
request_type = "app-heartbeat"
358+
return {"payload": payload, "request_type": request_type}
347359

348360
def _app_closing_event(self):
349-
# type: () -> None
361+
# type: () -> Optional[Dict[str, Any]]
350362
"""Adds a Telemetry event which notifies the agent that an application instance has terminated"""
351363
if self._forked:
352364
# app-closing event should only be sent by the main process
353-
return
354-
payload = {} # type: Dict
355-
self.add_event(payload, "app-closing")
365+
return None
366+
return {"payload": {}, "request_type": "app-closing"}
356367

357368
def _app_integrations_changed_event(self, integrations):
358-
# type: (List[Dict]) -> None
369+
# type: (List[Dict]) -> Dict
359370
"""Adds a Telemetry event which sends a list of configured integrations to the agent"""
360371
payload = {
361372
"integrations": integrations,
362373
}
363-
self.add_event(payload, "app-integrations-change")
374+
return {"payload": payload, "request_type": "app-integrations-change"}
364375

365376
def _flush_integrations_queue(self):
366377
# type: () -> List[Dict]
@@ -379,46 +390,44 @@ def _flush_configuration_queue(self):
379390
return configurations
380391

381392
def _app_client_configuration_changed_event(self, configurations):
382-
# type: (List[Dict]) -> None
393+
# type: (List[Dict]) -> Dict[str, Any]
383394
"""Adds a Telemetry event which sends list of modified configurations to the agent"""
384395
payload = {
385396
"configuration": configurations,
386397
}
387-
self.add_event(payload, "app-client-configuration-change")
398+
return {"payload": payload, "request_type": "app-client-configuration-change"}
388399

389400
def _app_dependencies_loaded_event(self):
401+
# type: () -> Optional[Dict[str, Any]]
390402
"""Adds events to report imports done since the last periodic run"""
391-
392403
if not config.DEPENDENCY_COLLECTION or not self._enabled:
393-
return
404+
return None
394405
with self._service_lock:
395406
newly_imported_deps = modules.get_newly_imported_modules(self._modules_already_imported)
396407

397408
if not newly_imported_deps:
398-
return
409+
return None
399410

400411
with self._service_lock:
401-
packages = update_imported_dependencies(self._imported_dependencies, newly_imported_deps)
402-
403-
if packages:
404-
payload = {"dependencies": packages}
405-
self.add_event(payload, "app-dependencies-loaded")
412+
if packages := update_imported_dependencies(self._imported_dependencies, newly_imported_deps):
413+
return {"payload": {"dependencies": packages}, "request_type": "app-dependencies-loaded"}
414+
return None
406415

407416
def _app_product_change(self):
408-
# type: () -> None
417+
# type: () -> Optional[Dict[str, Any]]
409418
"""Adds a Telemetry event which reports the enablement of an APM product"""
410419

411420
if not self._send_product_change_updates:
412-
return
421+
return None
413422

414423
payload = {
415424
"products": {
416425
product: {"version": tracer_version, "enabled": status}
417426
for product, status in self._product_enablement.items()
418427
}
419428
}
420-
self.add_event(payload, "app-product-change")
421429
self._send_product_change_updates = False
430+
return {"payload": payload, "request_type": "app-product-change"}
422431

423432
def product_activated(self, product, enabled):
424433
# type: (str, bool) -> None
@@ -471,7 +480,7 @@ def add_configurations(self, configuration_list):
471480

472481
def add_log(self, level, message, stack_trace="", tags=None):
473482
"""
474-
Queues log. This event is meant to send library logs to Datadogs backend through the Telemetry intake.
483+
Queues log. This event is meant to send library logs to Datadog's backend through the Telemetry intake.
475484
This will make support cycles easier and ensure we know about potentially silent issues in libraries.
476485
"""
477486
if tags is None:
@@ -553,7 +562,8 @@ def _flush_log_metrics(self):
553562
self._logs = set()
554563
return log_metrics
555564

556-
def _generate_metrics_event(self, namespace_metrics) -> None:
565+
def _generate_metrics_event(self, namespace_metrics):
566+
# type: (Dict[str, Dict[str, List[Dict[str, Any]]]]) -> Optional[Dict[str, Any]]
557567
for payload_type, namespaces in namespace_metrics.items():
558568
for namespace, metrics in namespaces.items():
559569
if metrics:
@@ -562,52 +572,56 @@ def _generate_metrics_event(self, namespace_metrics) -> None:
562572
"series": metrics,
563573
}
564574
log.debug("%s request payload, namespace %s", payload_type, namespace)
565-
if payload_type == TELEMETRY_TYPE_DISTRIBUTION:
566-
self.add_event(payload, TELEMETRY_TYPE_DISTRIBUTION)
567-
elif payload_type == TELEMETRY_TYPE_GENERATE_METRICS:
568-
self.add_event(payload, TELEMETRY_TYPE_GENERATE_METRICS)
575+
return {"payload": payload, "request_type": payload_type}
576+
return None
569577

570578
def _generate_logs_event(self, logs):
571-
# type: (Set[Dict[str, str]]) -> None
579+
# type: (Set[Dict[str, str]]) -> Dict[str, Any]
572580
log.debug("%s request payload", TELEMETRY_TYPE_LOGS)
573-
self.add_event({"logs": list(logs)}, TELEMETRY_TYPE_LOGS)
581+
return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}
574582

575583
def periodic(self, force_flush=False, shutting_down=False):
576584
# ensure app_started is called at least once in case traces weren't flushed
577-
self._app_started()
578-
self._app_product_change()
585+
events = []
586+
if app_started := self._app_started():
587+
events.append(app_started)
588+
if app_product_change := self._app_product_change():
589+
events.append(app_product_change)
579590

580-
namespace_metrics = self._namespace.flush(float(self.interval))
581-
if namespace_metrics:
582-
self._generate_metrics_event(namespace_metrics)
591+
if namespace_metrics := self._namespace.flush(float(self.interval)):
592+
if metrics_event := self._generate_metrics_event(namespace_metrics):
593+
events.append(metrics_event)
583594

584-
logs_metrics = self._flush_log_metrics()
585-
if logs_metrics:
586-
self._generate_logs_event(logs_metrics)
595+
if logs_metrics := self._flush_log_metrics():
596+
events.append(self._generate_logs_event(logs_metrics))
587597

588598
# Telemetry metrics and logs should be aggregated into payloads every time periodic is called.
589599
# This ensures metrics and logs are submitted in 10 second time buckets.
590600
if self._is_periodic and force_flush is False:
591601
if self._periodic_count < self._periodic_threshold:
602+
if events:
603+
self.add_events(events)
592604
self._periodic_count += 1
593605
return
594606
self._periodic_count = 0
595607

596-
integrations = self._flush_integrations_queue()
597-
if integrations:
598-
self._app_integrations_changed_event(integrations)
608+
if integrations := self._flush_integrations_queue():
609+
events.append(self._app_integrations_changed_event(integrations))
599610

600-
configurations = self._flush_configuration_queue()
601-
if configurations:
602-
self._app_client_configuration_changed_event(configurations)
611+
if configurations := self._flush_configuration_queue():
612+
events.append(self._app_client_configuration_changed_event(configurations))
603613

604-
self._app_dependencies_loaded_event()
614+
if app_dependencies_loaded := self._app_dependencies_loaded_event():
615+
events.append(app_dependencies_loaded)
605616

606-
if shutting_down:
607-
self._app_closing_event()
617+
if shutting_down and (app_closing := self._app_closing_event()):
618+
events.append(app_closing)
608619

609620
# Send a heartbeat event to the agent, this is required to keep RC connections alive
610-
self._app_heartbeat_event()
621+
events.append(self._app_heartbeat_event())
622+
623+
if queued_events := self._flush_events_queue():
624+
events.extend(queued_events)
611625

612626
batch_event = {
613627
"tracer_time": int(time.time()),
@@ -617,7 +631,7 @@ def periodic(self, force_flush=False, shutting_down=False):
617631
"debug": self._debug,
618632
"application": get_application(config.SERVICE, config.VERSION, config.ENV),
619633
"host": get_host_info(),
620-
"payload": self._flush_events_queue(),
634+
"payload": events,
621635
"request_type": "message-batch",
622636
}
623637
self._client.send_event(batch_event)
@@ -702,8 +716,8 @@ def _telemetry_excepthook(self, tp, value, root_traceback):
702716
error_msg = "{}:{} {}".format(filename, lineno, str(value))
703717
self.add_integration(integration_name, True, error_msg=error_msg)
704718

705-
if self._enabled and not self.started:
706-
self._app_started(False)
719+
if app_started := self._app_started(False):
720+
self._events_queue.append(app_started)
707721

708722
self.app_shutdown()
709723

tests/telemetry/test_telemetry.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ def process_trace(self, trace):
161161
162162
# force app_started event (instead of waiting for 10 seconds)
163163
from ddtrace.internal.telemetry import telemetry_writer
164-
telemetry_writer._app_started()
164+
telemetry_writer.periodic(force_flush=True)
165165
"""
166166
_, stderr, status, _ = run_python_code_in_subprocess(code)
167167
assert status == 0, stderr

tests/telemetry/test_writer.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,9 @@ def test_app_started_event(telemetry_writer, test_agent_session, mock_time):
8888
"""asserts that app_started() queues a valid telemetry request which is then sent by periodic()"""
8989
with override_global_config(dict(_telemetry_dependency_collection=False)):
9090
# queue an app started event
91-
telemetry_writer._app_started()
91+
event = telemetry_writer._app_started()
92+
assert event is not None, "app_started() did not return an event"
93+
telemetry_writer.add_event(event["payload"], "app-started")
9294
# force a flush
9395
telemetry_writer.periodic(force_flush=True)
9496

0 commit comments

Comments
 (0)