Skip to content

Commit

Permalink
Simplified sink flush, added unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
dlg99 committed May 6, 2023
1 parent 3d2649a commit 151ebd6
Show file tree
Hide file tree
Showing 5 changed files with 254 additions and 69 deletions.
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ ext.libraries = [
mail: 'javax.mail:mail:1.4.4',
mapreduceClientCore: 'org.apache.hadoop:hadoop-mapreduce-client-core:2.3.0',
mapreduceClientJobClient: 'org.apache.hadoop:hadoop-mapreduce-client-jobclient:2.3.0',
mockito: 'org.mockito:mockito-core:3.3.3',
mockito: 'org.mockito:mockito-core:3.12.4',
netty: 'io.netty:netty-all:4.1.52.Final',
oss: 'org.sonatype.oss:oss-parent:7',
pulsarClient: "${pulsarGroup}:pulsar-client:${pulsarVersion}",
Expand Down
4 changes: 4 additions & 0 deletions clients/venice-pulsar/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ dependencies {
implementation libraries.jacksonDatabind

implementation libraries.log4j2api
implementation libraries.log4j2core

testImplementation libraries.mockito
testImplementation libraries.testng
}

nar {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
import com.linkedin.venice.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.pulsar.client.api.schema.GenericObject;
Expand All @@ -44,8 +45,7 @@ public class VeniceSink implements Sink<GenericObject> {
VeniceSinkConfig config;
VeniceSystemProducer producer;

/** thread safe, fast access to count() **/
private ArrayBlockingQueue<Record<GenericObject>> pendingFlushQueue;
private final AtomicInteger pendingRecordsCount = new AtomicInteger(0);
private final ScheduledExecutorService scheduledExecutor =
Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "pulsar-venice-sink-flush-thread"));

Expand All @@ -58,25 +58,32 @@ public class VeniceSink implements Sink<GenericObject> {

@Override
public void open(Map<String, Object> cfg, SinkContext sinkContext) throws Exception {
this.config = VeniceSinkConfig.load(cfg, sinkContext);
LOGGER.info("Starting, config {}", this.config);
VeniceSinkConfig veniceCfg = VeniceSinkConfig.load(cfg, sinkContext);
LOGGER.info("Starting, Venice config {}", veniceCfg);

VeniceSystemFactory factory = new VeniceSystemFactory();
final String systemName = "venice";
this.producer =
factory.getClosableProducer(systemName, new MapConfig(getConfig(this.config.getStoreName(), systemName)), null);
this.producer.start();
String kafkaBootstrapServers = this.producer.getKafkaBootstrapServers();
VeniceSystemProducer p =
factory.getClosableProducer(systemName, new MapConfig(getConfig(veniceCfg, systemName)), null);
p.start();
String kafkaBootstrapServers = p.getKafkaBootstrapServers();
LOGGER.info("Kafka bootstrap for Venice producer {}", kafkaBootstrapServers);
LOGGER.info("Kafka topic name is {}", p.getTopicName());

LOGGER.info("Kafka topic name is {}", this.producer.getTopicName());
open(veniceCfg, p, sinkContext);
}

/** to simplify unit testing **/
public void open(VeniceSinkConfig config, VeniceSystemProducer startedProducer, SinkContext sinkContext)
throws Exception {
this.config = config;
this.producer = startedProducer;

maxNumberUnflushedRecords = this.config.getMaxNumberUnflushedRecords();
final int capacityMutliplier = 3;
int queueSize = Integer.MAX_VALUE / capacityMutliplier < maxNumberUnflushedRecords
? Integer.MAX_VALUE
: maxNumberUnflushedRecords * capacityMutliplier;
pendingFlushQueue = new ArrayBlockingQueue<>(queueSize, false);
flushIntervalMs = this.config.getFlushIntervalMs();

scheduledExecutor.scheduleAtFixedRate(() -> flush(false), flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
Expand All @@ -91,9 +98,9 @@ public void write(Record<GenericObject> record) throws Exception {
}

if (doThrottle) {
LOGGER.warn("Throttling, not accepting new records; {} records pending", pendingFlushQueue.size());
LOGGER.warn("Throttling, not accepting new records; {} records pending", pendingRecordsCount.get());

while (pendingFlushQueue.size() > maxNumberUnflushedRecords) {
while (pendingRecordsCount.get() > maxNumberUnflushedRecords) {
Thread.sleep(1);
}
doThrottle = false;
Expand All @@ -116,60 +123,60 @@ public void write(Record<GenericObject> record) throws Exception {
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Processing key: {}, value: {}", key, value);
}

if (value == null) {
// here we are making it explicit, but "put(key, null) means DELETE in the API"
producer.delete(key).whenComplete((___, error) -> {
pendingRecordsCount.decrementAndGet();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Deleted key: {}", key);
}

if (error != null) {
LOGGER.error("Error deleting record with key {}", key, error);
flushException = error;
record.fail();
} else {
if (safePutToQueue(record)) {
maybeSubmitFlush();
}
record.ack();
}
});
} else {
producer.put(key, value).whenComplete((___, error) -> {
pendingRecordsCount.decrementAndGet();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Processed key: {}, value: {}", key, value);
}

if (error != null) {
LOGGER.error("Error handling record with key {}", key, error);
flushException = error;
record.fail();
} else {
if (safePutToQueue(record)) {
maybeSubmitFlush();
}
record.ack();
}
});
}
}

private boolean safePutToQueue(Record<GenericObject> record) {
if (pendingFlushQueue.offer(record)) {
return true;
}

doThrottle = true;
scheduledExecutor.submit(() -> safePutToQueue(record));
return false;
pendingRecordsCount.incrementAndGet();
maybeSubmitFlush();
}

private void maybeSubmitFlush() {
if (pendingFlushQueue.size() >= maxNumberUnflushedRecords) {
int sz = pendingRecordsCount.get();
if (sz >= maxNumberUnflushedRecords) {
scheduledExecutor.submit(() -> flush(false));
if (sz > 2 * maxNumberUnflushedRecords) {
LOGGER.info("Too many records pending: {}. Will throttle.", sz);
}
doThrottle = true;
}
}

// flush should happen on the same thread
private void flush(boolean force) {
long startTimeMillis = System.currentTimeMillis();
int sz = pendingFlushQueue.size();

int sz = pendingRecordsCount.get();
if (force || sz >= maxNumberUnflushedRecords || startTimeMillis - lastFlush > flushIntervalMs) {
lastFlush = System.currentTimeMillis();
if (sz == 0) {
Expand All @@ -187,27 +194,10 @@ private void flush(boolean force) {
} catch (Throwable t) {
LOGGER.error("Error flushing", t);
flushException = t;
failAllPendingRecords();
LOGGER.error("Error while flushing records", t);
throw new RuntimeException("Error while flushing records", flushException);
}

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Acking the records");
}
for (int i = 0; i < sz; i++) {
Record<GenericObject> rec = pendingFlushQueue.poll();
if (rec != null) {
rec.ack();
} else {
// should not happen as long as the removal from queue happens in the single thread
RuntimeException err =
new IllegalStateException("Error while flushing records: Record is null, expected actual record");
flushException = err;
failAllPendingRecords();
throw err;
}
}

LOGGER.info("Flush of {} records took {} ms", sz, System.currentTimeMillis() - startTimeMillis);
} else {
if (LOGGER.isDebugEnabled()) {
Expand All @@ -216,16 +206,6 @@ private void flush(boolean force) {
}
}

private void failAllPendingRecords() {
for (Record<GenericObject> rec: pendingFlushQueue) {
if (rec != null) {
rec.fail();
} else {
break;
}
}
}

private static Object extract(Object o) {
// Pulsar GenericRecord is a wrapper over AVRO GenericRecord
if (o instanceof org.apache.pulsar.client.api.schema.GenericRecord) {
Expand All @@ -238,28 +218,33 @@ private static Object extract(Object o) {
@Override
public void close() throws Exception {
if (producer != null) {
scheduledExecutor.submit(() -> flush(true)).get();
Future<?> f = scheduledExecutor.submit(() -> flush(true));
scheduledExecutor.shutdown();
f.get();
if (!scheduledExecutor.awaitTermination(1, TimeUnit.MINUTES)) {
LOGGER.error("Failed to shutdown scheduledExecutor");
scheduledExecutor.shutdownNow();
}
producer.close();
}
}

private Map<String, String> getConfig(String storeName, String systemName) {
private static Map<String, String> getConfig(VeniceSinkConfig veniceCfg, String systemName) {
Map<String, String> config = new HashMap<>();
String configPrefix = SYSTEMS_PREFIX + systemName + DOT;
config.put(configPrefix + VENICE_PUSH_TYPE, Version.PushType.INCREMENTAL.toString());
config.put(configPrefix + VENICE_STORE, storeName);
config.put(configPrefix + VENICE_STORE, veniceCfg.getStoreName());
config.put(configPrefix + VENICE_AGGREGATE, "false");
config.put("venice.discover.urls", this.config.getVeniceDiscoveryUrl());
config.put(VENICE_CONTROLLER_DISCOVERY_URL, this.config.getVeniceDiscoveryUrl());
config.put(VENICE_ROUTER_URL, this.config.getVeniceRouterUrl());
config.put("venice.discover.urls", veniceCfg.getVeniceDiscoveryUrl());
config.put(VENICE_CONTROLLER_DISCOVERY_URL, veniceCfg.getVeniceDiscoveryUrl());
config.put(VENICE_ROUTER_URL, veniceCfg.getVeniceRouterUrl());
config.put(DEPLOYMENT_ID, Utils.getUniqueString("venice-push-id-pulsar-sink"));
config.put(SSL_ENABLED, "false");
if (this.config.getKafkaSaslConfig() != null && !this.config.getKafkaSaslConfig().isEmpty()) {
config.put("kafka.sasl.jaas.config", this.config.getKafkaSaslConfig());
if (veniceCfg.getKafkaSaslConfig() != null && !veniceCfg.getKafkaSaslConfig().isEmpty()) {
config.put("kafka.sasl.jaas.config", veniceCfg.getKafkaSaslConfig());
}
config.put("kafka.sasl.mechanism", this.config.getKafkaSaslMechanism());
config.put("kafka.security.protocol", this.config.getKafkaSecurityProtocol());
config.put("kafka.sasl.mechanism", veniceCfg.getKafkaSaslMechanism());
config.put("kafka.security.protocol", veniceCfg.getKafkaSecurityProtocol());
LOGGER.info("CONFIG: {}", config);
return config;
}
Expand Down
Loading

0 comments on commit 151ebd6

Please sign in to comment.