Skip to content

Commit 8a2456a

Browse files
committed
clean up payload queuing
1 parent d1d672d commit 8a2456a

File tree

1 file changed

+67
-28
lines changed

1 file changed

+67
-28
lines changed

ddtrace/internal/telemetry/writer.py

Lines changed: 67 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -368,10 +368,12 @@ def _app_closing_event(self):
368368
def _app_integrations_changed_event(self, integrations):
369369
# type: (List[Dict]) -> Dict
370370
"""Adds a Telemetry event which sends a list of configured integrations to the agent"""
371-
payload = {
372-
"integrations": integrations,
371+
return {
372+
"payload": {
373+
"integrations": integrations,
374+
},
375+
"request_type": "app-integrations-change",
373376
}
374-
return {"payload": payload, "request_type": "app-integrations-change"}
375377

376378
def _flush_integrations_queue(self):
377379
# type: () -> List[Dict]
@@ -392,10 +394,12 @@ def _flush_configuration_queue(self):
392394
def _app_client_configuration_changed_event(self, configurations):
393395
# type: (List[Dict]) -> Dict[str, Any]
394396
"""Adds a Telemetry event which sends a list of modified configurations to the agent"""
395-
payload = {
396-
"configuration": configurations,
397+
return {
398+
"payload": {
399+
"configuration": configurations,
400+
},
401+
"request_type": "app-client-configuration-change",
397402
}
398-
return {"payload": payload, "request_type": "app-client-configuration-change"}
399403

400404
def _app_dependencies_loaded_event(self):
401405
# type: () -> Optional[Dict[str, Any]]
@@ -420,14 +424,16 @@ def _app_product_change(self):
420424
if not self._send_product_change_updates:
421425
return None
422426

423-
payload = {
424-
"products": {
425-
product: {"version": tracer_version, "enabled": status}
426-
for product, status in self._product_enablement.items()
427-
}
428-
}
429427
self._send_product_change_updates = False
430-
return {"payload": payload, "request_type": "app-product-change"}
428+
return {
429+
"payload": {
430+
"products": {
431+
product: {"version": tracer_version, "enabled": status}
432+
for product, status in self._product_enablement.items()
433+
}
434+
},
435+
"request_type": "app-product-change",
436+
}
431437

432438
def product_activated(self, product, enabled):
433439
# type: (str, bool) -> None
@@ -567,12 +573,14 @@ def _generate_metrics_event(self, namespace_metrics):
567573
for payload_type, namespaces in namespace_metrics.items():
568574
for namespace, metrics in namespaces.items():
569575
if metrics:
570-
payload = {
571-
"namespace": namespace,
572-
"series": metrics,
573-
}
574576
log.debug("%s request payload, namespace %s", payload_type, namespace)
575-
return {"payload": payload, "request_type": payload_type}
577+
return {
578+
"payload": {
579+
"namespace": namespace,
580+
"series": metrics,
581+
},
582+
"request_type": payload_type,
583+
}
576584
return None
577585

578586
def _generate_logs_event(self, logs):
@@ -581,30 +589,59 @@ def _generate_logs_event(self, logs):
581589
return {"payload": {"logs": list(logs)}, "request_type": TELEMETRY_TYPE_LOGS}
582590

583591
def periodic(self, force_flush=False, shutting_down=False):
584-
# ensure app_started is called at least once in case traces weren't flushed
592+
"""Process and send telemetry events in batches.
593+
594+
This method handles the periodic collection and sending of telemetry data with two main timing intervals:
595+
1. Metrics collection interval (10 seconds by default): Collects metrics and logs
596+
2. Heartbeat interval (60 seconds by default): Sends all collected data to the telemetry endpoint
597+
598+
The method follows this flow:
599+
1. Collects metrics and logs that have accumulated since last collection
600+
2. If not at heartbeat interval and not force_flush:
601+
- Queues the metrics and logs for future sending
602+
- Returns early
603+
3. At heartbeat interval or force_flush:
604+
- Collects app status (started, product changes)
605+
- Collects integration changes
606+
- Collects configuration changes
607+
- Collects dependency changes
608+
- Collects stored events (e.g., metrics and logs)
609+
- Sends everything as a single batch
610+
611+
Args:
612+
force_flush: If True, bypasses the heartbeat interval check and sends immediately
613+
shutting_down: If True, includes app-closing event in the batch
614+
615+
Note:
616+
- Metrics are collected every 10 seconds to ensure accurate time-based data
617+
- All data is sent in a single batch every 60 seconds to minimize network overhead
618+
- A heartbeat event is always included to keep RC connections alive
619+
"""
620+
# Collect metrics and logs that have accumulated since last batch
585621
events = []
586-
if app_started := self._app_started():
587-
events.append(app_started)
588-
if app_product_change := self._app_product_change():
589-
events.append(app_product_change)
590-
591622
if namespace_metrics := self._namespace.flush(float(self.interval)):
592623
if metrics_event := self._generate_metrics_event(namespace_metrics):
593624
events.append(metrics_event)
594625

595626
if logs_metrics := self._flush_log_metrics():
596627
events.append(self._generate_logs_event(logs_metrics))
597628

598-
# Telemetry metrics and logs should be aggregated into payloads every time periodic is called.
599-
# This ensures metrics and logs are submitted in 10 second time buckets.
629+
# Queue metrics if not at heartbeat interval
600630
if self._is_periodic and force_flush is False:
601631
if self._periodic_count < self._periodic_threshold:
632+
self._periodic_count += 1
602633
if events:
603634
self.add_events(events)
604-
self._periodic_count += 1
605635
return
606636
self._periodic_count = 0
607637

638+
# At heartbeat interval, collect and send all telemetry data
639+
if app_started := self._app_started():
640+
events.append(app_started)
641+
642+
if app_product_change := self._app_product_change():
643+
events.append(app_product_change)
644+
608645
if integrations := self._flush_integrations_queue():
609646
events.append(self._app_integrations_changed_event(integrations))
610647

@@ -617,12 +654,14 @@ def periodic(self, force_flush=False, shutting_down=False):
617654
if shutting_down and (app_closing := self._app_closing_event()):
618655
events.append(app_closing)
619656

620-
# Send a heartbeat event to the agent, this is required to keep RC connections alive
657+
# Always include a heartbeat to keep RC connections alive
621658
events.append(self._app_heartbeat_event())
622659

660+
# Get any queued events and combine with current batch
623661
if queued_events := self._flush_events_queue():
624662
events.extend(queued_events)
625663

664+
# Prepare and send the final batch
626665
batch_event = {
627666
"tracer_time": int(time.time()),
628667
"runtime_id": get_runtime_id(),

0 commit comments

Comments
 (0)