From fa1486637b168b6922f2c8cf215c19585bc805c0 Mon Sep 17 00:00:00 2001 From: Jeff Xiang Date: Tue, 12 Nov 2024 18:23:08 -0500 Subject: [PATCH] Make LeadershipWatcher pluggable and create tests --- .../config/my_test_kafka_cluster.properties | 2 +- .../uploader/DirectoryTreeWatcher.java | 46 ++-- .../uploader/KafkaSegmentUploader.java | 26 ++- .../SegmentUploaderConfiguration.java | 47 +++- .../uploader/UploaderMetrics.java | 3 +- .../leadership/KafkaLeadershipWatcher.java | 204 ------------------ .../leadership/LeadershipWatcher.java | 115 ++++++++++ .../ZookeeperLeadershipWatcher.java | 133 ++++++++++++ .../tieredstorage/uploader/TestBase.java | 93 +++++++- .../uploader/TestDirectoryTreeWatcher.java | 62 ++---- .../TestDirectoryTreeWatcherMultiBroker.java | 65 ++---- .../uploader/TestKafkaSegmentUploader.java | 49 +---- .../TestMultiThreadedS3FileUploader.java | 2 +- .../leadership/TestLeadershipWatcher.java | 122 +++++++++++ .../TestZookeeperLeadershipWatcher.java | 156 ++++++++++++++ .../test/resources/test-cluster-2.properties | 3 +- .../test-cluster-all-exclude.properties | 3 +- .../test-cluster-all-include.properties | 3 +- .../test-cluster-specific-exclude.properties | 3 +- .../test-cluster-specific-include.properties | 3 +- .../test/resources/test-cluster.properties | 4 +- 21 files changed, 753 insertions(+), 391 deletions(-) delete mode 100644 ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/KafkaLeadershipWatcher.java create mode 100644 ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/LeadershipWatcher.java create mode 100644 ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/ZookeeperLeadershipWatcher.java create mode 100644 ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java create mode 100644 ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java diff --git a/ts-examples/quickstart-scripts/config/my_test_kafka_cluster.properties b/ts-examples/quickstart-scripts/config/my_test_kafka_cluster.properties index db03e9d..8d3d9cd 100644 --- a/ts-examples/quickstart-scripts/config/my_test_kafka_cluster.properties +++ b/ts-examples/quickstart-scripts/config/my_test_kafka_cluster.properties @@ -7,7 +7,7 @@ ts.segment.uploader.upload.timeout.ms=60000 ts.segment.uploader.upload.thread.count=3 ts.segment.uploader.upload.max.retries=10 -ts.segment.uploader.zk.watcher.poll.interval.seconds=60 +ts.segment.uploader.leadership.watcher.poll.interval.seconds=60 storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.ExampleS3StorageServiceEndpointProvider metrics.reporter.class=com.pinterest.kafka.tieredstorage.common.metrics.NoOpMetricsReporter \ No newline at end of file diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java index d170b6a..af2e5b5 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/DirectoryTreeWatcher.java @@ -2,7 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager; -import com.pinterest.kafka.tieredstorage.uploader.leadership.KafkaLeadershipWatcher; +import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher; import org.apache.kafka.common.TopicPartition; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -11,17 +11,14 @@ import java.io.File; import java.io.IOException; import java.nio.file.FileSystems; -import java.nio.file.FileVisitResult; import java.nio.file.Files; import java.nio.file.NoSuchFileException; import java.nio.file.Path; import java.nio.file.Paths; -import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardWatchEventKinds; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; -import java.nio.file.attribute.BasicFileAttributes; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -56,18 +53,16 @@ public class DirectoryTreeWatcher implements Runnable { private static final Logger LOG = LogManager.getLogger(DirectoryTreeWatcher.class); private static final String[] MONITORED_EXTENSIONS = {".timeindex", ".index", ".log"}; private static final Pattern MONITORED_FILE_PATTERN = Pattern.compile("^\\d+(" + String.join("|", MONITORED_EXTENSIONS) + ")$"); - private final Path topLevelPath; - private final WatchService watchService; - private Thread thread; - private boolean cancelled = false; private static Map activeSegment; private static Map> segmentsQueue; + private static LeadershipWatcher leadershipWatcher; + private final Path topLevelPath; + private final WatchService watchService; private final ConcurrentLinkedQueue uploadTasks = new ConcurrentLinkedQueue<>(); private final S3FileUploader s3FileUploader; private final ThreadLocal tempFileGenerator = ThreadLocal.withInitial(WatermarkFileHandler::new); private final ConcurrentHashMap latestUploadedOffset = new ConcurrentHashMap<>(); private final S3FileDownloader s3FileDownloader; - private static KafkaLeadershipWatcher kafkaLeadershipWatcher; private final Pattern SKIP_TOPICS_PATTERN = Pattern.compile( "^__consumer_offsets$|^__transaction_state$|.+\\.changlog$|.+\\.repartition$" ); @@ -78,28 +73,23 @@ public class DirectoryTreeWatcher implements Runnable { private final SegmentUploaderConfiguration config; private final KafkaEnvironmentProvider environmentProvider; private final Object watchKeyMapLock = new Object(); + private Thread thread; + private boolean cancelled = false; - public static void setKafkaLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) { - if (kafkaLeadershipWatcher == null) { - try { - kafkaLeadershipWatcher = new KafkaLeadershipWatcher(directoryTreeWatcher, config, environmentProvider); - } catch (IOException | InterruptedException e) { - LOG.error("Could not launch Kafka Leadership Watcher; quitting ..."); - throw new RuntimeException(e); - } - } + public static void setLeadershipWatcher(LeadershipWatcher suppliedLeadershipWatcher) { + if (leadershipWatcher == null) + leadershipWatcher = suppliedLeadershipWatcher; } @VisibleForTesting - protected static void unsetKafkaLeadershipWatcher() { - kafkaLeadershipWatcher = null; + protected static void unsetLeadershipWatcher() { + leadershipWatcher = null; } - public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException, KeeperException { + public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws Exception { this.environmentProvider = environmentProvider; this.topLevelPath = Paths.get(environmentProvider.logDir()); this.watchService = FileSystems.getDefault().newWatchService(); - setKafkaLeadershipWatcher(this, config, environmentProvider); activeSegment = new HashMap<>(); segmentsQueue = new HashMap<>(); this.s3FileUploader = s3FileUploader; @@ -107,7 +97,6 @@ public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfig this.s3FileDownloader = new S3FileDownloader(s3FileUploader.getStorageServiceEndpointProvider(), config); heartbeat = new Heartbeat("watcher.logs", config, environmentProvider); this.config = config; - initialize(); } /** @@ -116,7 +105,10 @@ public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfig * @throws InterruptedException * @throws KeeperException */ - private void initialize() throws IOException, InterruptedException, KeeperException { + public void initialize() throws Exception { + if (leadershipWatcher == null) { + throw new IllegalStateException("LeadershipWatcher must be set before initializing DirectoryTreeWatcher"); + } s3UploadHandler.submit(() -> { while (!cancelled) { if (uploadTasks.isEmpty()) { @@ -145,8 +137,8 @@ private void initialize() throws IOException, InterruptedException, KeeperExcept s3FileUploader.uploadFile(task, this::handleUploadCallback); } }); - LOG.info("Initializing KafkaLeadershipWatcher"); - kafkaLeadershipWatcher.start(); + LOG.info("Initializing LeadershipWatcher"); + leadershipWatcher.start(); LOG.info("Submitting s3UploadHandler loop"); } @@ -702,7 +694,7 @@ public void stop() throws InterruptedException { if (thread != null && thread.isAlive()) { thread.interrupt(); } - kafkaLeadershipWatcher.stop(); + leadershipWatcher.stop(); heartbeat.stop(); } diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java index bd100ff..7c0d1f0 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/KafkaSegmentUploader.java @@ -2,6 +2,7 @@ import com.google.common.annotations.VisibleForTesting; import com.pinterest.kafka.tieredstorage.common.discovery.StorageServiceEndpointProvider; +import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher; import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.log4j.LogManager; import org.apache.log4j.Logger; @@ -21,7 +22,7 @@ public class KafkaSegmentUploader { private final StorageServiceEndpointProvider endpointProvider; private final SegmentUploaderConfiguration config; - public KafkaSegmentUploader(String configDirectory) throws IOException, InterruptedException, KeeperException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public KafkaSegmentUploader(String configDirectory) throws Exception { Utils.acquireLock(); KafkaEnvironmentProvider environmentProvider = getEnvironmentProvider(); environmentProvider.load(); @@ -33,10 +34,15 @@ public KafkaSegmentUploader(String configDirectory) throws IOException, Interrup multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider); + + LeadershipWatcher leadershipWatcher = getLeadershipWatcherFromConfigs(directoryTreeWatcher, config, environmentProvider); + DirectoryTreeWatcher.setLeadershipWatcher(leadershipWatcher); + + directoryTreeWatcher.initialize(); } @VisibleForTesting - protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, InterruptedException, KeeperException { + protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws Exception { Utils.acquireLock(); environmentProvider.load(); config = new SegmentUploaderConfiguration(configDirectory, environmentProvider.clusterId()); @@ -46,6 +52,11 @@ protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider); + + LeadershipWatcher leadershipWatcher = getLeadershipWatcherFromConfigs(directoryTreeWatcher, config, environmentProvider); + DirectoryTreeWatcher.setLeadershipWatcher(leadershipWatcher); + + directoryTreeWatcher.initialize(); } public void start() { @@ -69,6 +80,13 @@ private KafkaEnvironmentProvider getEnvironmentProvider() throws ClassNotFoundEx return environmentProviderConstructor.newInstance(); } + private static LeadershipWatcher getLeadershipWatcherFromConfigs(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider kafkaEnvironmentProvider) throws InvocationTargetException, InstantiationException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException { + String leadershipWatcherClassName = config.getLeadershipWatcherClassName(); + Constructor leadershipWatcherConstructor = Class.forName(leadershipWatcherClassName) + .asSubclass(LeadershipWatcher.class).getConstructor(DirectoryTreeWatcher.class, SegmentUploaderConfiguration.class, KafkaEnvironmentProvider.class); + return leadershipWatcherConstructor.newInstance(directoryTreeWatcher, config, kafkaEnvironmentProvider); + } + @VisibleForTesting protected StorageServiceEndpointProvider getEndpointProvider() { return endpointProvider; @@ -79,13 +97,13 @@ protected SegmentUploaderConfiguration getSegmentUploaderConfiguration() { return config; } - private StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { + private static StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException { Constructor endpointProviderConstructor = Class.forName(config.getStorageServiceEndpointProviderClassName()) .asSubclass(StorageServiceEndpointProvider.class).getConstructor(); return endpointProviderConstructor.newInstance(); } - public static void main(String[] args) throws IOException, InterruptedException, KeeperException, ConfigurationException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public static void main(String[] args) throws Exception { if (args.length != 1) { LOG.error("configDirectory is required as an argument"); System.exit(1); diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java index 33f200e..7846c69 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/SegmentUploaderConfiguration.java @@ -22,22 +22,38 @@ public class SegmentUploaderConfiguration { private static final Logger LOG = LogManager.getLogger(SegmentUploaderConfiguration.class); private static final String TS_SEGMENT_UPLOADER_PREFIX = "ts.segment.uploader"; private static final String KAFKA_PREFIX = TS_SEGMENT_UPLOADER_PREFIX + "." + "kafka"; + + // Topic inclusion / exclusion private static final String TOPICS_INCLUDE_PREFIX = KAFKA_PREFIX + "." + "topics.include"; private static final String TOPICS_EXCLUDE_PREFIX = KAFKA_PREFIX + "." + "topics.exclude"; + + // Storage service endpoint provider private static final String STORAGE_SERVICE_ENDPOINT_PROVIDER_PREFIX = "storage.service.endpoint.provider"; private static final String STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY = STORAGE_SERVICE_ENDPOINT_PROVIDER_PREFIX + "." + "class"; + + // Offset reset strategy private static final String OFFSET_RESET_STRATEGY_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "offset.reset.strategy"; + + // Upload configurations private static final String UPLOADER_THREAD_COUNT_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.thread.count"; - private static final String ZK_WATCHER_POLL_INTERVAL_SECONDS = TS_SEGMENT_UPLOADER_PREFIX + "." + "zk.watcher.poll.interval.seconds"; - private static final String S3_PREFIX_ENTROPY_BITS = TS_SEGMENT_UPLOADER_PREFIX + "." + "s3.prefix.entropy.bits"; private static final String UPLOAD_TIMEOUT_MS = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.timeout.ms"; private static final String UPLOAD_MAX_RETRIES = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.max.retries"; + + // Leadership watcher + private static final String LEADERSHIP_WATCHER_CLASS_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "leadership.watcher.class"; + private static final String LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS = TS_SEGMENT_UPLOADER_PREFIX + "." + "leadership.watcher.poll.interval.seconds"; + + // Prefix entropy + private static final String S3_PREFIX_ENTROPY_BITS = TS_SEGMENT_UPLOADER_PREFIX + "." + "s3.prefix.entropy.bits"; + + // Internal structures private final Properties properties = new Properties(); private final Set includeRegexes = ConcurrentHashMap.newKeySet(); private final Set excludeRegexes = ConcurrentHashMap.newKeySet(); private final Set includeTopicsCache = ConcurrentHashMap.newKeySet(); private final Set excludeTopicsCache = ConcurrentHashMap.newKeySet(); private final String storageServiceEndpointProviderClassName; + private final String leadershipWatcherClassName; private final MetricsConfiguration metricsConfiguration; public SegmentUploaderConfiguration(String configDirectory, String clusterId) throws IOException { @@ -50,11 +66,12 @@ public SegmentUploaderConfiguration(String configDirectory, String clusterId) th loadPatterns(includeRegexes, TOPICS_INCLUDE_PREFIX); loadPatterns(excludeRegexes, TOPICS_EXCLUDE_PREFIX); - if (!properties.containsKey(STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY)) { - throw new RuntimeException(String.format("Configuration %s must be provided", STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY)); - } + checkConfigExists(properties, STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY); storageServiceEndpointProviderClassName = properties.getProperty(STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY); + checkConfigExists(properties, LEADERSHIP_WATCHER_CLASS_KEY); + leadershipWatcherClassName = properties.getProperty(LEADERSHIP_WATCHER_CLASS_KEY); + metricsConfiguration = MetricsConfiguration.getMetricsConfiguration(properties); LOG.info(String.format("Loaded SegmentUploaderConfiguration from file: %s", filename)); @@ -66,6 +83,12 @@ public SegmentUploaderConfiguration(String configDirectory, String clusterId) th } } + private static void checkConfigExists(Properties properties, String key) { + if (!properties.containsKey(key)) { + throw new RuntimeException(String.format("Configuration %s must be provided", key)); + } + } + public boolean deleteTopic(String topicName) { return excludeTopicsCache.remove(topicName) || includeTopicsCache.remove(topicName); } @@ -127,11 +150,15 @@ public int getUploadThreadCount() { return Defaults.DEFAULT_UPLOADER_THREAD_POOL_SIZE; } - public int getZkWatcherPollIntervalSeconds() { - if (properties.containsKey(ZK_WATCHER_POLL_INTERVAL_SECONDS)) { - return Integer.parseInt(properties.getProperty(ZK_WATCHER_POLL_INTERVAL_SECONDS)); + public String getLeadershipWatcherClassName() { + return this.leadershipWatcherClassName; + } + + public int getLeadershipWatcherPollIntervalSeconds() { + if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) { + return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)); } - return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS; + return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS; } public OffsetResetStrategy getOffsetResetStrategy() { @@ -170,7 +197,7 @@ public enum OffsetResetStrategy { public static class Defaults { private static final String DEFAULT_OFFSET_RESET_STRATEGY = "EARLIEST"; private static final int DEFAULT_UPLOADER_THREAD_POOL_SIZE = 3; - private static final int DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS = 60; + private static final int DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS = 60; private static final int DEFAULT_S3_PREFIX_ENTROPY_BITS = -1; private static final int DEFAULT_UPLOAD_TIMEOUT_MS = 60000; private static final int DEFAULT_UPLOAD_MAX_RETRIES = 3; diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/UploaderMetrics.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/UploaderMetrics.java index 13aca8c..e22f376 100644 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/UploaderMetrics.java +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/UploaderMetrics.java @@ -29,10 +29,9 @@ public class UploaderMetrics { public static final String KAFKA_LEADER_UNSET_METRIC = UPLOADER_METRIC_PREFIX + "." + "kafka.leader.unset"; public static final String KAFKA_LEADER_COUNT_METRIC = UPLOADER_METRIC_PREFIX + "." + "kafka.leader.count"; public static final String WATCHER_ZK_RESET_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.zk.reset"; - public static final String WATCHER_NOT_ADDED_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.not.added"; public static final String SKIPPED_ENTRY_MODIFY_EVENT_METRIC = UPLOADER_METRIC_PREFIX + "." + "skipped.entry.modify"; public static final String ADD_WATCHER_FAILED_METRIC = UPLOADER_METRIC_PREFIX + "." + "add.watcher.failed"; public static final String ENQUEUE_TO_UPLOAD_LATENCY_MS_METRIC = UPLOADER_METRIC_PREFIX + "." + "enqueue.to.upload.latency.ms"; public static final String RETRY_MARKED_FOR_DELETION_COUNT_METRIC = UPLOADER_METRIC_PREFIX + "." + "retry.marked.for.deletion.count"; - public static final String WATCHER_ZK_EXCEPTION_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.zk.exception"; + public static final String WATCHER_LEADERSHIP_EXCEPTION_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.leadership.exception"; } diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/KafkaLeadershipWatcher.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/KafkaLeadershipWatcher.java deleted file mode 100644 index 9128cb4..0000000 --- a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/KafkaLeadershipWatcher.java +++ /dev/null @@ -1,204 +0,0 @@ -package com.pinterest.kafka.tieredstorage.uploader.leadership; - -import com.google.common.collect.Sets; -import com.google.gson.Gson; -import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager; -import com.pinterest.kafka.tieredstorage.uploader.DirectoryTreeWatcher; -import com.pinterest.kafka.tieredstorage.uploader.Heartbeat; -import com.pinterest.kafka.tieredstorage.uploader.KafkaEnvironmentProvider; -import com.pinterest.kafka.tieredstorage.uploader.SegmentUploaderConfiguration; -import com.pinterest.kafka.tieredstorage.uploader.TopicPartitionState; -import com.pinterest.kafka.tieredstorage.uploader.UploaderMetrics; -import org.apache.kafka.common.TopicPartition; -import org.apache.log4j.LogManager; -import org.apache.log4j.Logger; -import org.apache.zookeeper.KeeperException; -import org.apache.zookeeper.ZooKeeper; -import org.apache.zookeeper.data.Stat; - -import java.io.IOException; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; - -/** - * KafkaLeadershipWatcher watches the leadership of Kafka partitions by pulling the state from ZooKeeper every X seconds. - * The poll interval is configurable via the zk.watcher.poll.interval.seconds configuration. - * - * Note that we are not using ZkWatcher push-based implementation because it was not reliably pushing all updates whenever - * a large amount of metadata updates were happening at the same time (e.g. a topic with thousands of partitions - * is being rebalanced). Pull-based approach is more reliable until we can integrate a more reliable push-based library - * that provides ZK updates. - */ -public class KafkaLeadershipWatcher { - private final static Logger LOG = LogManager.getLogger(KafkaLeadershipWatcher.class); - private ZooKeeper zooKeeper; - private final static int SESSION_TIMEOUT_MS = 60000; - private final static String TOPICS_ZK_PATH = "/brokers/topics"; - private final static String PARTITIONS_ZK_SUBPATH = "partitions"; - private final Set leadingPartitions = new HashSet<>(); - private final Stat stat = new Stat(); - private DirectoryTreeWatcher directoryTreeWatcher; - private final Gson gson = new Gson(); - private Heartbeat heartbeat; - private final ScheduledExecutorService executorService; - private final SegmentUploaderConfiguration config; - private final KafkaEnvironmentProvider environmentProvider; - - public KafkaLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException { - this.config = config; - this.environmentProvider = environmentProvider; - executorService = Executors.newSingleThreadScheduledExecutor(); - initialize(directoryTreeWatcher); - } - - private void initialize(DirectoryTreeWatcher directoryTreeWatcher) throws IOException, InterruptedException { - initialize(environmentProvider.zookeeperConnect(), directoryTreeWatcher); - } - - private void initialize(String zkEndpoints, DirectoryTreeWatcher directoryTreeWatcher) throws IOException, InterruptedException { - if (zkEndpoints == null) - throw new IllegalArgumentException("Unexpect ZK endpoint: null"); - - this.directoryTreeWatcher = directoryTreeWatcher; - this.zooKeeper = new ZooKeeper(zkEndpoints, SESSION_TIMEOUT_MS, null); - while (!zooKeeper.getState().isConnected()) { - Thread.sleep(500); - } - heartbeat = new Heartbeat("watcher.zk", config, environmentProvider); - LOG.info(String.format("ZK watcher state: %s", zooKeeper.getState())); - } - - private void applyCurrentState() throws InterruptedException, KeeperException { - LOG.info("Starting to apply current ZK state"); - long startTime = System.currentTimeMillis(); - Set currentLeadingPartitions = new HashSet<>(); - int numPartitionsProcessed = 0; - try { - for (String topic: zooKeeper.getChildren(TOPICS_ZK_PATH, true)) { - String partitionsPath = String.format("%s/%s/%s", TOPICS_ZK_PATH, topic, PARTITIONS_ZK_SUBPATH); - for (String partition: zooKeeper.getChildren(partitionsPath, true)) { - String partitionStatePath = String.format("%s/%s/state", partitionsPath, partition); - processNodeDataChangedForPartitionState(partitionStatePath, currentLeadingPartitions); - numPartitionsProcessed++; - } - } - unwatchDeletedPartitions(currentLeadingPartitions); - LOG.info(String.format("Finished applying current ZK state in %dms. " + - "Number of partitions processed=%d, number of leading partitions=%d", - System.currentTimeMillis() - startTime, numPartitionsProcessed, currentLeadingPartitions.size())); - } catch (KeeperException | InterruptedException e) { - tryResetZkClient(e); - throw e; - } - } - - public void start() throws InterruptedException, KeeperException { - applyCurrentState(); - executorService.scheduleAtFixedRate(() -> { - try { - applyCurrentState(); - } catch (Exception e) { - LOG.error("Caught exception while applying current state", e); - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( - null, - null, - UploaderMetrics.WATCHER_ZK_EXCEPTION_METRIC, - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId() - ); - } - }, config.getZkWatcherPollIntervalSeconds(), config.getZkWatcherPollIntervalSeconds(), java.util.concurrent.TimeUnit.SECONDS); - } - - public void stop() throws InterruptedException { - zooKeeper.close(); - heartbeat.stop(); - } - - private void processNodeDataChangedForPartitionState(String path, Set currentLeadingPartitions) throws InterruptedException, KeeperException { - LOG.debug("Processing NodeDataChangedForPartitionState " + path); - String data = null; - try { - data = new String(zooKeeper.getData(path, true, stat)); - } catch (KeeperException.NoNodeException e) { - LOG.warn("Caught exception trying to get zk data from path. Don't panic if zNode was deleted," + - " we will unwatch it." + path, e); - } - TopicPartition topicPartition = fromPath(path); - if (data != null) { - TopicPartitionState topicPartitionState = gson.fromJson(data, TopicPartitionState.class); - if (topicPartitionState.getLeader() == environmentProvider.brokerId()) { - LOG.info(String.format("Current leader of %s matches this broker ID: %s", path, topicPartitionState.getLeader())); - currentLeadingPartitions.add(topicPartition); - leadingPartitions.add(topicPartition); - directoryTreeWatcher.watch(topicPartition); - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).updateCounter( - topicPartition.topic(), - topicPartition.partition(), - UploaderMetrics.KAFKA_LEADER_COUNT_METRIC, - 1, - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId() - ); - } else if (leadingPartitions.contains(topicPartition)) { - // leadership change event - unwatchPartition(topicPartition); - } - } else if (leadingPartitions.contains(topicPartition)) { - // node deletion event - unwatchPartition(topicPartition); - } - } - - private void tryResetZkClient(Exception e) { - LOG.error("Hit a ZK exception. Will bounce the ZK client.", e); - try { - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( - null, - null, - UploaderMetrics.WATCHER_ZK_RESET_METRIC, - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId() - ); - stop(); - initialize(directoryTreeWatcher); - } catch (IOException | InterruptedException ex) { - LOG.error("Could not restore the ZK client.", ex); - throw new RuntimeException(ex); - } - } - - private void unwatchDeletedPartitions(Set currentLeadingPartitions) { - Set deletedPartitions = Sets.difference(leadingPartitions, currentLeadingPartitions).immutableCopy(); - for (TopicPartition leadingPartition: deletedPartitions) { - unwatchPartition(leadingPartition); - } - } - - private void unwatchPartition(TopicPartition topicPartition) { - leadingPartitions.remove(topicPartition); - directoryTreeWatcher.unwatch(topicPartition); - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( - topicPartition.topic(), - topicPartition.partition(), - UploaderMetrics.KAFKA_LEADER_UNSET_METRIC, - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId() - ); - MetricRegistryManager.getInstance(config.getMetricsConfiguration()).updateCounter( - topicPartition.topic(), - topicPartition.partition(), - UploaderMetrics.KAFKA_LEADER_COUNT_METRIC, - 0, - "cluster=" + environmentProvider.clusterId(), - "broker=" + environmentProvider.brokerId() - ); - } - - private TopicPartition fromPath(String path) { - String[] parts = path.split("/"); - return new TopicPartition(parts[3], Integer.parseInt(parts[5])); - } -} diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/LeadershipWatcher.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/LeadershipWatcher.java new file mode 100644 index 0000000..6486164 --- /dev/null +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/LeadershipWatcher.java @@ -0,0 +1,115 @@ +package com.pinterest.kafka.tieredstorage.uploader.leadership; + +import com.google.common.collect.Sets; +import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager; +import com.pinterest.kafka.tieredstorage.uploader.DirectoryTreeWatcher; +import com.pinterest.kafka.tieredstorage.uploader.KafkaEnvironmentProvider; +import com.pinterest.kafka.tieredstorage.uploader.SegmentUploaderConfiguration; +import com.pinterest.kafka.tieredstorage.uploader.UploaderMetrics; +import org.apache.kafka.common.TopicPartition; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +public abstract class LeadershipWatcher { + private final static Logger LOG = LogManager.getLogger(LeadershipWatcher.class); + protected final ScheduledExecutorService executorService; + protected final DirectoryTreeWatcher directoryTreeWatcher; + protected final SegmentUploaderConfiguration config; + protected final KafkaEnvironmentProvider environmentProvider; + protected final Set leadingPartitions = new HashSet<>(); + private long lastPollTime = -1; + + public LeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException { + this.directoryTreeWatcher = directoryTreeWatcher; + this.config = config; + this.environmentProvider = environmentProvider; + executorService = Executors.newSingleThreadScheduledExecutor(); + initialize(); + } + + protected abstract void initialize() throws IOException, InterruptedException; + + protected abstract Set queryCurrentLeadingPartitions() throws Exception; + + protected void applyCurrentState() throws Exception { + long start = System.currentTimeMillis(); + LOG.info("Applying current leadership state. Last successful run was " + (System.currentTimeMillis() - lastPollTime) + "ms ago"); + Set currentLeadingPartitions = queryCurrentLeadingPartitions(); + LOG.info(String.format("Current leading partitions (%s): %s", currentLeadingPartitions.size(), currentLeadingPartitions)); + Set newPartitions = Sets.difference(currentLeadingPartitions, leadingPartitions).immutableCopy(); + LOG.info(String.format("Newly detected leading partitions (%s): %s", newPartitions.size(), newPartitions)); + Set removedPartitions = Sets.difference(leadingPartitions, currentLeadingPartitions).immutableCopy(); + LOG.info(String.format("No longer leading partitions (%s): %s", removedPartitions.size(), removedPartitions)); + for (TopicPartition topicPartition : newPartitions) { + watchPartition(topicPartition); + } + for (TopicPartition topicPartition : removedPartitions) { + unwatchPartition(topicPartition); + } + LOG.info("Finished applying current leadership state in " + (System.currentTimeMillis() - start) + "ms"); + lastPollTime = System.currentTimeMillis(); + } + + private void watchPartition(TopicPartition topicPartition) { + directoryTreeWatcher.watch(topicPartition); + leadingPartitions.add(topicPartition); + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( + topicPartition.topic(), + topicPartition.partition(), + UploaderMetrics.KAFKA_LEADER_SET_METRIC, + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId() + ); + } + + private void unwatchPartition(TopicPartition topicPartition) { + leadingPartitions.remove(topicPartition); + directoryTreeWatcher.unwatch(topicPartition); + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( + topicPartition.topic(), + topicPartition.partition(), + UploaderMetrics.KAFKA_LEADER_UNSET_METRIC, + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId() + ); + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).updateCounter( + topicPartition.topic(), + topicPartition.partition(), + UploaderMetrics.KAFKA_LEADER_COUNT_METRIC, + 0, + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId() + ); + } + + public void start() throws Exception { + applyCurrentState(); + executorService.scheduleAtFixedRate(() -> { + try { + applyCurrentState(); + } catch (Exception e) { + LOG.error("Caught exception while applying current state", e); + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( + null, + null, + UploaderMetrics.WATCHER_LEADERSHIP_EXCEPTION_METRIC, + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId() + ); + } + }, config.getLeadershipWatcherPollIntervalSeconds(), config.getLeadershipWatcherPollIntervalSeconds(), java.util.concurrent.TimeUnit.SECONDS); + LOG.info("Started LeadershipWatcher with poll interval: " + config.getLeadershipWatcherPollIntervalSeconds()); + } + + public void stop() throws InterruptedException { + executorService.shutdown(); + LOG.info("Stopped LeadershipWatcher"); + } + +} diff --git a/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/ZookeeperLeadershipWatcher.java b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/ZookeeperLeadershipWatcher.java new file mode 100644 index 0000000..7ac79bc --- /dev/null +++ b/ts-segment-uploader/src/main/java/com/pinterest/kafka/tieredstorage/uploader/leadership/ZookeeperLeadershipWatcher.java @@ -0,0 +1,133 @@ +package com.pinterest.kafka.tieredstorage.uploader.leadership; + +import com.google.gson.Gson; +import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager; +import com.pinterest.kafka.tieredstorage.uploader.DirectoryTreeWatcher; +import com.pinterest.kafka.tieredstorage.uploader.Heartbeat; +import com.pinterest.kafka.tieredstorage.uploader.KafkaEnvironmentProvider; +import com.pinterest.kafka.tieredstorage.uploader.SegmentUploaderConfiguration; +import com.pinterest.kafka.tieredstorage.uploader.TopicPartitionState; +import com.pinterest.kafka.tieredstorage.uploader.UploaderMetrics; +import org.apache.kafka.common.TopicPartition; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.data.Stat; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Watches the leadership of Kafka partitions by pulling the state from ZooKeeper every X seconds and returning the + * current active set of partitions that are being led by this broker in the {@link #queryCurrentLeadingPartitions()} + * + * The poll interval is configurable via the leadership.watcher.poll.interval.seconds configuration, and relies on the + * {@link java.util.concurrent.ExecutorService} in parent {@link LeadershipWatcher} to schedule the polling. + */ +public class ZookeeperLeadershipWatcher extends LeadershipWatcher { + private final static Logger LOG = LogManager.getLogger(ZookeeperLeadershipWatcher.class); + private ZooKeeper zooKeeper; + private final static int SESSION_TIMEOUT_MS = 60000; + private final static String TOPICS_ZK_PATH = "/brokers/topics"; + private final static String PARTITIONS_ZK_SUBPATH = "partitions"; + private final Stat stat = new Stat(); + private final Gson gson = new Gson(); + private Heartbeat heartbeat; + private Set currentLeadingPartitions = new HashSet<>(); + + public ZookeeperLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException { + super(directoryTreeWatcher, config, environmentProvider); + } + + @Override + protected void initialize() throws IOException, InterruptedException { + initializeZookeeperWatcher(environmentProvider.zookeeperConnect()); + } + + private void initializeZookeeperWatcher(String zkEndpoints) throws IOException, InterruptedException { + if (zkEndpoints == null) + throw new IllegalArgumentException("Unexpect ZK endpoint: null"); + + this.zooKeeper = new ZooKeeper(zkEndpoints, SESSION_TIMEOUT_MS, null); + while (!zooKeeper.getState().isConnected()) { + Thread.sleep(500); + } + heartbeat = new Heartbeat("watcher.zk", config, environmentProvider); + LOG.info(String.format("ZK watcher state: %s", zooKeeper.getState())); + } + + @Override + protected Set queryCurrentLeadingPartitions() throws Exception { + LOG.info("Starting to apply current ZK state"); + long startTime = System.currentTimeMillis(); + currentLeadingPartitions.clear(); + int numPartitionsProcessed = 0; + try { + for (String topic: zooKeeper.getChildren(TOPICS_ZK_PATH, true)) { + String partitionsPath = String.format("%s/%s/%s", TOPICS_ZK_PATH, topic, PARTITIONS_ZK_SUBPATH); + for (String partition : zooKeeper.getChildren(partitionsPath, true)) { + String path = String.format("%s/%s/state", partitionsPath, partition); + TopicPartition topicPartition = fromPath(path); + String data = null; + try { + data = new String(zooKeeper.getData(path, true, stat)); + } catch (KeeperException.NoNodeException e) { + LOG.warn("Caught exception trying to get zk data from path. Don't panic if zNode was deleted," + + " we will unwatch it." + path, e); + } + if (data != null) { + TopicPartitionState topicPartitionState = gson.fromJson(data, TopicPartitionState.class); + if (topicPartitionState.getLeader() == environmentProvider.brokerId()) { + LOG.info(String.format("Current leader of %s matches this broker ID: %s", path, topicPartitionState.getLeader())); + currentLeadingPartitions.add(topicPartition); + } + } + numPartitionsProcessed++; + } + } + LOG.info(String.format("Finished querying ZK for current leading partitions in %dms. " + + "Number of partitions processed=%d, number of leading partitions=%d", + System.currentTimeMillis() - startTime, numPartitionsProcessed, currentLeadingPartitions.size())); + return currentLeadingPartitions; + } catch (KeeperException | InterruptedException e) { + LOG.info("Caught exception trying to get zk data from path. Will reset the ZK client.", e); + tryResetZkClient(e); + throw e; + } + } + + @Override + public void stop() throws InterruptedException { + super.stop(); + zooKeeper.close(); + heartbeat.stop(); + LOG.info("Stopped ZookeeperLeadershipWatcher"); + } + + private void tryResetZkClient(Exception e) { + LOG.error("Hit a ZK exception. Will bounce the ZK client.", e); + try { + MetricRegistryManager.getInstance(config.getMetricsConfiguration()).incrementCounter( + null, + null, + UploaderMetrics.WATCHER_ZK_RESET_METRIC, + "cluster=" + environmentProvider.clusterId(), + "broker=" + environmentProvider.brokerId() + ); + this.zooKeeper.close(); + this.heartbeat.stop(); + initialize(); + } catch (Exception ex) { + LOG.error("Could not restore the ZK client.", ex); + throw new RuntimeException(ex); + } + } + + private TopicPartition fromPath(String path) { + String[] parts = path.split("/"); + return new TopicPartition(parts[3], Integer.parseInt(parts[5])); + } +} diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java index 31cacd4..0eaa1f7 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestBase.java @@ -5,7 +5,6 @@ import com.google.gson.JsonArray; import com.google.gson.JsonObject; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.kafka.clients.admin.DescribeConfigsResult; import org.apache.kafka.common.config.ConfigResource; import org.apache.zookeeper.CreateMode; @@ -26,7 +25,6 @@ import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.net.URI; import java.nio.file.Files; import java.nio.file.Path; @@ -48,7 +46,7 @@ public class TestBase { protected S3Client s3Client; @BeforeEach - void setup() throws InterruptedException, IOException, KeeperException, ConfigurationException, ExecutionException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public void setup() throws Exception { s3Client = S3Client.builder() .endpointOverride(URI.create(S3_MOCK.getServiceEndpoint())) .region(Region.US_EAST_1) @@ -58,11 +56,88 @@ void setup() throws InterruptedException, IOException, KeeperException, Configur } @AfterEach - void tearDown() throws IOException, InterruptedException, ExecutionException { + public void tearDown() throws IOException, InterruptedException, ExecutionException { Thread.sleep(5000); s3Client.close(); } + public static void overrideS3ClientForFileUploaderAndDownloader(S3Client s3Client) { + MultiThreadedS3FileUploader.overrideS3Client(s3Client); + S3FileDownloader.overrideS3Client(s3Client); + } + + public static KafkaEnvironmentProvider createTestEnvironmentProvider(String suppliedZkConnect, String suppliedLogDir) { + KafkaEnvironmentProvider environmentProvider = new KafkaEnvironmentProvider() { + + private String zookeeperConnect; + private String logDir; + @Override + public void load() { + this.zookeeperConnect = suppliedZkConnect; + this.logDir = suppliedLogDir; + } + + @Override + public String clusterId() { + return TEST_CLUSTER; + } + + @Override + public Integer brokerId() { + return 1; + } + + @Override + public String zookeeperConnect() { + return zookeeperConnect; + } + + @Override + public String logDir() { + return logDir; + } + }; + return environmentProvider; + } + + public static KafkaEnvironmentProvider createTestEnvironmentProvider(SharedKafkaTestResource sharedKafkaTestResource) { + KafkaEnvironmentProvider environmentProvider = new KafkaEnvironmentProvider() { + + private String zookeeperConnect; + private String logDir; + @Override + public void load() { + this.zookeeperConnect = sharedKafkaTestResource.getZookeeperConnectString(); + try { + this.logDir = getBrokerConfig(sharedKafkaTestResource, 1, "log.dir"); + } catch (ExecutionException | InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public String clusterId() { + return TEST_CLUSTER; + } + + @Override + public Integer brokerId() { + return 1; + } + + @Override + public String zookeeperConnect() { + return zookeeperConnect; + } + + @Override + public String logDir() { + return logDir; + } + }; + return environmentProvider; + } + protected static void sendTestData(SharedKafkaTestResource sharedKafkaTestResource, String topic, int partition, int numRecords) { sharedKafkaTestResource.getKafkaTestUtils().produceRecords(numRecords, topic, partition); } @@ -129,9 +204,17 @@ protected static ListObjectsV2Response getListObjectsV2Response(String bucket, S } protected static void reassignPartitions(SharedKafkaTestResource sharedKafkaTestResource, Map>> assignmentMap) throws IOException, InterruptedException, KeeperException { + String path = "/admin/reassign_partitions"; ZooKeeper zk = new ZooKeeper(sharedKafkaTestResource.getZookeeperConnectString(), 10000, null); String assignmentJson = assignmentMapToJson(assignmentMap); - zk.create("/admin/reassign_partitions", assignmentJson.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + if (zk.exists(path, false) == null) { + zk.create(path, assignmentJson.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); + } + while (zk.exists(path, false) == null) { + // wait until znode exists + Thread.sleep(200); + } + zk.setData(path, assignmentJson.getBytes(), -1); zk.close(); // wait for reassignment to complete diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java index d96b630..8ce023e 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcher.java @@ -1,12 +1,11 @@ package com.pinterest.kafka.tieredstorage.uploader; import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; +import com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.common.TopicPartition; -import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -16,7 +15,6 @@ import java.io.File; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.WatchKey; @@ -42,61 +40,39 @@ public class TestDirectoryTreeWatcher extends TestBase { @BeforeEach @Override - void setup() throws ConfigurationException, IOException, InterruptedException, KeeperException, ExecutionException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public void setup() throws Exception { super.setup(); - environmentProvider = new KafkaEnvironmentProvider() { - - private String zookeeperConnect; - private String logDir; - @Override - public void load() { - this.zookeeperConnect = sharedKafkaTestResource.getZookeeperConnectString(); - try { - this.logDir = getBrokerConfig(sharedKafkaTestResource, 1, "log.dir"); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public String clusterId() { - return TEST_CLUSTER; - } - - @Override - public Integer brokerId() { - return 1; - } - - @Override - public String zookeeperConnect() { - return zookeeperConnect; - } - @Override - public String logDir() { - return logDir; - } - }; + // environment provider setup + environmentProvider = createTestEnvironmentProvider(sharedKafkaTestResource); environmentProvider.load(); - MultiThreadedS3FileUploader.overrideS3Client(s3Client); - S3FileDownloader.overrideS3Client(s3Client); - createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 3); + // override s3 client + overrideS3ClientForFileUploaderAndDownloader(s3Client); + + // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); endpointProvider.initialize(TEST_CLUSTER); - MultiThreadedS3FileUploader.overrideS3Client(s3Client); + + // s3 uploader setup SegmentUploaderConfiguration config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); S3FileUploader s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); + + // create topic + createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 3); + + // start directory tree watcher directoryTreeWatcher = new DirectoryTreeWatcher(s3FileUploader, config, environmentProvider); + DirectoryTreeWatcher.setLeadershipWatcher(new ZookeeperLeadershipWatcher(directoryTreeWatcher, config, environmentProvider)); + directoryTreeWatcher.initialize(); directoryTreeWatcher.start(); } @AfterEach @Override - void tearDown() throws IOException, ExecutionException, InterruptedException { + public void tearDown() throws IOException, ExecutionException, InterruptedException { directoryTreeWatcher.stop(); - DirectoryTreeWatcher.unsetKafkaLeadershipWatcher(); + DirectoryTreeWatcher.unsetLeadershipWatcher(); super.tearDown(); } diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java index efe9b08..5695420 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestDirectoryTreeWatcherMultiBroker.java @@ -1,10 +1,9 @@ package com.pinterest.kafka.tieredstorage.uploader; import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; +import com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.kafka.clients.admin.AdminClient; -import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -13,7 +12,6 @@ import org.junit.jupiter.api.extension.RegisterExtension; import java.io.IOException; -import java.lang.reflect.InvocationTargetException; import java.nio.file.Path; import java.nio.file.WatchKey; import java.util.HashMap; @@ -36,62 +34,39 @@ public class TestDirectoryTreeWatcherMultiBroker extends TestBase { @BeforeEach @Override - void setup() throws ConfigurationException, IOException, InterruptedException, KeeperException, ExecutionException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public void setup() throws Exception { super.setup(); - KafkaEnvironmentProvider environmentProvider = new KafkaEnvironmentProvider() { - - private String zookeeperConnect; - private String logDir; - - @Override - public void load() { - this.zookeeperConnect = sharedKafkaTestResource.getZookeeperConnectString(); - try { - this.logDir = getBrokerConfig(sharedKafkaTestResource, 1, "log.dir"); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public String clusterId() { - return TEST_CLUSTER; - } - - @Override - public Integer brokerId() { - return 1; - } - - @Override - public String zookeeperConnect() { - return zookeeperConnect; - } - - @Override - public String logDir() { - return logDir; - } - }; + + // environment provider setup + KafkaEnvironmentProvider environmentProvider = createTestEnvironmentProvider(sharedKafkaTestResource); environmentProvider.load(); - MultiThreadedS3FileUploader.overrideS3Client(s3Client); - S3FileDownloader.overrideS3Client(s3Client); - createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 6, (short) 2); + // override s3 client + overrideS3ClientForFileUploaderAndDownloader(s3Client); + + // endpoint provider setup MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); endpointProvider.initialize(TEST_CLUSTER); - MultiThreadedS3FileUploader.overrideS3Client(s3Client); + + // s3 uploader setup SegmentUploaderConfiguration config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); S3FileUploader s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); + + // create topic with replicationFactor = 2 + createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 6, (short) 2); + + // start directory tree watcher directoryTreeWatcher = new DirectoryTreeWatcher(s3FileUploader, config, environmentProvider); + DirectoryTreeWatcher.setLeadershipWatcher(new ZookeeperLeadershipWatcher(directoryTreeWatcher, config, environmentProvider)); + directoryTreeWatcher.initialize(); directoryTreeWatcher.start(); } @AfterEach @Override - void tearDown() throws IOException, ExecutionException, InterruptedException { + public void tearDown() throws IOException, ExecutionException, InterruptedException { directoryTreeWatcher.stop(); - DirectoryTreeWatcher.unsetKafkaLeadershipWatcher(); + DirectoryTreeWatcher.unsetLeadershipWatcher(); super.tearDown(); } diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java index 718a6b1..0dc980a 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestKafkaSegmentUploader.java @@ -3,7 +3,6 @@ import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpoint; import com.pinterest.kafka.tieredstorage.common.discovery.s3.S3StorageServiceEndpointProvider; import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; -import org.apache.commons.configuration2.ex.ConfigurationException; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -12,7 +11,6 @@ import org.apache.kafka.common.header.internals.RecordHeader; import org.apache.kafka.common.header.internals.RecordHeaders; import org.apache.kafka.common.serialization.ByteArraySerializer; -import org.apache.zookeeper.KeeperException; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; @@ -26,7 +24,6 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; -import java.lang.reflect.InvocationTargetException; import java.nio.charset.StandardCharsets; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -49,42 +46,9 @@ public class TestKafkaSegmentUploader extends TestBase { @Override @BeforeEach - void setup() throws InterruptedException, IOException, KeeperException, ConfigurationException, ExecutionException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public void setup() throws Exception { super.setup(); - environmentProvider = new KafkaEnvironmentProvider() { - - private String zookeeperConnect; - private String logDir; - @Override - public void load() { - this.zookeeperConnect = sharedKafkaTestResource.getZookeeperConnectString(); - try { - this.logDir = getBrokerConfig(sharedKafkaTestResource, 1, "log.dir"); - } catch (ExecutionException | InterruptedException e) { - throw new RuntimeException(e); - } - } - - @Override - public String clusterId() { - return TEST_CLUSTER; - } - - @Override - public Integer brokerId() { - return 1; - } - - @Override - public String zookeeperConnect() { - return zookeeperConnect; - } - - @Override - public String logDir() { - return logDir; - } - }; + environmentProvider = createTestEnvironmentProvider(sharedKafkaTestResource); createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 3); createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_B, 6); startSegmentUploaderThread(); @@ -92,9 +56,9 @@ public String logDir() { @Override @AfterEach - void tearDown() throws IOException, InterruptedException, ExecutionException { + public void tearDown() throws IOException, InterruptedException, ExecutionException { uploader.stop(); - DirectoryTreeWatcher.unsetKafkaLeadershipWatcher(); + DirectoryTreeWatcher.unsetLeadershipWatcher(); deleteTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A); sharedKafkaTestResource.getKafkaTestUtils().getAdminClient().close(); super.tearDown(); @@ -110,10 +74,9 @@ static void tearDownAll() { adminClient.close(); } - private void startSegmentUploaderThread() throws InterruptedException, IOException, KeeperException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + private void startSegmentUploaderThread() throws Exception { String configDirectory = "src/test/resources"; - MultiThreadedS3FileUploader.overrideS3Client(s3Client); - S3FileDownloader.overrideS3Client(s3Client); + overrideS3ClientForFileUploaderAndDownloader(s3Client); uploader = new KafkaSegmentUploader(configDirectory, environmentProvider); uploader.start(); } diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java index 0b98e43..4dbf817 100644 --- a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/TestMultiThreadedS3FileUploader.java @@ -27,7 +27,7 @@ public class TestMultiThreadedS3FileUploader extends TestBase { private SegmentUploaderConfiguration config; @BeforeEach - void setup() throws ConfigurationException, IOException, InterruptedException, KeeperException, ExecutionException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException { + public void setup() throws Exception { super.setup(); // NO-OP KafkaEnvironmentProvider environmentProvider = new KafkaEnvironmentProvider() { diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java new file mode 100644 index 0000000..87dfffe --- /dev/null +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestLeadershipWatcher.java @@ -0,0 +1,122 @@ +package com.pinterest.kafka.tieredstorage.uploader.leadership; + +import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; +import com.pinterest.kafka.tieredstorage.uploader.DirectoryTreeWatcher; +import com.pinterest.kafka.tieredstorage.uploader.KafkaEnvironmentProvider; +import com.pinterest.kafka.tieredstorage.uploader.MultiThreadedS3FileUploader; +import com.pinterest.kafka.tieredstorage.uploader.S3FileUploader; +import com.pinterest.kafka.tieredstorage.uploader.SegmentUploaderConfiguration; +import com.pinterest.kafka.tieredstorage.uploader.TestBase; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestLeadershipWatcher extends TestBase { + private static final Set watchedTopicPartitions = new HashSet<>(); + private static final Set currentLeadingPartitions = new HashSet<>(); + private LeadershipWatcher mockLeadershipWatcher; + + @BeforeEach + @Override + public void setup() throws Exception { + super.setup(); + + // environment provider setup + KafkaEnvironmentProvider environmentProvider = createTestEnvironmentProvider("sampleZkConnect", "sampleLogDir"); + environmentProvider.load(); + + // override s3 client + overrideS3ClientForFileUploaderAndDownloader(s3Client); + + // endpoint provider setup + MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); + endpointProvider.initialize(TEST_CLUSTER); + + // s3 uploader setup + SegmentUploaderConfiguration config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); + S3FileUploader s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); + + // create watchers + DirectoryTreeWatcher directoryTreeWatcher = new MockDirectoryTreeWatcher(watchedTopicPartitions, s3FileUploader, config, environmentProvider); + mockLeadershipWatcher = new MockLeadershipWatcher(directoryTreeWatcher, config, environmentProvider); + } + + @Test + void testApplyCurrentState() throws Exception { + // test initial state + mockLeadershipWatcher.applyCurrentState(); + assertEquals(0, watchedTopicPartitions.size()); + + // test addition + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 0)); + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 1)); + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 2)); + mockLeadershipWatcher.applyCurrentState(); + assertEquals(3, watchedTopicPartitions.size()); + assertEquals(currentLeadingPartitions, watchedTopicPartitions); + + // test removal + currentLeadingPartitions.remove(new TopicPartition(TEST_TOPIC_A, 0)); + mockLeadershipWatcher.applyCurrentState(); + assertEquals(2, watchedTopicPartitions.size()); + assertEquals(currentLeadingPartitions, watchedTopicPartitions); + + // test addition and removal + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 0)); + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 3)); + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 4)); + currentLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, 5)); + currentLeadingPartitions.remove(new TopicPartition(TEST_TOPIC_A, 1)); + mockLeadershipWatcher.applyCurrentState(); + assertEquals(5, watchedTopicPartitions.size()); + assertEquals(currentLeadingPartitions, watchedTopicPartitions); + + // test no change + mockLeadershipWatcher.applyCurrentState(); + assertEquals(5, watchedTopicPartitions.size()); + assertEquals(currentLeadingPartitions, watchedTopicPartitions); + } + + private static class MockLeadershipWatcher extends LeadershipWatcher { + + public MockLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException { + super(directoryTreeWatcher, config, environmentProvider); + } + + @Override + protected void initialize() throws IOException, InterruptedException { + // no-op + } + + @Override + protected Set queryCurrentLeadingPartitions() throws Exception { + return currentLeadingPartitions; + } + } + + protected static class MockDirectoryTreeWatcher extends DirectoryTreeWatcher { + + private final Set watchedTopicPartitions; + + public MockDirectoryTreeWatcher(Set watchedTopicPartitions, S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws Exception { + super(s3FileUploader, config, environmentProvider); + this.watchedTopicPartitions = watchedTopicPartitions; + } + + @Override + public void watch(TopicPartition topicPartition) { + watchedTopicPartitions.add(topicPartition); + } + + @Override + public void unwatch(TopicPartition topicPartition) { + watchedTopicPartitions.remove(topicPartition); + } + } +} diff --git a/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java new file mode 100644 index 0000000..8700e1b --- /dev/null +++ b/ts-segment-uploader/src/test/java/com/pinterest/kafka/tieredstorage/uploader/leadership/TestZookeeperLeadershipWatcher.java @@ -0,0 +1,156 @@ +package com.pinterest.kafka.tieredstorage.uploader.leadership; + +import com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider; +import com.pinterest.kafka.tieredstorage.uploader.DirectoryTreeWatcher; +import com.pinterest.kafka.tieredstorage.uploader.KafkaEnvironmentProvider; +import com.pinterest.kafka.tieredstorage.uploader.MultiThreadedS3FileUploader; +import com.pinterest.kafka.tieredstorage.uploader.S3FileUploader; +import com.pinterest.kafka.tieredstorage.uploader.SegmentUploaderConfiguration; +import com.pinterest.kafka.tieredstorage.uploader.TestBase; +import com.salesforce.kafka.test.junit5.SharedKafkaTestResource; +import org.apache.kafka.common.TopicPartition; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ExecutionException; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +public class TestZookeeperLeadershipWatcher extends TestBase { + + @RegisterExtension + private static final SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource() + .withBrokerProperty("log.segment.bytes", "30000") + .withBrokerProperty("log.segment.delete.delay.ms", "5000") + .withBrokers(12); + private static Set watchedTopicPartitions = new HashSet<>(); + private DirectoryTreeWatcher mockDirectoryTreeWatcher; + private LeadershipWatcher zkLeadershipWatcher; + + @BeforeEach + @Override + public void setup() throws Exception { + super.setup(); + + // environment provider setup + KafkaEnvironmentProvider environmentProvider = createTestEnvironmentProvider(sharedKafkaTestResource); + environmentProvider.load(); + + // override s3 client + overrideS3ClientForFileUploaderAndDownloader(s3Client); + + // endpoint provider setup + MockS3StorageServiceEndpointProvider endpointProvider = new MockS3StorageServiceEndpointProvider(); + endpointProvider.initialize(TEST_CLUSTER); + + // s3 uploader setup + SegmentUploaderConfiguration config = new SegmentUploaderConfiguration("src/test/resources", TEST_CLUSTER); + S3FileUploader s3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider); + + // start directory tree watcher + mockDirectoryTreeWatcher = new TestLeadershipWatcher.MockDirectoryTreeWatcher(watchedTopicPartitions, s3FileUploader, config, environmentProvider); + zkLeadershipWatcher = new ZookeeperLeadershipWatcher(mockDirectoryTreeWatcher, config, environmentProvider); + DirectoryTreeWatcher.setLeadershipWatcher(zkLeadershipWatcher); + zkLeadershipWatcher.initialize(); + } + + @AfterEach + @Override + public void tearDown() throws IOException, ExecutionException, InterruptedException { + deleteTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A); + mockDirectoryTreeWatcher.stop(); + super.tearDown(); + } + + @Test + void testQueryCurrentLeadingPartitions() throws Exception { + Set currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + + // currently no leading partitions + assertEquals(0, currentLeadingPartitions.size()); + assertEquals(0, watchedTopicPartitions.size()); + + // create topic with 1 partition led by each of the 12 brokers + createTopicAndVerify(sharedKafkaTestResource, TEST_TOPIC_A, 12, (short) 2); + + currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + assertEquals(1, currentLeadingPartitions.size()); + TopicPartition leadingPartition = currentLeadingPartitions.iterator().next(); + + Map>> assignmentMap = new HashMap<>(); + assignmentMap.put(TEST_TOPIC_A, new HashMap<>()); + + // reassign original leading partition to broker 2 + assignmentMap.get(TEST_TOPIC_A).put(leadingPartition.partition(), List.of(2)); + reassignPartitions(sharedKafkaTestResource, assignmentMap); + Thread.sleep(2000); // wait for reassignment to complete + + currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + assertEquals(0, currentLeadingPartitions.size()); + + // broker 1 leads partitions 0-2 + assignmentMap.get(TEST_TOPIC_A).clear(); + assignmentMap.get(TEST_TOPIC_A).put(0, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(1, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(2, List.of(1)); + reassignPartitions(sharedKafkaTestResource, assignmentMap); + Thread.sleep(2000); // wait for reassignment to complete + + currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + assertEquals(3, currentLeadingPartitions.size()); + assertLeadingPartitions(Set.of(0, 1, 2), currentLeadingPartitions); + + // broker 1 leads partitions 0-2, 4-6, 8-11 + assignmentMap.get(TEST_TOPIC_A).clear(); + assignmentMap.get(TEST_TOPIC_A).put(4, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(5, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(6, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(8, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(9, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(10, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(11, List.of(1)); + reassignPartitions(sharedKafkaTestResource, assignmentMap); + Thread.sleep(2000); // wait for reassignment to complete + + currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + assertEquals(10, currentLeadingPartitions.size()); + assertLeadingPartitions(Set.of(0, 1, 2, 4, 5, 6, 8, 9, 10, 11), currentLeadingPartitions); + + // broker 1 leads partitions 4-11 + assignmentMap.get(TEST_TOPIC_A).clear(); + assignmentMap.get(TEST_TOPIC_A).put(0, List.of(2)); + assignmentMap.get(TEST_TOPIC_A).put(1, List.of(2)); + assignmentMap.get(TEST_TOPIC_A).put(2, List.of(2)); + assignmentMap.get(TEST_TOPIC_A).put(3, List.of(2)); + assignmentMap.get(TEST_TOPIC_A).put(4, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(5, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(6, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(7, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(8, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(9, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(10, List.of(1)); + assignmentMap.get(TEST_TOPIC_A).put(11, List.of(1)); + reassignPartitions(sharedKafkaTestResource, assignmentMap); + Thread.sleep(2000); // wait for reassignment to complete + + currentLeadingPartitions = zkLeadershipWatcher.queryCurrentLeadingPartitions(); + assertEquals(8, currentLeadingPartitions.size()); + assertLeadingPartitions(Set.of(4, 5, 6, 7, 8, 9, 10, 11), currentLeadingPartitions); + } + + private static void assertLeadingPartitions(Set expectedPartitions, Set currentLeadingPartitions) { + Set expectedLeadingPartitions = new HashSet<>(); + for (int partition : expectedPartitions) { + expectedLeadingPartitions.add(new TopicPartition(TEST_TOPIC_A, partition)); + } + assertEquals(expectedLeadingPartitions, currentLeadingPartitions); + } +} diff --git a/ts-segment-uploader/src/test/resources/test-cluster-2.properties b/ts-segment-uploader/src/test/resources/test-cluster-2.properties index ac464c8..5ef41d0 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster-2.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster-2.properties @@ -5,5 +5,6 @@ ts.segment.uploader.kafka.topics.exclude= storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider -ts.segment.uploader.zk.watcher.poll.interval.seconds=5 +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher +ts.segment.uploader.leadership.watcher.poll.interval.seconds=5 ts.segment.uploader.s3.prefix.entropy.bits=5 \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/test-cluster-all-exclude.properties b/ts-segment-uploader/src/test/resources/test-cluster-all-exclude.properties index b69ffb5..f16b3d0 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster-all-exclude.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster-all-exclude.properties @@ -3,4 +3,5 @@ ts.segment.uploader.kafka.topics.include=test_topic ts.segment.uploader.kafka.topics.exclude=.* -storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider \ No newline at end of file +storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/test-cluster-all-include.properties b/ts-segment-uploader/src/test/resources/test-cluster-all-include.properties index 51c7200..4305eb6 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster-all-include.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster-all-include.properties @@ -3,4 +3,5 @@ ts.segment.uploader.kafka.topics.include=.*, test_topic ts.segment.uploader.kafka.topics.exclude= -storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider \ No newline at end of file +storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/test-cluster-specific-exclude.properties b/ts-segment-uploader/src/test/resources/test-cluster-specific-exclude.properties index f41b7b3..df39111 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster-specific-exclude.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster-specific-exclude.properties @@ -3,4 +3,5 @@ ts.segment.uploader.kafka.topics.include=.* ts.segment.uploader.kafka.topics.exclude=test_topic_0, test_topic_1,dev_topic_0, my_topic,test_topic_2, foo_topic, bar_topic -storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider \ No newline at end of file +storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/test-cluster-specific-include.properties b/ts-segment-uploader/src/test/resources/test-cluster-specific-include.properties index bdb601d..d0df079 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster-specific-include.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster-specific-include.properties @@ -3,4 +3,5 @@ ts.segment.uploader.kafka.topics.include=test_topic_0, test_topic_1,dev_topic_0, my_topic,test_topic_2, foo_topic, bar_topic ts.segment.uploader.kafka.topics.exclude= -storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider \ No newline at end of file +storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file diff --git a/ts-segment-uploader/src/test/resources/test-cluster.properties b/ts-segment-uploader/src/test/resources/test-cluster.properties index 97b0d73..7455c8b 100644 --- a/ts-segment-uploader/src/test/resources/test-cluster.properties +++ b/ts-segment-uploader/src/test/resources/test-cluster.properties @@ -3,4 +3,6 @@ ts.segment.uploader.kafka.topics.include=test_topic_0, test_topic_1,dev_topic_0, my_topic,test_topic_2, foo_topic, bar_topic ts.segment.uploader.kafka.topics.exclude=test_topic.*, .*_topic -storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider \ No newline at end of file +storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.MockS3StorageServiceEndpointProvider + +ts.segment.uploader.leadership.watcher.class=com.pinterest.kafka.tieredstorage.uploader.leadership.ZookeeperLeadershipWatcher \ No newline at end of file