11package com .datadog .featureflag ;
22
3- import static datadog .trace .util .AgentThreadFactory .AgentThread .LLMOBS_EVALS_PROCESSOR ;
3+ import static datadog .trace .util .AgentThreadFactory .AgentThread .FEATURE_FLAG_EXPOSURE_PROCESSOR ;
44import static datadog .trace .util .AgentThreadFactory .newAgentThread ;
5+ import static java .util .concurrent .TimeUnit .SECONDS ;
56
6- import com .datadog .featureflag .exposure .ExposureEvent ;
7- import com .datadog .featureflag .exposure .ExposuresRequest ;
87import com .squareup .moshi .JsonAdapter ;
98import com .squareup .moshi .Moshi ;
109import datadog .communication .ddagent .DDAgentFeaturesDiscovery ;
1110import datadog .communication .http .HttpRetryPolicy ;
1211import datadog .communication .http .OkHttpUtils ;
1312import datadog .trace .api .Config ;
13+ import datadog .trace .api .featureflag .FeatureFlagGateway ;
14+ import datadog .trace .api .featureflag .exposure .ExposureEvent ;
15+ import datadog .trace .api .featureflag .exposure .ExposuresRequest ;
16+ import datadog .trace .core .util .LRUCache ;
1417import java .util .ArrayList ;
18+ import java .util .Collections ;
1519import java .util .HashMap ;
1620import java .util .List ;
1721import java .util .Map ;
22+ import java .util .Set ;
1823import java .util .concurrent .TimeUnit ;
1924import okhttp3 .Headers ;
2025import okhttp3 .HttpUrl ;
2732
2833public class ExposureWriterImpl implements ExposureWriter {
2934
35+ private static final Logger LOGGER = LoggerFactory .getLogger (ExposureWriterImpl .class );
36+ private static final int DEFAULT_CAPACITY = 1 << 16 ; // 65536 elements
37+ private static final int DEFAULT_FLUSH_INTERVAL_IN_SECONDS = 1 ;
38+ private static final int FLUSH_THRESHOLD = 50 ;
3039 private static final String EXPOSURES_API_PATH = "api/v2/exposures" ;
3140 private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain" ;
3241 private static final String EVP_SUBDOMAIN_HEADER_VALUE = "event-platform-intake" ;
33-
34- private static final Logger log = LoggerFactory . getLogger ( ExposureWriterImpl . class );
42+ private static final HttpRetryPolicy . Factory RETRY_POLICY =
43+ new HttpRetryPolicy . Factory ( 5 , 100 , 2.0 , true );
3544
3645 private final MpscBlockingConsumerArrayQueue <ExposureEvent > queue ;
3746 private final Thread serializerThread ;
3847
39- public ExposureWriterImpl (
48+ public ExposureWriterImpl (final HttpUrl agentUrl , final Config config ) {
49+ this (DEFAULT_CAPACITY , DEFAULT_FLUSH_INTERVAL_IN_SECONDS , SECONDS , agentUrl , config );
50+ }
51+
52+ ExposureWriterImpl (
4053 final int capacity ,
4154 final long flushInterval ,
4255 final TimeUnit timeUnit ,
@@ -59,27 +72,27 @@ public ExposureWriterImpl(
5972 }
6073 final ExposureSerializingHandler serializer =
6174 new ExposureSerializingHandler (queue , flushInterval , timeUnit , url , headers , context );
62- this .serializerThread = newAgentThread (LLMOBS_EVALS_PROCESSOR , serializer );
75+ this .serializerThread = newAgentThread (FEATURE_FLAG_EXPOSURE_PROCESSOR , serializer );
6376 }
6477
6578 @ Override
6679 public void init () {
80+ FeatureFlagGateway .addExposureListener (this );
6781 this .serializerThread .start ();
6882 }
6983
7084 @ Override
7185 public void close () {
86+ FeatureFlagGateway .removeExposureListener (this );
7287 this .serializerThread .interrupt ();
7388 }
7489
7590 @ Override
76- public void write (final ExposureEvent event ) {
91+ public void accept (final ExposureEvent event ) {
7792 queue .offer (event );
7893 }
7994
8095 private static class ExposureSerializingHandler implements Runnable {
81- private static final int FLUSH_THRESHOLD = 50 ;
82-
8396 private final MpscBlockingConsumerArrayQueue <ExposureEvent > queue ;
8497 private final long ticksRequiredToFlush ;
8598 private long lastTicks ;
@@ -90,6 +103,7 @@ private static class ExposureSerializingHandler implements Runnable {
90103 private final Headers headers ;
91104
92105 private final Map <String , String > context ;
106+ private final Set <ExposureEvent > cache ;
93107
94108 private final List <ExposureEvent > buffer = new ArrayList <>();
95109
@@ -101,6 +115,7 @@ public ExposureSerializingHandler(
101115 final Headers headers ,
102116 final Map <String , String > context ) {
103117 this .queue = queue ;
118+ this .cache = Collections .newSetFromMap (new LRUCache <>(queue .capacity ()));
104119 this .jsonAdapter = new Moshi .Builder ().build ().adapter (ExposuresRequest .class );
105120 this .httpClient = new OkHttpClient ();
106121 this .submissionUrl = submissionUrl ;
@@ -110,7 +125,7 @@ public ExposureSerializingHandler(
110125 this .lastTicks = System .nanoTime ();
111126 this .ticksRequiredToFlush = timeUnit .toNanos (flushInterval );
112127
113- log .debug ("starting exposure serializer, url={}" , submissionUrl );
128+ LOGGER .debug ("starting exposure serializer, url={}" , submissionUrl );
114129 }
115130
116131 @ Override
@@ -120,25 +135,36 @@ public void run() {
120135 } catch (InterruptedException e ) {
121136 Thread .currentThread ().interrupt ();
122137 }
123- log .debug (
138+ LOGGER .debug (
124139 "exposure processor worker exited. submitting exposures stopped. unsubmitted exposures left: {}" ,
125- !queuesAreEmpty ());
140+ !queue . isEmpty ());
126141 }
127142
128143 private void runDutyCycle () throws InterruptedException {
129144 final Thread thread = Thread .currentThread ();
130145 while (!thread .isInterrupted ()) {
131- final ExposureEvent event = queue .poll (100 , TimeUnit .MILLISECONDS );
132- if (event != null ) {
133- buffer .add (event );
134- consumeBatch ();
146+ ExposureEvent event ;
147+ while ((event = queue .poll (100 , TimeUnit .MILLISECONDS )) != null ) {
148+ if (addToBuffer (event )) {
149+ consumeBatch ();
150+ break ;
151+ }
135152 }
136153 flushIfNecessary ();
137154 }
138155 }
139156
140157 private void consumeBatch () {
141- queue .drain (buffer ::add , queue .size ());
158+ queue .drain (this ::addToBuffer , queue .size ());
159+ }
160+
161+ /** Adds an element to the buffer taking care of duplicated exposures thanks to the LRU cache */
162+ private boolean addToBuffer (ExposureEvent event ) {
163+ if (cache .add (event )) {
164+ buffer .add (event );
165+ return true ;
166+ }
167+ return false ;
142168 }
143169
144170 protected void flushIfNecessary () {
@@ -147,27 +173,22 @@ protected void flushIfNecessary() {
147173 }
148174 if (shouldFlush ()) {
149175 final ExposuresRequest exposures = new ExposuresRequest (this .context , this .buffer );
150- final HttpRetryPolicy .Factory retryPolicyFactory =
151- new HttpRetryPolicy .Factory (5 , 100 , 2.0 , true );
152176 final String reqBod = jsonAdapter .toJson (exposures );
153177 final RequestBody requestBody =
154178 RequestBody .create (okhttp3 .MediaType .parse ("application/json" ), reqBod );
155179 final Request request =
156180 new Request .Builder ().headers (headers ).url (submissionUrl ).post (requestBody ).build ();
157181 try (okhttp3 .Response response =
158- OkHttpUtils .sendWithRetries (httpClient , retryPolicyFactory , request )) {
159-
182+ OkHttpUtils .sendWithRetries (httpClient , RETRY_POLICY , request )) {
160183 if (response .isSuccessful ()) {
161- log .debug ("successfully flushed exposures request with {} evals" , this .buffer .size ());
184+ LOGGER .debug (
185+ "successfully flushed exposures request with {} evals" , this .buffer .size ());
162186 this .buffer .clear ();
163187 } else {
164- log .error (
165- "Could not submit exposures (HTTP code {}) {}" ,
166- response .code (),
167- response .body () != null ? response .body ().string () : "" );
188+ LOGGER .error ("Could not submit exposures (HTTP code {})" , response .code ());
168189 }
169190 } catch (Exception e ) {
170- log .error ("Could not submit exposures" , e );
191+ LOGGER .error ("Could not submit exposures" , e );
171192 }
172193 }
173194 }
@@ -181,9 +202,5 @@ private boolean shouldFlush() {
181202 }
182203 return false ;
183204 }
184-
185- protected boolean queuesAreEmpty () {
186- return queue .isEmpty ();
187- }
188205 }
189206}
0 commit comments