Skip to content

Commit

Permalink
DBZ-2897 Make Offsets interface non-static, refactor test for updatin…
Browse files Browse the repository at this point in the history
…g offsets
  • Loading branch information
twthorn authored and gunnarmorling committed Feb 17, 2021
1 parent dcb6b04 commit 72f8c06
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ public static interface RecordCommitter<R> {
* Contract that should be passed to {@link RecordCommitter#markProcessed(Object, Offsets)} for marking a record
* as processed with updated offsets.
*/
public static interface Offsets {
public interface Offsets {

/**
* Associates a key with a specific value, overwrites the value if the key is already present.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

import static org.fest.assertions.Assertions.assertThat;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
Expand All @@ -21,6 +19,7 @@
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
Expand All @@ -31,16 +30,20 @@
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.file.FileStreamSourceConnector;
import org.apache.kafka.connect.json.JsonDeserializer;
import org.apache.kafka.connect.runtime.WorkerConfig;
import org.apache.kafka.connect.runtime.standalone.StandaloneConfig;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.storage.OffsetBackingStore;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.util.Callback;
import org.apache.kafka.connect.util.SafeObjectInputStream;
import org.fest.assertions.Assertions;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.databind.JsonNode;

import io.debezium.DebeziumException;
import io.debezium.config.Configuration;
import io.debezium.connector.simple.SimpleSourceConnector;
Expand Down Expand Up @@ -336,15 +339,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
// Add initial content to the file ...
appendLinesToSource(NUMBER_OF_LINES);

String TEST_TOPIC = "topicX";
String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
Long EXPECTED_CUSTOM_OFFSET = 1L;

final Properties props = new Properties();
props.setProperty("name", "debezium-engine");
props.setProperty("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
props.setProperty(StandaloneConfig.OFFSET_STORAGE_FILE_FILENAME_CONFIG, OFFSET_STORE_PATH.toAbsolutePath().toString());
props.setProperty("offset.flush.interval.ms", "0");
props.setProperty("file", TEST_FILE_PATH.toAbsolutePath().toString());
props.setProperty("topic", "topicX");

String CUSTOM_SOURCE_OFFSET_PARTITION = "test_topic_partition1";
props.setProperty("topic", TEST_TOPIC);

CountDownLatch firstLatch = new CountDownLatch(1);
CountDownLatch allLatch = new CountDownLatch(6);
Expand All @@ -358,7 +363,7 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {

for (RecordChangeEvent<SourceRecord> r : records) {
DebeziumEngine.Offsets offsets = committer.buildOffsets();
offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, 1L);
offsets.set(CUSTOM_SOURCE_OFFSET_PARTITION, EXPECTED_CUSTOM_OFFSET);
logger.info(r.record().sourceOffset().toString());
committer.markProcessed(r, offsets);
}
Expand Down Expand Up @@ -389,17 +394,17 @@ public void shouldRunEngineWithConsumerSettingOffsets() throws Exception {
allLatch.await(5000, TimeUnit.MILLISECONDS);
assertThat(allLatch.getCount()).isEqualTo(0);

boolean containsCustomPartition = false;
try (BufferedReader br = new BufferedReader(new FileReader(OFFSET_STORE_PATH.toString()))) {
String line;
while ((line = br.readLine()) != null) {
logger.info(line);
if (line.contains(CUSTOM_SOURCE_OFFSET_PARTITION)) {
containsCustomPartition = true;
}
}
}
assertThat(containsCustomPartition).isTrue();
SafeObjectInputStream inputStream = new SafeObjectInputStream(java.nio.file.Files.newInputStream(OFFSET_STORE_PATH.toAbsolutePath()));
Object obj = inputStream.readObject();
Map<byte[], byte[]> raw = (Map) obj;
Set<Map.Entry<byte[], byte[]>> fileOffsetStoreEntrySingleton = raw.entrySet();
assertThat(fileOffsetStoreEntrySingleton.size()).isEqualTo(1);
Map.Entry<byte[], byte[]> fileOffsetEntry = fileOffsetStoreEntrySingleton.iterator().next();
ByteBuffer offsetJsonString = fileOffsetEntry.getValue() != null ? ByteBuffer.wrap(fileOffsetEntry.getValue()) : null;
JsonDeserializer jsonDeserializer = new JsonDeserializer();
JsonNode partitionToOffsetMap = jsonDeserializer.deserialize(TEST_TOPIC, offsetJsonString.array());
Long actualOffset = partitionToOffsetMap.get(CUSTOM_SOURCE_OFFSET_PARTITION).asLong();
assertThat(actualOffset).isEqualTo(EXPECTED_CUSTOM_OFFSET);

// Stop the connector ...
stopConnector();
Expand Down

0 comments on commit 72f8c06

Please sign in to comment.