2
2
// SPDX-License-Identifier: Apache-2.0
3
3
4
4
use crate :: log:: { TemporarilyRetainedMapStats , MULTI_LOG_FILTER , MULTI_LOG_WRITER } ;
5
+ use crate :: one_way_shared_memory:: OneWayShmWriter ;
5
6
use crate :: service:: {
6
- sidecar_interface:: ServeSidecarInterface , tracing:: TraceFlusher , InstanceId , QueueId ,
7
- RequestIdentification , RequestIdentifier , RuntimeInfo , RuntimeMetadata ,
7
+ sidecar_interface:: ServeSidecarInterface ,
8
+ telemetry:: { TelemetryCachedClient , TelemetryCachedClientSet } ,
9
+ tracing:: TraceFlusher ,
10
+ InstanceId , QueueId , RequestIdentification , RequestIdentifier , RuntimeInfo , RuntimeMetadata ,
8
11
SerializedTracerHeaderTags , SessionConfig , SessionInfo , SidecarAction , SidecarInterface ,
9
12
SidecarInterfaceRequest , SidecarInterfaceResponse ,
10
13
} ;
11
- use base64:: prelude:: BASE64_URL_SAFE_NO_PAD ;
12
- use base64:: Engine ;
14
+ use datadog_ipc:: platform:: NamedShmHandle ;
13
15
use datadog_ipc:: platform:: { AsyncChannel , ShmHandle } ;
14
16
use datadog_ipc:: tarpc;
15
17
use datadog_ipc:: tarpc:: context:: Context ;
@@ -18,25 +20,18 @@ use datadog_trace_utils::trace_utils::SendData;
18
20
use datadog_trace_utils:: tracer_payload:: decode_to_trace_chunks;
19
21
use datadog_trace_utils:: tracer_payload:: TraceEncoding ;
20
22
use ddcommon:: { Endpoint , MutexExt } ;
21
- use ddtelemetry:: worker:: {
22
- LifecycleAction , TelemetryActions , TelemetryWorkerBuilder , TelemetryWorkerStats ,
23
- } ;
23
+ use ddtelemetry:: worker:: { LifecycleAction , TelemetryActions , TelemetryWorkerStats } ;
24
24
use futures:: future;
25
25
use futures:: future:: Ready ;
26
26
use manual_future:: ManualFutureCompleter ;
27
27
use std:: borrow:: Cow ;
28
28
use std:: collections:: hash_map:: Entry ;
29
29
use std:: collections:: { HashMap , HashSet } ;
30
- use std:: ffi:: CString ;
31
- use std:: hash:: { Hash , Hasher } ;
32
- use std:: os:: unix:: ffi:: OsStrExt ;
33
- use std:: path:: PathBuf ;
34
30
use std:: pin:: Pin ;
35
31
use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
36
32
use std:: sync:: { Arc , Mutex } ;
37
- use std:: time:: { Duration , Instant } ;
33
+ use std:: time:: Duration ;
38
34
use tracing:: { debug, error, info, trace, warn} ;
39
- use zwohash:: ZwoHasher ;
40
35
41
36
use futures:: FutureExt ;
42
37
use serde:: { Deserialize , Serialize } ;
@@ -122,107 +117,7 @@ pub struct SidecarServer {
122
117
process_handle : Option < ProcessHandle > ,
123
118
}
124
119
125
- pub fn path_for_telemetry ( service : & str , env : & str , version : & str ) -> CString {
126
- let mut hasher = ZwoHasher :: default ( ) ;
127
- service. hash ( & mut hasher) ;
128
- env. hash ( & mut hasher) ;
129
- version. hash ( & mut hasher) ;
130
- let hash = hasher. finish ( ) ;
131
-
132
- let mut path = format ! (
133
- "/ddtl{}-{}" ,
134
- primary_sidecar_identifier( ) ,
135
- BASE64_URL_SAFE_NO_PAD . encode( hash. to_ne_bytes( ) ) ,
136
- ) ;
137
- path. truncate ( 31 ) ;
138
-
139
- #[ allow( clippy:: unwrap_used) ]
140
- CString :: new ( path) . unwrap ( )
141
- }
142
-
143
120
impl SidecarServer {
144
- pub fn get_or_create_telemetry_client (
145
- & self ,
146
- service : & str ,
147
- env : & str ,
148
- version : & str ,
149
- instance_id : & InstanceId ,
150
- runtime_meta : & RuntimeMetadata ,
151
- ) -> Option < TelemetryCachedClient > {
152
- let key = ( service. to_string ( ) , env. to_string ( ) , version. to_string ( ) ) ;
153
-
154
- let mut clients = self . telemetry_clients . inner . lock_or_panic ( ) ;
155
-
156
- if let Some ( existing) = clients. get_mut ( & key) {
157
- existing. last_used = Instant :: now ( ) ;
158
- let telemetry = existing. clone ( ) ;
159
- tokio:: spawn ( async move {
160
- telemetry
161
- . client
162
- . send_msg ( TelemetryActions :: Lifecycle ( LifecycleAction :: Start ) )
163
- . await
164
- . ok ( ) ;
165
- } ) ;
166
- return Some ( existing. clone ( ) ) ;
167
- }
168
-
169
- // Build and spawn worker
170
- let mut builder = TelemetryWorkerBuilder :: new_fetch_host (
171
- service. to_string ( ) ,
172
- runtime_meta. language_name . to_string ( ) ,
173
- runtime_meta. language_version . to_string ( ) ,
174
- runtime_meta. tracer_version . to_string ( ) ,
175
- ) ;
176
-
177
- builder. runtime_id = Some ( instance_id. runtime_id . clone ( ) ) ;
178
- builder. application . env = Some ( env. to_string ( ) ) ;
179
-
180
- let session_info = self . get_session ( & instance_id. session_id ) ;
181
- let config = session_info
182
- . session_config
183
- . lock_or_panic ( )
184
- . clone ( )
185
- . unwrap_or_else ( ddtelemetry:: config:: Config :: from_env) ;
186
-
187
- builder. config = config. clone ( ) ;
188
-
189
- match builder. spawn ( ) . now_or_never ( ) {
190
- Some ( Ok ( ( handle, _join) ) ) => {
191
- let telemetry_client = TelemetryCachedClient {
192
- client : handle. clone ( ) ,
193
- shmem_file : PathBuf :: from ( std:: ffi:: OsStr :: from_bytes (
194
- path_for_telemetry ( service, env, version) . as_bytes ( ) ,
195
- ) ) ,
196
- last_used : Instant :: now ( ) ,
197
- buffered_integrations : HashSet :: new ( ) ,
198
- buffered_composer_paths : HashSet :: new ( ) ,
199
- telemetry_metrics : Default :: default ( ) ,
200
- } ;
201
- clients. insert ( key, telemetry_client. clone ( ) ) ;
202
-
203
- let telemetry = telemetry_client. clone ( ) ;
204
- tokio:: spawn ( async move {
205
- telemetry
206
- . client
207
- . send_msg ( TelemetryActions :: Lifecycle ( LifecycleAction :: Start ) )
208
- . await
209
- . ok ( ) ;
210
- } ) ;
211
-
212
- info ! ( "spawning telemetry worker {config:?}" ) ;
213
- Some ( telemetry_client)
214
- }
215
- Some ( Err ( e) ) => {
216
- error ! ( "failed to spawn telemetry worker: {:?}" , e) ;
217
- None
218
- }
219
- None => {
220
- error ! ( "failed to spawn telemetry worker: spawn did not complete immediately" ) ;
221
- None
222
- }
223
- }
224
- }
225
-
226
121
/// Accepts a new connection and starts processing requests.
227
122
///
228
123
/// This function creates a new `tarpc` server with the provided `async_channel` and starts
@@ -494,30 +389,41 @@ impl SidecarInterface for SidecarServer {
494
389
let version = value. app_version . clone ( ) . unwrap ( ) ;
495
390
let service = value. service_name . clone ( ) . unwrap ( ) ;
496
391
497
- let mut telemetry = match self . get_or_create_telemetry_client (
392
+ let mut telemetry = match self . telemetry_clients . get_or_create (
498
393
& service,
499
394
& env,
500
395
& version,
501
396
& instance_id,
502
397
& runtime_metadata,
398
+ || {
399
+ self . get_session ( & instance_id. session_id )
400
+ . session_config
401
+ . lock_or_panic ( )
402
+ . clone ( )
403
+ } ,
503
404
) {
504
405
Some ( client) => client,
505
406
None => return no_response ( ) ,
506
407
} ;
507
408
508
409
let mut actions_to_process = vec ! [ ] ;
509
410
let mut composer_paths_to_process = vec ! [ ] ;
411
+ let mut buffered_info_changed = false ;
510
412
511
413
for action in actions {
512
- match & action {
513
- SidecarAction :: Telemetry ( TelemetryActions :: AddIntegration ( integration) ) => {
414
+ match action {
415
+ SidecarAction :: Telemetry ( TelemetryActions :: AddIntegration (
416
+ ref integration,
417
+ ) ) => {
514
418
if telemetry. buffered_integrations . insert ( integration. clone ( ) ) {
515
419
actions_to_process. push ( action) ;
420
+ buffered_info_changed = true ;
516
421
}
517
422
}
518
423
SidecarAction :: PhpComposerTelemetryFile ( path) => {
519
424
if telemetry. buffered_composer_paths . insert ( path. clone ( ) ) {
520
- composer_paths_to_process. push ( path. clone ( ) ) ;
425
+ composer_paths_to_process. push ( path) ;
426
+ buffered_info_changed = true ;
521
427
}
522
428
}
523
429
_ => {
@@ -560,16 +466,13 @@ impl SidecarInterface for SidecarServer {
560
466
} ) ;
561
467
}
562
468
563
- // Synchronous SHM write
564
- if let Ok ( buf) = bincode:: serialize ( & (
565
- & telemetry. buffered_integrations ,
566
- & telemetry. buffered_composer_paths ,
567
- ) ) {
568
- if let Err ( e) = std:: fs:: write ( & telemetry. shmem_file , buf) {
569
- error ! (
570
- "Failed to write telemetry SHM file to {:?}: {:?}" ,
571
- telemetry. shmem_file, e
572
- ) ;
469
+ // Synchronous SHM write if changes have been made
470
+ if buffered_info_changed {
471
+ if let Ok ( buf) = bincode:: serialize ( & (
472
+ & telemetry. buffered_integrations ,
473
+ & telemetry. buffered_composer_paths ,
474
+ ) ) {
475
+ telemetry. shm_writer . write ( & buf) ;
573
476
}
574
477
}
575
478
}
@@ -594,7 +497,7 @@ impl SidecarInterface for SidecarServer {
594
497
) -> Self :: SetSessionConfigFut {
595
498
debug ! ( "Set session config for {session_id} to {config:?}" ) ;
596
499
597
- let session = self . get_session ( & session_id) ;
500
+ let mut session = self . get_session ( & session_id) ;
598
501
#[ cfg( unix) ]
599
502
{
600
503
session. pid . store ( pid, Ordering :: Relaxed ) ;
@@ -604,6 +507,7 @@ impl SidecarInterface for SidecarServer {
604
507
{
605
508
* session. remote_config_notify_function . lock ( ) . unwrap ( ) = remote_config_notify_function;
606
509
}
510
+ session. remote_config_enabled = config. remote_config_enabled ;
607
511
session. modify_telemetry_config ( |cfg| {
608
512
cfg. telemetry_heartbeat_interval = config. telemetry_heartbeat_interval ;
609
513
let endpoint =
@@ -833,9 +737,9 @@ impl SidecarInterface for SidecarServer {
833
737
no_response ( )
834
738
}
835
739
836
- type SetRemoteConfigDataFut = NoResponse ;
740
+ type SetUniversalServiceTagsFut = NoResponse ;
837
741
838
- fn set_remote_config_data (
742
+ fn set_universal_service_tags (
839
743
self ,
840
744
_: Context ,
841
745
instance_id : InstanceId ,
@@ -844,7 +748,7 @@ impl SidecarInterface for SidecarServer {
844
748
env_name : String ,
845
749
app_version : String ,
846
750
global_tags : Vec < Tag > ,
847
- ) -> Self :: SetRemoteConfigDataFut {
751
+ ) -> Self :: SetUniversalServiceTagsFut {
848
752
debug ! ( "Registered remote config metadata: instance {instance_id:?}, queue_id: {queue_id:?}, service: {service_name}, env: {env_name}, version: {app_version}" ) ;
849
753
850
754
let session = self . get_session ( & instance_id. session_id ) ;
@@ -871,16 +775,18 @@ impl SidecarInterface for SidecarServer {
871
775
let runtime_info = session. get_runtime ( & instance_id. runtime_id ) ;
872
776
let mut applications = runtime_info. lock_applications ( ) ;
873
777
let app = applications. entry ( queue_id) . or_default ( ) ;
874
- app. remote_config_guard = Some ( self . remote_configs . add_runtime (
875
- invariants,
876
- * session. remote_config_interval . lock_or_panic ( ) ,
877
- instance_id. runtime_id ,
878
- notify_target,
879
- env_name. clone ( ) ,
880
- service_name. clone ( ) ,
881
- app_version. clone ( ) ,
882
- global_tags. clone ( ) ,
883
- ) ) ;
778
+ if session. remote_config_enabled {
779
+ app. remote_config_guard = Some ( self . remote_configs . add_runtime (
780
+ invariants,
781
+ * session. remote_config_interval . lock_or_panic ( ) ,
782
+ instance_id. runtime_id ,
783
+ notify_target,
784
+ env_name. clone ( ) ,
785
+ service_name. clone ( ) ,
786
+ app_version. clone ( ) ,
787
+ global_tags. clone ( ) ,
788
+ ) ) ;
789
+ }
884
790
app. set_metadata ( env_name, app_version, service_name, global_tags) ;
885
791
886
792
no_response ( )
0 commit comments