Skip to content

Commit 36cdf2b

Browse files
ZihanLi58 (Zihan Li)
and
Zihan Li
authored
[GOBBLIN-2040] Abstract comparable watermark (apache#3919)
* address comments * use connectionmanager when httpclient is not closeable * [GOBBLIN-2040] Abstract comparable watermark * address comments --------- Co-authored-by: Zihan Li <[email protected]>
1 parent 62f645c commit 36cdf2b

File tree

8 files changed

+41
-30
lines changed

8 files changed

+41
-30
lines changed

gobblin-api/src/main/java/org/apache/gobblin/source/extractor/ComparableWatermark.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
/**
2121
* {@link Watermark} that is also {@link Comparable}.
2222
*/
23-
public interface ComparableWatermark extends Watermark, Comparable<ComparableWatermark>{
23+
public interface ComparableWatermark<V extends Comparable<V>> extends Watermark, Comparable<ComparableWatermark>{
24+
25+
V getValue();
2426

2527
}

gobblin-core-base/src/main/java/org/apache/gobblin/source/extractor/extract/LongWatermark.java

+12-3
Original file line numberDiff line numberDiff line change
@@ -29,20 +29,29 @@
2929
import org.apache.gobblin.source.extractor.Watermark;
3030

3131
import lombok.EqualsAndHashCode;
32-
import lombok.Getter;
3332
import lombok.Setter;
3433
import lombok.ToString;
3534

35+
36+
/**
37+
* Long based {@link ComparableWatermark} implementation.
38+
*/
3639
@ToString
3740
@EqualsAndHashCode
38-
public class LongWatermark implements ComparableWatermark {
41+
public class LongWatermark implements ComparableWatermark<Long> {
3942

4043
private static final Gson GSON = new Gson();
4144

42-
@Getter
4345
@Setter
4446
private long value;
4547

48+
@Override
49+
// Returns a Long object due to Java generics' requirement for object types.
50+
// The underlying variable is maintained as a primitive long to optimize performance for mathematical operations.
51+
public Long getValue(){
52+
return value;
53+
}
54+
4655
public LongWatermark(long value) {
4756
this.value = value;
4857
}

gobblin-core-base/src/test/java/org/apache/gobblin/writer/FineGrainedWatermarkTrackerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ private static void verifyCommitables(FineGrainedWatermarkTracker tracker, Sorte
9898
} else {
9999
Assert.assertEquals(uncommitted.size(), 1);
100100
CheckpointableWatermark uncommitable = uncommitted.get("default");
101-
Assert.assertEquals(((LongWatermark) uncommitable.getWatermark()).getValue(), (long) holes.first());
101+
Assert.assertEquals((long)((LongWatermark) uncommitable.getWatermark()).getValue(), (long)holes.first());
102102
}
103103

104104
Map<String, CheckpointableWatermark> commitables = tracker.getCommittableWatermarks();
@@ -109,9 +109,9 @@ private static void verifyCommitables(FineGrainedWatermarkTracker tracker, Sorte
109109
Assert.assertEquals(commitables.size(), 1);
110110
CheckpointableWatermark commitable = commitables.get("default");
111111
if (holes.isEmpty()) {
112-
Assert.assertEquals(((LongWatermark) commitable.getWatermark()).getValue(), maxWatermark);
112+
Assert.assertEquals((long)((LongWatermark) commitable.getWatermark()).getValue(), maxWatermark);
113113
} else {
114-
Assert.assertEquals(((LongWatermark) commitable.getWatermark()).getValue(), holes.first() - 1);
114+
Assert.assertEquals((long)((LongWatermark) commitable.getWatermark()).getValue(), holes.first() - 1);
115115
}
116116
}
117117

gobblin-core-base/src/test/java/org/apache/gobblin/writer/WatermarkTrackerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void testSingleSource() {
4242
commits(watermarkTracker, "default", 0, 4, 5, 6);
4343

4444
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default");
45-
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default")
45+
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("default")
4646
.get().getWatermark()).getValue(), 6L);
4747
}
4848

@@ -54,10 +54,10 @@ public void testMultiSource() {
5454
commits(watermarkTracker, "other", 1, 3, 5, 7);
5555

5656
Assert.assertEquals(watermarkTracker.getCommittableWatermark("default").get().getSource(), "default");
57-
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("default")
57+
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("default")
5858
.get().getWatermark()).getValue(), 6L);
5959
Assert.assertEquals(watermarkTracker.getCommittableWatermark("other").get().getSource(), "other");
60-
Assert.assertEquals(((LongWatermark) watermarkTracker.getCommittableWatermark("other")
60+
Assert.assertEquals((long)((LongWatermark) watermarkTracker.getCommittableWatermark("other")
6161
.get().getWatermark()).getValue(), 7L);
6262

6363
}

gobblin-data-management/src/main/java/org/apache/gobblin/data/management/copy/watermark/StringWatermark.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -20,24 +20,24 @@
2020
import com.google.common.base.Preconditions;
2121
import com.google.gson.JsonElement;
2222

23+
import lombok.Getter;
2324
import org.apache.gobblin.source.extractor.ComparableWatermark;
2425
import org.apache.gobblin.source.extractor.Watermark;
2526
import org.apache.gobblin.source.extractor.WatermarkSerializerHelper;
2627

2728
import lombok.AllArgsConstructor;
2829
import lombok.EqualsAndHashCode;
29-
import lombok.Getter;
3030

3131

3232
/**
3333
* String based {@link ComparableWatermark} implementation.
3434
*/
3535
@AllArgsConstructor
3636
@EqualsAndHashCode
37-
public class StringWatermark implements ComparableWatermark {
37+
public class StringWatermark implements ComparableWatermark<String> {
3838

3939
@Getter
40-
String value;
40+
private String value;
4141

4242
@Override
4343
public int compareTo(ComparableWatermark other) {

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/watermarker/PartitionLevelWatermarkerTest.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -96,11 +96,11 @@ public void testExpectedHighWatermarkNoPreviousState() throws Exception {
9696
List<WorkUnit> workunits = Lists.newArrayList();
9797
watermarker.onGetWorkunitsEnd(workunits);
9898

99-
Assert.assertEquals(watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
100-
Assert.assertEquals(watermarker.getPreviousHighWatermark(table).getValue(), 0l);
99+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part1).getValue(), 0l);
100+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table).getValue(), 0l);
101101

102-
Assert.assertEquals(watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
103-
Assert.assertEquals(watermarker.getPreviousHighWatermark(table2).getValue(), 0l);
102+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(part2).getValue(), 0l);
103+
Assert.assertEquals((long)watermarker.getPreviousHighWatermark(table2).getValue(), 0l);
104104

105105
Assert.assertEquals(workunits.size(), 2);
106106

gobblin-data-management/src/test/java/org/apache/gobblin/data/management/source/LoopingDatasetFinderSourceTest.java

+12-12
Original file line numberDiff line numberDiff line change
@@ -205,17 +205,17 @@ public void testNonDrilldownDatasetState()
205205
List<LongWatermark> watermarks1 = new ArrayList<>();
206206
List<Dataset> datasets1 = new ArrayList<>();
207207
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1");
208-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
208+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
209209
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
210210
datasets1.add(dataset1);
211211

212212
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2");
213-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
213+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
214214
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
215215
datasets1.add(dataset2);
216216

217217
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3");
218-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
218+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
219219
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
220220
datasets1.add(dataset3);
221221

@@ -244,12 +244,12 @@ public void testNonDrilldownDatasetState()
244244

245245
Assert.assertEquals(workUnits.size(), 3);
246246
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset4");
247-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
247+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
248248
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
249249
datasets2.add(dataset4);
250250

251251
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset5");
252-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
252+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
253253
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
254254
datasets2.add(dataset5);
255255

@@ -327,17 +327,17 @@ public void testDrilldownDatasetState()
327327

328328
Assert.assertEquals(workUnits.size(), 4);
329329
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset1");
330-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
330+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
331331
watermarks1.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
332332
datasets1.add(dataset1);
333333

334334
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p1");
335-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
335+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
336336
watermarks1.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
337337
datasets1.add(new SimpleDatasetForTesting("dataset2@p1"));
338338

339339
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p2");
340-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
340+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
341341
watermarks1.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
342342
datasets1.add(new SimpleDatasetForTesting("dataset2@p2"));
343343

@@ -367,17 +367,17 @@ public void testDrilldownDatasetState()
367367

368368
Assert.assertEquals(workUnits.size(), 4);
369369
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset2@p3");
370-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
370+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
371371
watermarks2.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
372372
datasets2.add(new SimpleDatasetForTesting("dataset2@p3"));
373373

374374
Assert.assertEquals(workUnits.get(1).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p1");
375-
Assert.assertEquals(workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
375+
Assert.assertEquals((long)workUnits.get(1).getLowWatermark(LongWatermark.class).getValue(), 0);
376376
watermarks2.add(workUnits.get(1).getExpectedHighWatermark(LongWatermark.class));
377377
datasets2.add(new SimpleDatasetForTesting("dataset3@p1"));
378378

379379
Assert.assertEquals(workUnits.get(2).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p2");
380-
Assert.assertEquals(workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
380+
Assert.assertEquals((long)workUnits.get(2).getLowWatermark(LongWatermark.class).getValue(), 0);
381381
watermarks2.add(workUnits.get(2).getExpectedHighWatermark(LongWatermark.class));
382382
datasets2.add(new SimpleDatasetForTesting("dataset3@p2"));
383383

@@ -404,7 +404,7 @@ public void testDrilldownDatasetState()
404404

405405
Assert.assertEquals(workUnits.size(), 2);
406406
Assert.assertEquals(workUnits.get(0).getProp(ConfigurationKeys.DATASET_URN_KEY), "dataset3@p3");
407-
Assert.assertEquals(workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
407+
Assert.assertEquals((long)workUnits.get(0).getLowWatermark(LongWatermark.class).getValue(), 0);
408408
watermarks3.add(workUnits.get(0).getExpectedHighWatermark(LongWatermark.class));
409409
datasets3.add(new SimpleDatasetForTesting("dataset3@p3"));
410410

gobblin-modules/gobblin-helix/src/test/java/org/apache/gobblin/runtime/StateStoreWatermarkStorageTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public void testPersistWatermarkStateToZk() throws IOException {
6868
ImmutableList.of("source"));
6969

7070
Assert.assertEquals(watermarkMap.size(), 1);
71-
Assert.assertEquals(((LongWatermark) watermarkMap.get("source").getWatermark()).getValue(), startTime);
71+
Assert.assertEquals((long)((LongWatermark) watermarkMap.get("source").getWatermark()).getValue(), startTime);
7272
}
7373

7474
@AfterClass

0 commit comments

Comments
 (0)