Skip to content

Commit 7e03de2

Browse files
Add a proper LRU cache for the exposures
1 parent d8de5d9 commit 7e03de2

File tree

13 files changed

+425
-169
lines changed

13 files changed

+425
-169
lines changed

dd-java-agent/agent-featureflag/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,12 @@ java {
1212
targetCompatibility = JavaVersion.VERSION_1_8
1313
}
1414

15+
excludedClassesCoverage += [
16+
// POJOs
17+
'com.datadog.featureflag.ExposureCache.Key',
18+
'com.datadog.featureflag.ExposureCache.Value'
19+
]
20+
1521
dependencies {
1622
api libs.slf4j
1723
implementation libs.moshi
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.exposure.ExposureEvent;
4+
import java.util.Objects;
5+
6+
public interface ExposureCache {
7+
8+
boolean add(ExposureEvent event);
9+
10+
Value get(Key key);
11+
12+
int size();
13+
14+
final class Key {
15+
public final String flag;
16+
public final String subject;
17+
18+
public Key(final ExposureEvent event) {
19+
this.flag = event.flag == null ? null : event.flag.key;
20+
this.subject = event.subject == null ? null : event.subject.id;
21+
}
22+
23+
@Override
24+
public boolean equals(final Object o) {
25+
if (o == null || getClass() != o.getClass()) {
26+
return false;
27+
}
28+
final Key key = (Key) o;
29+
return Objects.equals(flag, key.flag) && Objects.equals(subject, key.subject);
30+
}
31+
32+
@Override
33+
public int hashCode() {
34+
return Objects.hash(flag, subject);
35+
}
36+
}
37+
38+
final class Value {
39+
public final String variant;
40+
public final String allocation;
41+
42+
public Value(final ExposureEvent event) {
43+
this.variant = event.variant == null ? null : event.variant.key;
44+
this.allocation = event.allocation == null ? null : event.allocation.key;
45+
}
46+
47+
@Override
48+
public boolean equals(final Object o) {
49+
if (o == null || getClass() != o.getClass()) {
50+
return false;
51+
}
52+
final Value value = (Value) o;
53+
return Objects.equals(variant, value.variant) && Objects.equals(allocation, value.allocation);
54+
}
55+
56+
@Override
57+
public int hashCode() {
58+
return Objects.hash(variant, allocation);
59+
}
60+
}
61+
}

dd-java-agent/agent-featureflag/src/main/java/com/datadog/featureflag/ExposureWriterImpl.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,10 @@
1313
import datadog.trace.api.featureflag.FeatureFlagGateway;
1414
import datadog.trace.api.featureflag.exposure.ExposureEvent;
1515
import datadog.trace.api.featureflag.exposure.ExposuresRequest;
16-
import datadog.trace.core.util.LRUCache;
1716
import java.util.ArrayList;
18-
import java.util.Collections;
1917
import java.util.HashMap;
2018
import java.util.List;
2119
import java.util.Map;
22-
import java.util.Set;
2320
import java.util.concurrent.TimeUnit;
2421
import okhttp3.Headers;
2522
import okhttp3.HttpUrl;
@@ -35,7 +32,7 @@ public class ExposureWriterImpl implements ExposureWriter {
3532
private static final Logger LOGGER = LoggerFactory.getLogger(ExposureWriterImpl.class);
3633
private static final int DEFAULT_CAPACITY = 1 << 16; // 65536 elements
3734
private static final int DEFAULT_FLUSH_INTERVAL_IN_SECONDS = 1;
38-
private static final int FLUSH_THRESHOLD = 50;
35+
private static final int FLUSH_THRESHOLD = 100;
3936
private static final String EXPOSURES_API_PATH = "api/v2/exposures";
4037
private static final String EVP_SUBDOMAIN_HEADER_NAME = "X-Datadog-EVP-Subdomain";
4138
private static final String EVP_SUBDOMAIN_HEADER_VALUE = "event-platform-intake";
@@ -62,7 +59,7 @@ public ExposureWriterImpl(final HttpUrl agentUrl, final Config config) {
6259
agentUrl.toString()
6360
+ DDAgentFeaturesDiscovery.V2_EVP_PROXY_ENDPOINT
6461
+ EXPOSURES_API_PATH);
65-
final Map<String, String> context = new HashMap<>();
62+
final Map<String, String> context = new HashMap<>(4);
6663
context.put("service", config.getServiceName() == null ? "unknown" : config.getServiceName());
6764
if (config.getEnv() != null) {
6865
context.put("env", config.getEnv());
@@ -103,7 +100,7 @@ private static class ExposureSerializingHandler implements Runnable {
103100
private final Headers headers;
104101

105102
private final Map<String, String> context;
106-
private final Set<ExposureEvent> cache;
103+
private final ExposureCache cache;
107104

108105
private final List<ExposureEvent> buffer = new ArrayList<>();
109106

@@ -115,7 +112,7 @@ public ExposureSerializingHandler(
115112
final Headers headers,
116113
final Map<String, String> context) {
117114
this.queue = queue;
118-
this.cache = Collections.newSetFromMap(new LRUCache<>(queue.capacity()));
115+
this.cache = new LRUExposureCache(queue.capacity());
119116
this.jsonAdapter = new Moshi.Builder().build().adapter(ExposuresRequest.class);
120117
this.httpClient = new OkHttpClient();
121118
this.submissionUrl = submissionUrl;
@@ -159,7 +156,7 @@ private void consumeBatch() {
159156
}
160157

161158
/** Adds an element to the buffer taking care of duplicated exposures thanks to the LRU cache */
162-
private boolean addToBuffer(ExposureEvent event) {
159+
private boolean addToBuffer(final ExposureEvent event) {
163160
if (cache.add(event)) {
164161
buffer.add(event);
165162
return true;
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
package com.datadog.featureflag;
2+
3+
import datadog.trace.api.featureflag.exposure.ExposureEvent;
4+
import java.util.LinkedHashMap;
5+
import java.util.Map;
6+
7+
public class LRUExposureCache implements ExposureCache {
8+
9+
private final Map<Key, Value> cache;
10+
11+
public LRUExposureCache(final int capacity) {
12+
cache = new FIFOCache<>(capacity);
13+
}
14+
15+
@Override
16+
public boolean add(final ExposureEvent event) {
17+
final Key key = new Key(event);
18+
final Value oldValue = cache.get(key);
19+
if (oldValue == null) {
20+
cache.put(key, new Value(event));
21+
return true;
22+
}
23+
final Value newValue = new Value(event);
24+
if (!newValue.equals(oldValue)) {
25+
cache.remove(key); // ensure LRU semantics
26+
cache.put(key, newValue);
27+
return true;
28+
}
29+
return false;
30+
}
31+
32+
@Override
33+
public Value get(final Key key) {
34+
return cache.get(key);
35+
}
36+
37+
@Override
38+
public int size() {
39+
return cache.size();
40+
}
41+
42+
private static class FIFOCache<K, V> extends LinkedHashMap<K, V> {
43+
44+
private final int capacity;
45+
46+
private FIFOCache(final int capacity) {
47+
this.capacity = capacity;
48+
}
49+
50+
@Override
51+
protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
52+
return size() > capacity;
53+
}
54+
}
55+
}

dd-java-agent/agent-featureflag/src/test/groovy/com/datadog/featureflag/ExposureWriterTests.groovy

Lines changed: 73 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ class ExposureWriterTests extends DDSpecification {
6969
getEnv() >> env
7070
getVersion() >> version
7171
}
72-
def exposures = (1..5).collect { buildExposure(System.currentTimeMillis(), it) }
72+
def exposures = (1..5).collect { buildExposure() }
7373
def writer = new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, serverUrl, config)
7474
writer.init()
7575

@@ -89,7 +89,7 @@ class ExposureWriterTests extends DDSpecification {
8989
}
9090
}
9191
final received = requests*.exposures.flatten() as List<ExposureEvent>
92-
assert received.containsAll(exposures)
92+
assertExposures(received, exposures)
9393
}
9494

9595
cleanup:
@@ -105,30 +105,29 @@ class ExposureWriterTests extends DDSpecification {
105105

106106
void 'test lru cache'() {
107107
setup:
108-
def ts = System.currentTimeMillis()
109-
def exposures = (0..5).collect { buildExposure(ts, it) }
108+
def exposures = (0..5).collect { buildExposure() }
110109
def writer = new ExposureWriterImpl(1 << 4, 100, MILLISECONDS, serverUrl, Config.get())
111110
writer.init()
112111

113112
when: 'populating the cache'
114113
exposures.each { writer.accept(it) }
115114

116-
then: 'all exposures are written'
117-
poll.eventually {
115+
then: 'all events are written'
116+
new PollingConditions(timeout: 1).eventually {
118117
requests*.exposures.flatten().size() == exposures.size()
119118
}
120119

121-
when: 'publishing duplicate exposures'
120+
when: 'publishing duplicate events'
122121
exposures.each { writer.accept(it) }
123122

124-
then: 'no exposures are written'
123+
then: 'no events are written'
125124
MILLISECONDS.sleep(300) // wait until a flush happens
126125
requests*.exposures.flatten().size() == exposures.size()
127126

128-
when: 'a new exposure is generated'
129-
writer.accept(buildExposure(System.currentTimeMillis(), 'new'))
127+
when: 'a new event is generated'
128+
writer.accept(buildExposure())
130129

131-
then: 'oldest exposure is evicted and the new one is submitted'
130+
then: 'oldest event is evicted and the new one is submitted'
132131
poll.eventually {
133132
requests*.exposures.flatten().size() == exposures.size() + 1
134133
}
@@ -144,7 +143,7 @@ class ExposureWriterTests extends DDSpecification {
144143
def threads = Runtime.runtime.availableProcessors()
145144
def executor = Executors.newFixedThreadPool(threads)
146145
def exposures = (1..(threads * exposuresPerThread)).collect {
147-
buildExposure(System.currentTimeMillis(), it)
146+
buildExposure()
148147
}
149148
def latch = new CountDownLatch(1)
150149
def writer = new ExposureWriterImpl(serverUrl, Config.get())
@@ -166,11 +165,8 @@ class ExposureWriterTests extends DDSpecification {
166165
then:
167166
futures.each { it.get() } // wait for all threads to finish
168167
poll.eventually {
169-
final received = requests*.exposures.flatten()
170-
final receivedSize = received.size()
171-
final expectedSize = exposures.size()
172-
assert receivedSize == expectedSize
173-
assert received.containsAll(exposures)
168+
final received = requests*.exposures.flatten() as List<ExposureEvent>
169+
assertExposures(received, exposures)
174170
}
175171

176172
cleanup:
@@ -187,7 +183,7 @@ class ExposureWriterTests extends DDSpecification {
187183
writer.init()
188184

189185
when:
190-
writer.accept(buildExposure(System.currentTimeMillis(), serviceName))
186+
writer.accept(buildExposure())
191187

192188
then:
193189
MILLISECONDS.sleep(500) // wait for a flush to happen
@@ -207,13 +203,66 @@ class ExposureWriterTests extends DDSpecification {
207203
'fail-forever' | true
208204
}
209205

210-
private static ExposureEvent buildExposure(final long ts, final Object suffix) {
206+
private static void assertExposures(final List<ExposureEvent> receivedExposures, final List<ExposureEvent> expectedExposures) {
207+
assert receivedExposures.size() == expectedExposures.size()
208+
final received = new TreeSet<ExposureEvent>(ExposureWriterTests::compare)
209+
received.addAll(expectedExposures)
210+
assert received.containsAll(expectedExposures)
211+
}
212+
213+
private static int compare(final ExposureEvent a, final ExposureEvent b) {
214+
if (a.is(b)) {
215+
return 0
216+
}
217+
if (a == null) {
218+
return -1
219+
}
220+
if (b == null) {
221+
return 1
222+
}
223+
224+
def result = a.timestamp <=> b.timestamp
225+
if (result) {
226+
return result
227+
}
228+
229+
result = (a.flag?.key ?: '') <=> (b.flag?.key ?: '')
230+
if (result) {
231+
return result
232+
}
233+
234+
result = (a.variant?.key ?: '') <=> (b.variant?.key ?: '')
235+
if (result) {
236+
return result
237+
}
238+
239+
result = (a.allocation?.key ?: '') <=> (b.allocation?.key ?: '')
240+
if (result) {
241+
return result
242+
}
243+
244+
result = (a.subject?.id ?: '') <=> (b.subject?.id ?: '')
245+
if (result) {
246+
return result
247+
}
248+
249+
final aEntry = a.subject?.attributes?.entrySet()?.iterator()?.next()
250+
final bEntry = b.subject?.attributes?.entrySet()?.iterator()?.next()
251+
result = (aEntry?.key ?: '') <=> (bEntry?.key ?: '')
252+
if (result) {
253+
return result
254+
}
255+
return (aEntry?.value?.toString() ?: '') <=> (bEntry?.value?.toString() ?: '')
256+
}
257+
258+
private static ExposureEvent buildExposure() {
259+
final idx = UUID.randomUUID().toString()
211260
return new ExposureEvent(
212-
ts,
213-
new Allocation("Allocation_$suffix"),
214-
new Flag("Flag_$suffix"),
215-
new Variant("Variant_$suffix"),
216-
new Subject("Subject_$suffix", [("key_$suffix".toString()): "value_$suffix".toString()])
261+
System.currentTimeMillis(),
262+
new Allocation("Allocation_$idx"),
263+
new Flag("Flag_$idx"),
264+
new Variant("Variant_$idx"),
265+
new Subject("Subject_$idx", [("key_$idx".toString()): "value_$idx".toString()])
217266
)
218267
}
219268
}

0 commit comments

Comments
 (0)