Skip to content

Commit 65ddfed

Browse files
ZihanLi58Zihan Li
and
Zihan Li
authored
[GOBBLIN-1933] Change the logic in completeness verifier to support multi reference tier (apache#3806)
* address comments * use connectionmanager when httpclient is not closeable * [GOBBLIN-1933] Change the logic in completeness verifier to support multi reference tier * add unit test * fix typo * change the javadoc * change the javadoc --------- Co-authored-by: Zihan Li <[email protected]>
1 parent dc785d1 commit 65ddfed

File tree

4 files changed

+73
-18
lines changed

4 files changed

+73
-18
lines changed

gobblin-completeness/src/main/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifier.java

+8-4
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,13 @@ private double calculateCompleteness(String datasetName, long beginInMillis, lon
156156

157157
/**
158158
* Compare source tier against reference tiers. For each reference tier, calculates percentage by srcCount/refCount.
159-
*
159+
* We will return the lowest value, which, in other words, we will wait until src tier catches up to all reference
160+
* tiers (upto 99.9%) to mark that hour as completed.
160161
* @param datasetName A dataset short name like 'PageViewEvent'
161162
* @param beginInMillis Unix timestamp in milliseconds
162163
* @param endInMillis Unix timestamp in milliseconds
163164
*
164-
* @return The highest percentage value
165+
* @return The lowest percentage value
165166
*/
166167
private double calculateClassicCompleteness(String datasetName, long beginInMillis, long endInMillis,
167168
Map<String, Long> countsByTier) throws IOException {
@@ -171,16 +172,19 @@ private double calculateClassicCompleteness(String datasetName, long beginInMill
171172
for (String refTier: this.refTiers) {
172173
long refCount = countsByTier.get(refTier);
173174
long srcCount = countsByTier.get(this.srcTier);
175+
double tmpPercent;
174176

175177
/*
176178
If we have a case where an audit map is returned, however, one of the source tiers on another fabric is 0,
177179
and the reference tiers from Kafka is reported to be 0, we can say that this hour is complete.
178180
This needs to be added as a non-zero double value divided by 0 is infinity, but 0 divided by 0 is NaN.
179181
*/
180182
if (srcCount == 0 && refCount == 0) {
181-
return 1.0;
183+
tmpPercent = 1;
184+
} else {
185+
tmpPercent = (double) srcCount / (double) refCount;
182186
}
183-
percent = Double.max(percent, (double) srcCount / (double) refCount);
187+
percent = percent < 0 ? tmpPercent : Double.min(percent, tmpPercent);
184188
}
185189

186190
if (percent < 0) {

gobblin-completeness/src/test/java/org/apache/gobblin/completeness/verifier/KafkaAuditCountVerifierTest.java

+59-10
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
public class KafkaAuditCountVerifierTest {
3232

3333
public static final String SOURCE_TIER = "gobblin";
34-
public static final String REFERENCE_TIERS = "producer";
34+
public static final String REFERENCE_TIER = "producer";
35+
public static final String REFERENCE_TIER_1 = "producer_reference";
36+
public static final String REFERENCE_TIERS = REFERENCE_TIER + "," + REFERENCE_TIER_1;
3537

3638
public static final String TOTAL_COUNT_REF_TIER_0 = "producer_0";
3739
public static final String TOTAL_COUNT_REF_TIER_1 = "producer_1";
@@ -50,7 +52,8 @@ public void testFetch() throws IOException {
5052
// All complete
5153
client.setTierCounts(ImmutableMap.of(
5254
SOURCE_TIER, 1000L,
53-
REFERENCE_TIERS, 1000L
55+
REFERENCE_TIER, 1000L,
56+
REFERENCE_TIER_1, 1000L
5457
));
5558
// Default threshold
5659
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
@@ -59,15 +62,17 @@ public void testFetch() throws IOException {
5962
// 99.999 % complete
6063
client.setTierCounts(ImmutableMap.of(
6164
SOURCE_TIER, 999L,
62-
REFERENCE_TIERS, 1000L
65+
REFERENCE_TIER, 1000L,
66+
REFERENCE_TIER_1, 1000L
6367
));
6468
Assert.assertTrue(verifier.calculateCompleteness(topic, 0L, 0L)
6569
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
6670

6771
// <= 99% complete
6872
client.setTierCounts(ImmutableMap.of(
6973
SOURCE_TIER, 990L,
70-
REFERENCE_TIERS, 1000L
74+
REFERENCE_TIER, 1000L,
75+
REFERENCE_TIER_1, 1000L
7176
));
7277
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
7378
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
@@ -86,7 +91,8 @@ public void testTotalCountCompleteness() throws IOException {
8691
// All complete
8792
client.setTierCounts(ImmutableMap.of(
8893
SOURCE_TIER, 1000L,
89-
REFERENCE_TIERS, 1000L,
94+
REFERENCE_TIER, 1000L,
95+
REFERENCE_TIER_1, 1000L,
9096
TOTAL_COUNT_REF_TIER_0, 600L,
9197
TOTAL_COUNT_REF_TIER_1, 400L
9298
));
@@ -97,7 +103,8 @@ public void testTotalCountCompleteness() throws IOException {
97103
// 99.999 % complete
98104
client.setTierCounts(ImmutableMap.of(
99105
SOURCE_TIER, 999L,
100-
REFERENCE_TIERS, 1000L,
106+
REFERENCE_TIER, 1000L,
107+
REFERENCE_TIER_1, 1000L,
101108
TOTAL_COUNT_REF_TIER_0, 600L,
102109
TOTAL_COUNT_REF_TIER_1, 400L
103110
));
@@ -107,7 +114,8 @@ public void testTotalCountCompleteness() throws IOException {
107114
// <= 99% complete
108115
client.setTierCounts(ImmutableMap.of(
109116
SOURCE_TIER, 990L,
110-
REFERENCE_TIERS, 1000L,
117+
REFERENCE_TIER, 1000L,
118+
REFERENCE_TIER_1, 1000L,
111119
TOTAL_COUNT_REF_TIER_0, 600L,
112120
TOTAL_COUNT_REF_TIER_1, 400L
113121
));
@@ -140,7 +148,8 @@ public void testEmptyAuditCount() throws IOException {
140148
client.setTierCounts(
141149
ImmutableMap.of(
142150
SOURCE_TIER, 990L,
143-
REFERENCE_TIERS, 0L,
151+
REFERENCE_TIER, 0L,
152+
REFERENCE_TIER_1, 0L,
144153
TOTAL_COUNT_REF_TIER_0, 0L,
145154
TOTAL_COUNT_REF_TIER_1, 0L
146155
));
@@ -153,7 +162,8 @@ public void testEmptyAuditCount() throws IOException {
153162
client.setTierCounts(
154163
ImmutableMap.of(
155164
SOURCE_TIER, 0L,
156-
REFERENCE_TIERS, 0L,
165+
REFERENCE_TIER, 0L,
166+
REFERENCE_TIER_1, 0L,
157167
TOTAL_COUNT_REF_TIER_0, 0L,
158168
TOTAL_COUNT_REF_TIER_1, 0L
159169
));
@@ -175,7 +185,8 @@ public void testOneCountFailed() throws IOException {
175185
// Missing total count tier which will throw exception
176186
client.setTierCounts(ImmutableMap.of(
177187
SOURCE_TIER, 999L,
178-
REFERENCE_TIERS, 1000L
188+
REFERENCE_TIER, 1000L,
189+
REFERENCE_TIER_1, 1000L
179190
));
180191

181192
// Classic completeness is still returned, but total is missing
@@ -184,4 +195,42 @@ public void testOneCountFailed() throws IOException {
184195
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
185196
.containsKey(KafkaAuditCountVerifier.CompletenessType.TotalCountCompleteness));
186197
}
198+
199+
public void testDifferentValueInReferenceTier() throws IOException {
200+
final String topic = "testTopic";
201+
State props = new State();
202+
props.setProp(KafkaAuditCountVerifier.SOURCE_TIER, SOURCE_TIER);
203+
props.setProp(KafkaAuditCountVerifier.REFERENCE_TIERS, REFERENCE_TIERS);
204+
props.setProp(KafkaAuditCountVerifier.TOTAL_COUNT_REFERENCE_TIERS, TOTAL_COUNT_REFERENCE_TIERS);
205+
props.setProp(KafkaAuditCountVerifier.THRESHOLD, ".99");
206+
props.setProp(KafkaAuditCountVerifier.COMPLETE_ON_NO_COUNTS, true);
207+
TestAuditClient client = new TestAuditClient(props);
208+
KafkaAuditCountVerifier verifier = new KafkaAuditCountVerifier(props, client);
209+
210+
// Different value in reference tier
211+
client.setTierCounts(ImmutableMap.of(
212+
SOURCE_TIER, 999L,
213+
REFERENCE_TIER, 1000L,
214+
REFERENCE_TIER_1, 2000L
215+
));
216+
217+
// Classic completeness is fail as 999/2000 < 99.9%
218+
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
219+
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
220+
221+
// Different value in reference tier and one tier has 0 in count
222+
client.setTierCounts(ImmutableMap.of(
223+
SOURCE_TIER, 999L,
224+
REFERENCE_TIER, 0L,
225+
REFERENCE_TIER_1, 2000L
226+
));
227+
228+
// Classic completeness is fail as 999/2000 < 99.9%
229+
Assert.assertFalse(verifier.calculateCompleteness(topic, 0L, 0L)
230+
.get(KafkaAuditCountVerifier.CompletenessType.ClassicCompleteness));
231+
232+
233+
234+
}
235+
187236
}

gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaIngestionHealthCheck.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ private String getHealthCheckReport() {
137137
public void execute() {
138138
this.ingestionLatencies.add(this.statsTracker.getMaxIngestionLatency(TimeUnit.MINUTES));
139139
this.consumptionRateMBps.add(this.statsTracker.getConsumptionRateMBps());
140+
double avgConsumptionRate = getMaxConsumptionRate();
141+
log.info("Avg. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
140142
if (ingestionLatencies.size() < this.slidingWindowSize) {
141143
log.info("SUCCESS: Num observations: {} smaller than {}", ingestionLatencies.size(), this.slidingWindowSize);
142144
return;
@@ -146,8 +148,6 @@ public void execute() {
146148
log.info("SUCCESS: Ingestion Latencies = {}, Ingestion Latency Threshold: {}", this.ingestionLatencies.toString(), this.ingestionLatencyThresholdMinutes);
147149
return;
148150
}
149-
150-
double avgConsumptionRate = getMaxConsumptionRate();
151151
if (avgConsumptionRate > this.consumptionRateDropOffFraction * this.expectedConsumptionRate) {
152152
log.info("SUCCESS: Avg. Consumption Rate = {} MBps, Target Consumption rate = {} MBps", avgConsumptionRate, this.expectedConsumptionRate);
153153
return;

gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/workunit/packer/KafkaTopicGroupingWorkUnitPacker.java

+4-2
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,8 @@
7070
@Slf4j
7171
public class KafkaTopicGroupingWorkUnitPacker extends KafkaWorkUnitPacker {
7272
public static final String GOBBLIN_KAFKA_PREFIX = "gobblin.kafka.";
73-
private static final int DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
73+
public static final String DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY = GOBBLIN_KAFKA_PREFIX + "default.num.topic.partitions.per.container";
74+
private static final int DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER = 10;
7475

7576
//A global configuration for container capacity. The container capacity refers to the peak rate (in MB/s) that a
7677
//single JVM can consume from Kafka for a single topic and controls the number of partitions of a topic that will be
@@ -198,6 +199,7 @@ public List<WorkUnit> pack(Map<String, List<WorkUnit>> workUnitsByTopic, int num
198199
}
199200
//Add CONTAINER_CAPACITY into each workunit. Useful when KafkaIngestionHealthCheck is enabled.
200201
for (WorkUnit workUnit: workUnitsForTopic) {
202+
//todo: check whether it's set already to respect the topic specific capacity from user input properties
201203
workUnit.setProp(CONTAINER_CAPACITY_KEY, containerCapacity);
202204
}
203205
double estimatedDataSizeForTopic = calcTotalEstSizeForTopic(workUnitsForTopic);
@@ -293,7 +295,7 @@ private void addStatsToWorkUnits(Map<String, List<WorkUnit>> workUnitsByTopic) t
293295

294296
private Double getDefaultWorkUnitSize() {
295297
return state.getPropAsDouble(KafkaTopicGroupingWorkUnitPacker.CONTAINER_CAPACITY_KEY,
296-
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER;
298+
KafkaTopicGroupingWorkUnitPacker.DEFAULT_CONTAINER_CAPACITY) / state.getPropAsDouble(DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER_KEY, DEFAULT_DEFAULT_NUM_TOPIC_PARTITIONS_PER_CONTAINER);
297299
}
298300

299301
/**

0 commit comments

Comments
 (0)