diff --git a/build.gradle b/build.gradle
index 799d5542576..c81f6065ea0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -91,7 +91,7 @@ ext.libraries = [
     mail: 'javax.mail:mail:1.4.4',
     mapreduceClientCore: 'org.apache.hadoop:hadoop-mapreduce-client-core:2.3.0',
     mapreduceClientJobClient: 'org.apache.hadoop:hadoop-mapreduce-client-jobclient:2.3.0',
-    mockito: 'org.mockito:mockito-core:3.3.3',
+    mockito: 'org.mockito:mockito-core:3.12.4',
     netty: 'io.netty:netty-all:4.1.52.Final',
     oss: 'org.sonatype.oss:oss-parent:7',
     pulsarClient: "${pulsarGroup}:pulsar-client:${pulsarVersion}",
diff --git a/clients/venice-pulsar/build.gradle b/clients/venice-pulsar/build.gradle
index 36dedcc1d48..81a00958d5b 100644
--- a/clients/venice-pulsar/build.gradle
+++ b/clients/venice-pulsar/build.gradle
@@ -18,6 +18,10 @@ dependencies {
 
   implementation libraries.jacksonDatabind
   implementation libraries.log4j2api
+  implementation libraries.log4j2core
+
+  testImplementation libraries.mockito
+  testImplementation libraries.testng
 }
 
 nar {
diff --git a/clients/venice-pulsar/src/main/java/com/linkedin/venice/pulsar/sink/VeniceSink.java b/clients/venice-pulsar/src/main/java/com/linkedin/venice/pulsar/sink/VeniceSink.java
index 0c846f29c2f..7702a023b2a 100644
--- a/clients/venice-pulsar/src/main/java/com/linkedin/venice/pulsar/sink/VeniceSink.java
+++ b/clients/venice-pulsar/src/main/java/com/linkedin/venice/pulsar/sink/VeniceSink.java
@@ -16,10 +16,11 @@
 import com.linkedin.venice.utils.Utils;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 import org.apache.pulsar.client.api.schema.GenericObject;
@@ -44,8 +45,7 @@ public class VeniceSink implements Sink<GenericObject> {
   VeniceSinkConfig config;
   VeniceSystemProducer producer;
 
-  /** thread safe, fast access to count() **/
-  private ArrayBlockingQueue<Record<GenericObject>> pendingFlushQueue;
+  private final AtomicInteger pendingRecordsCount = new AtomicInteger(0);
 
   private final ScheduledExecutorService scheduledExecutor =
       Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "pulsar-venice-sink-flush-thread"));
@@ -58,25 +58,32 @@ public class VeniceSink implements Sink<GenericObject> {
 
   @Override
   public void open(Map<String, Object> cfg, SinkContext sinkContext) throws Exception {
-    this.config = VeniceSinkConfig.load(cfg, sinkContext);
-    LOGGER.info("Starting, config {}", this.config);
+    VeniceSinkConfig veniceCfg = VeniceSinkConfig.load(cfg, sinkContext);
+    LOGGER.info("Starting, Venice config {}", veniceCfg);
 
     VeniceSystemFactory factory = new VeniceSystemFactory();
     final String systemName = "venice";
-    this.producer =
-        factory.getClosableProducer(systemName, new MapConfig(getConfig(this.config.getStoreName(), systemName)), null);
-    this.producer.start();
-    String kafkaBootstrapServers = this.producer.getKafkaBootstrapServers();
+    VeniceSystemProducer p =
+        factory.getClosableProducer(systemName, new MapConfig(getConfig(veniceCfg, systemName)), null);
+    p.start();
+    String kafkaBootstrapServers = p.getKafkaBootstrapServers();
     LOGGER.info("Kafka bootstrap for Venice producer {}", kafkaBootstrapServers);
+    LOGGER.info("Kafka topic name is {}", p.getTopicName());
 
-    LOGGER.info("Kafka topic name is {}", this.producer.getTopicName());
+    open(veniceCfg, p, sinkContext);
+  }
+
+  /** to simplify unit testing **/
+  public void open(VeniceSinkConfig config, VeniceSystemProducer startedProducer, SinkContext sinkContext)
+      throws Exception {
+    this.config = config;
+    this.producer = startedProducer;
 
     maxNumberUnflushedRecords = this.config.getMaxNumberUnflushedRecords();
     final int capacityMutliplier = 3;
     int queueSize = Integer.MAX_VALUE / capacityMutliplier < maxNumberUnflushedRecords
         ? Integer.MAX_VALUE
         : maxNumberUnflushedRecords * capacityMutliplier;
-    pendingFlushQueue = new ArrayBlockingQueue<>(queueSize, false);
 
     flushIntervalMs = this.config.getFlushIntervalMs();
     scheduledExecutor.scheduleAtFixedRate(() -> flush(false), flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
@@ -91,9 +98,9 @@ public void write(Record<GenericObject> record) throws Exception {
     }
 
     if (doThrottle) {
-      LOGGER.warn("Throttling, not accepting new records; {} records pending", pendingFlushQueue.size());
+      LOGGER.warn("Throttling, not accepting new records; {} records pending", pendingRecordsCount.get());
 
-      while (pendingFlushQueue.size() > maxNumberUnflushedRecords) {
+      while (pendingRecordsCount.get() > maxNumberUnflushedRecords) {
         Thread.sleep(1);
       }
       doThrottle = false;
@@ -116,60 +123,60 @@ public void write(Record<GenericObject> record) throws Exception {
     if (LOGGER.isDebugEnabled()) {
       LOGGER.debug("Processing key: {}, value: {}", key, value);
     }
+
     if (value == null) {
       // here we are making it explicit, but "put(key, null) means DELETE in the API"
       producer.delete(key).whenComplete((___, error) -> {
+        pendingRecordsCount.decrementAndGet();
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Deleted key: {}", key);
         }
         if (error != null) {
           LOGGER.error("Error deleting record with key {}", key, error);
+          flushException = error;
           record.fail();
         } else {
-          if (safePutToQueue(record)) {
-            maybeSubmitFlush();
-          }
+          record.ack();
         }
       });
     } else {
       producer.put(key, value).whenComplete((___, error) -> {
+        pendingRecordsCount.decrementAndGet();
         if (LOGGER.isDebugEnabled()) {
           LOGGER.debug("Processed key: {}, value: {}", key, value);
         }
         if (error != null) {
           LOGGER.error("Error handling record with key {}", key, error);
+          flushException = error;
           record.fail();
         } else {
-          if (safePutToQueue(record)) {
-            maybeSubmitFlush();
-          }
+          record.ack();
         }
       });
     }
-  }
-
-  private boolean safePutToQueue(Record<GenericObject> record) {
-    if (pendingFlushQueue.offer(record)) {
-      return true;
-    }
-    doThrottle = true;
-    scheduledExecutor.submit(() -> safePutToQueue(record));
-    return false;
+    pendingRecordsCount.incrementAndGet();
+    maybeSubmitFlush();
   }
 
   private void maybeSubmitFlush() {
-    if (pendingFlushQueue.size() >= maxNumberUnflushedRecords) {
+    int sz = pendingRecordsCount.get();
+    if (sz >= maxNumberUnflushedRecords) {
       scheduledExecutor.submit(() -> flush(false));
+      if (sz > 2 * maxNumberUnflushedRecords) {
+        LOGGER.info("Too many records pending: {}. Will throttle.", sz);
Will throttle.", sz); + } + doThrottle = true; } } // flush should happen on the same thread private void flush(boolean force) { long startTimeMillis = System.currentTimeMillis(); - int sz = pendingFlushQueue.size(); + + int sz = pendingRecordsCount.get(); if (force || sz >= maxNumberUnflushedRecords || startTimeMillis - lastFlush > flushIntervalMs) { lastFlush = System.currentTimeMillis(); if (sz == 0) { @@ -187,27 +194,10 @@ private void flush(boolean force) { } catch (Throwable t) { LOGGER.error("Error flushing", t); flushException = t; - failAllPendingRecords(); + LOGGER.error("Error while flushing records", t); throw new RuntimeException("Error while flushing records", flushException); } - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Acking the records"); - } - for (int i = 0; i < sz; i++) { - Record rec = pendingFlushQueue.poll(); - if (rec != null) { - rec.ack(); - } else { - // should not happen as long as the removal from queue happens in the single thread - RuntimeException err = - new IllegalStateException("Error while flushing records: Record is null, expected actual record"); - flushException = err; - failAllPendingRecords(); - throw err; - } - } - LOGGER.info("Flush of {} records took {} ms", sz, System.currentTimeMillis() - startTimeMillis); } else { if (LOGGER.isDebugEnabled()) { @@ -216,16 +206,6 @@ private void flush(boolean force) { } } - private void failAllPendingRecords() { - for (Record rec: pendingFlushQueue) { - if (rec != null) { - rec.fail(); - } else { - break; - } - } - } - private static Object extract(Object o) { // Pulsar GenericRecord is a wrapper over AVRO GenericRecord if (o instanceof org.apache.pulsar.client.api.schema.GenericRecord) { @@ -238,28 +218,33 @@ private static Object extract(Object o) { @Override public void close() throws Exception { if (producer != null) { - scheduledExecutor.submit(() -> flush(true)).get(); + Future f = scheduledExecutor.submit(() -> flush(true)); scheduledExecutor.shutdown(); + f.get(); + if (!scheduledExecutor.awaitTermination(1, TimeUnit.MINUTES)) { + LOGGER.error("Failed to shutdown scheduledExecutor"); + scheduledExecutor.shutdownNow(); + } producer.close(); } } - private Map getConfig(String storeName, String systemName) { + private static Map getConfig(VeniceSinkConfig veniceCfg, String systemName) { Map config = new HashMap<>(); String configPrefix = SYSTEMS_PREFIX + systemName + DOT; config.put(configPrefix + VENICE_PUSH_TYPE, Version.PushType.INCREMENTAL.toString()); - config.put(configPrefix + VENICE_STORE, storeName); + config.put(configPrefix + VENICE_STORE, veniceCfg.getStoreName()); config.put(configPrefix + VENICE_AGGREGATE, "false"); - config.put("venice.discover.urls", this.config.getVeniceDiscoveryUrl()); - config.put(VENICE_CONTROLLER_DISCOVERY_URL, this.config.getVeniceDiscoveryUrl()); - config.put(VENICE_ROUTER_URL, this.config.getVeniceRouterUrl()); + config.put("venice.discover.urls", veniceCfg.getVeniceDiscoveryUrl()); + config.put(VENICE_CONTROLLER_DISCOVERY_URL, veniceCfg.getVeniceDiscoveryUrl()); + config.put(VENICE_ROUTER_URL, veniceCfg.getVeniceRouterUrl()); config.put(DEPLOYMENT_ID, Utils.getUniqueString("venice-push-id-pulsar-sink")); config.put(SSL_ENABLED, "false"); - if (this.config.getKafkaSaslConfig() != null && !this.config.getKafkaSaslConfig().isEmpty()) { - config.put("kafka.sasl.jaas.config", this.config.getKafkaSaslConfig()); + if (veniceCfg.getKafkaSaslConfig() != null && !veniceCfg.getKafkaSaslConfig().isEmpty()) { + config.put("kafka.sasl.jaas.config", 
     }
-    config.put("kafka.sasl.mechanism", this.config.getKafkaSaslMechanism());
-    config.put("kafka.security.protocol", this.config.getKafkaSecurityProtocol());
+    config.put("kafka.sasl.mechanism", veniceCfg.getKafkaSaslMechanism());
+    config.put("kafka.security.protocol", veniceCfg.getKafkaSecurityProtocol());
     LOGGER.info("CONFIG: {}", config);
     return config;
   }
diff --git a/clients/venice-pulsar/src/test/java/com/linkedin/venice/pulsar/sink/VeniceSinkTest.java b/clients/venice-pulsar/src/test/java/com/linkedin/venice/pulsar/sink/VeniceSinkTest.java
new file mode 100644
index 00000000000..dc4fc818dc5
--- /dev/null
+++ b/clients/venice-pulsar/src/test/java/com/linkedin/venice/pulsar/sink/VeniceSinkTest.java
@@ -0,0 +1,174 @@
+package com.linkedin.venice.pulsar.sink;
+
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertTrue;
+
+import com.linkedin.venice.samza.VeniceSystemProducer;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pulsar.client.api.schema.GenericObject;
+import org.apache.pulsar.common.schema.KeyValue;
+import org.apache.pulsar.common.schema.SchemaType;
+import org.apache.pulsar.functions.api.Record;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+
+
+public class VeniceSinkTest {
+  VeniceSinkConfig config;
+  VeniceSystemProducer producer;
+
+  ScheduledExecutorService executor;
+
+  @BeforeTest
+  public void setUp() {
+    executor = Executors.newScheduledThreadPool(20);
+    config = new VeniceSinkConfig();
+    config.setVeniceDiscoveryUrl("http://test:5555")
+        .setVeniceRouterUrl("http://test:7777")
+        .setStoreName("t1_n1_s1")
+        .setKafkaSaslMechanism("PLAIN")
+        .setKafkaSecurityProtocol("SASL_PLAINTEXT")
+        .setKafkaSaslConfig("");
+
+    producer = Mockito.mock(VeniceSystemProducer.class);
+  }
+
+  @AfterTest
+  public void tearDown() {
+    if (executor != null) {
+      executor.shutdownNow();
+      executor = null;
+    }
+  }
+
+  @Test
+  public void testVeniceSink() throws Exception {
+    ConcurrentLinkedQueue<CompletableFuture<Void>> futures = new ConcurrentLinkedQueue<>();
+
+    when(producer.put(Mockito.any(), Mockito.any())).thenAnswer((InvocationOnMock invocation) -> {
+      CompletableFuture<Void> future = new CompletableFuture<>();
+      executor.schedule(() -> future.complete(null), ThreadLocalRandom.current().nextInt(1, 25), TimeUnit.MILLISECONDS);
+
+      futures.add(future);
+      return future;
+    });
+
+    when(producer.delete(Mockito.any())).thenAnswer((InvocationOnMock invocation) -> {
+      CompletableFuture<Void> future = new CompletableFuture<>();
+      executor.schedule(() -> future.complete(null), ThreadLocalRandom.current().nextInt(1, 25), TimeUnit.MILLISECONDS);
+
+      futures.add(future);
+      return future;
+    });
+
+    doAnswer((InvocationOnMock invocation) -> {
+      while (true) {
+        CompletableFuture<Void> future = futures.poll();
+        if (future == null) {
+          break;
+        }
+        future.complete(null);
+      }
+      return null;
+    }).when(producer).flush(anyString());
+
+    VeniceSink sink = new VeniceSink();
+    sink.open(config, producer, null);
+
+    List<Record<GenericObject>> records = new LinkedList<>();
+
+    // send a few records, enough to trigger a flush and throttle
+    for (int i = 0; i < 100; i++) {
+      Record<GenericObject> rec = getRecord("k" + i, "v" + i);
+      records.add(rec);
+      sink.write(rec);
+    }
+
+    for (int i = 0; i < 100; i++) {
+      Record<GenericObject> rec = getRecord("k" + i, null);
+      records.add(rec);
+      sink.write(rec);
+    }
+
+    for (Record<GenericObject> rec: records) {
+      verify(rec, timeout(5000).times(1)).ack();
+    }
+
+    sink.close();
+  }
+
+  @Test
+  public void testVeniceSinkFlushFail() throws Exception {
+    ConcurrentLinkedQueue<CompletableFuture<Void>> futures = new ConcurrentLinkedQueue<>();
+
+    AtomicInteger count = new AtomicInteger(0);
+    when(producer.put(Mockito.any(), Mockito.any())).thenAnswer((InvocationOnMock invocation) -> {
+      CompletableFuture<Void> future = new CompletableFuture<>();
+      executor.schedule(() -> {
+        if (count.incrementAndGet() % 10 == 0) {
+          future.completeExceptionally(new Exception("Injected error"));
+        } else {
+          future.complete(null);
+        }
+      }, ThreadLocalRandom.current().nextInt(1, 25), TimeUnit.MILLISECONDS);
+
+      futures.add(future);
+      return future;
+    });
+
+    doAnswer((InvocationOnMock invocation) -> null).when(producer).flush(anyString());
+
+    VeniceSink sink = new VeniceSink();
+    sink.open(config, producer, null);
+
+    List<Record<GenericObject>> records = new LinkedList<>();
+
+    try {
+      for (int i = 0; i < 20; i++) {
+        Record<GenericObject> rec = getRecord("k" + i, "v" + i);
+        records.add(rec);
+        sink.write(rec);
+      }
+    } catch (Exception e) {
+      assertTrue(e.getMessage().contains("Error while flushing records"));
+      assertTrue(e.getCause().getMessage().contains("Injected error"));
+    }
+
+    sink.close();
+  }
+
+  private Record<GenericObject> getRecord(String key, String value) {
+    Record<GenericObject> rec = Mockito.mock(Record.class);
+    when(rec.getValue()).thenReturn(getGenericObject(key, value));
+    return rec;
+  }
+
+  private GenericObject getGenericObject(String key, String value) {
+    return new GenericObject() {
+      @Override
+      public SchemaType getSchemaType() {
+        return SchemaType.KEY_VALUE;
+      }
+
+      @Override
+      public Object getNativeObject() {
+        return new KeyValue<>(key, value);
+      }
+    };
+  }
+}
diff --git a/clients/venice-pulsar/src/test/resources/log4j2.properties b/clients/venice-pulsar/src/test/resources/log4j2.properties
new file mode 100644
index 00000000000..52bc2f7edd0
--- /dev/null
+++ b/clients/venice-pulsar/src/test/resources/log4j2.properties
@@ -0,0 +1,22 @@
+status = error
+name = PropertiesConfig
+
+filters = threshold
+
+filter.threshold.type = ThresholdFilter
+filter.threshold.level = debug
+
+appenders = console
+
+appender.console.type = Console
+appender.console.name = STDOUT
+appender.console.layout.type = PatternLayout
+appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss} %p [%c{1}] [%t] %m%n
+
+rootLogger.level = info
+rootLogger.appenderRefs = stdout
+rootLogger.appenderRef.stdout.ref = STDOUT
+
+# Set package org.apache.zookeeper log level to error.
+logger.zk.name = org.apache.zookeeper
+logger.zk.level = error
\ No newline at end of file