Skip to content

Commit

Permalink
Make LeadershipWatcher pluggable and create tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jeffxiang committed Nov 12, 2024
1 parent b5688d7 commit fa14866
Show file tree
Hide file tree
Showing 21 changed files with 753 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ ts.segment.uploader.upload.timeout.ms=60000
ts.segment.uploader.upload.thread.count=3
ts.segment.uploader.upload.max.retries=10

ts.segment.uploader.zk.watcher.poll.interval.seconds=60
ts.segment.uploader.leadership.watcher.poll.interval.seconds=60

storage.service.endpoint.provider.class=com.pinterest.kafka.tieredstorage.common.discovery.s3.ExampleS3StorageServiceEndpointProvider
metrics.reporter.class=com.pinterest.kafka.tieredstorage.common.metrics.NoOpMetricsReporter
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.kafka.tieredstorage.common.metrics.MetricRegistryManager;
import com.pinterest.kafka.tieredstorage.uploader.leadership.KafkaLeadershipWatcher;
import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher;
import org.apache.kafka.common.TopicPartition;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -11,17 +11,14 @@
import java.io.File;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -56,18 +53,16 @@ public class DirectoryTreeWatcher implements Runnable {
private static final Logger LOG = LogManager.getLogger(DirectoryTreeWatcher.class);
private static final String[] MONITORED_EXTENSIONS = {".timeindex", ".index", ".log"};
private static final Pattern MONITORED_FILE_PATTERN = Pattern.compile("^\\d+(" + String.join("|", MONITORED_EXTENSIONS) + ")$");
private final Path topLevelPath;
private final WatchService watchService;
private Thread thread;
private boolean cancelled = false;
private static Map<TopicPartition, String> activeSegment;
private static Map<TopicPartition, Set<String>> segmentsQueue;
private static LeadershipWatcher leadershipWatcher;
private final Path topLevelPath;
private final WatchService watchService;
private final ConcurrentLinkedQueue<UploadTask> uploadTasks = new ConcurrentLinkedQueue<>();
private final S3FileUploader s3FileUploader;
private final ThreadLocal<WatermarkFileHandler> tempFileGenerator = ThreadLocal.withInitial(WatermarkFileHandler::new);
private final ConcurrentHashMap<TopicPartition, String> latestUploadedOffset = new ConcurrentHashMap<>();
private final S3FileDownloader s3FileDownloader;
private static KafkaLeadershipWatcher kafkaLeadershipWatcher;
private final Pattern SKIP_TOPICS_PATTERN = Pattern.compile(
"^__consumer_offsets$|^__transaction_state$|.+\\.changlog$|.+\\.repartition$"
);
Expand All @@ -78,36 +73,30 @@ public class DirectoryTreeWatcher implements Runnable {
private final SegmentUploaderConfiguration config;
private final KafkaEnvironmentProvider environmentProvider;
private final Object watchKeyMapLock = new Object();
private Thread thread;
private boolean cancelled = false;

public static void setKafkaLeadershipWatcher(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) {
if (kafkaLeadershipWatcher == null) {
try {
kafkaLeadershipWatcher = new KafkaLeadershipWatcher(directoryTreeWatcher, config, environmentProvider);
} catch (IOException | InterruptedException e) {
LOG.error("Could not launch Kafka Leadership Watcher; quitting ...");
throw new RuntimeException(e);
}
}
public static void setLeadershipWatcher(LeadershipWatcher suppliedLeadershipWatcher) {
if (leadershipWatcher == null)
leadershipWatcher = suppliedLeadershipWatcher;
}

@VisibleForTesting
protected static void unsetKafkaLeadershipWatcher() {
kafkaLeadershipWatcher = null;
protected static void unsetLeadershipWatcher() {
leadershipWatcher = null;
}

public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws IOException, InterruptedException, KeeperException {
public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfiguration config, KafkaEnvironmentProvider environmentProvider) throws Exception {
this.environmentProvider = environmentProvider;
this.topLevelPath = Paths.get(environmentProvider.logDir());
this.watchService = FileSystems.getDefault().newWatchService();
setKafkaLeadershipWatcher(this, config, environmentProvider);
activeSegment = new HashMap<>();
segmentsQueue = new HashMap<>();
this.s3FileUploader = s3FileUploader;
this.s3UploadHandler = Executors.newSingleThreadExecutor();
this.s3FileDownloader = new S3FileDownloader(s3FileUploader.getStorageServiceEndpointProvider(), config);
heartbeat = new Heartbeat("watcher.logs", config, environmentProvider);
this.config = config;
initialize();
}

/**
Expand All @@ -116,7 +105,10 @@ public DirectoryTreeWatcher(S3FileUploader s3FileUploader, SegmentUploaderConfig
* @throws InterruptedException
* @throws KeeperException
*/
private void initialize() throws IOException, InterruptedException, KeeperException {
public void initialize() throws Exception {
if (leadershipWatcher == null) {
throw new IllegalStateException("LeadershipWatcher must be set before initializing DirectoryTreeWatcher");
}
s3UploadHandler.submit(() -> {
while (!cancelled) {
if (uploadTasks.isEmpty()) {
Expand Down Expand Up @@ -145,8 +137,8 @@ private void initialize() throws IOException, InterruptedException, KeeperExcept
s3FileUploader.uploadFile(task, this::handleUploadCallback);
}
});
LOG.info("Initializing KafkaLeadershipWatcher");
kafkaLeadershipWatcher.start();
LOG.info("Initializing LeadershipWatcher");
leadershipWatcher.start();
LOG.info("Submitting s3UploadHandler loop");
}

Expand Down Expand Up @@ -702,7 +694,7 @@ public void stop() throws InterruptedException {
if (thread != null && thread.isAlive()) {
thread.interrupt();
}
kafkaLeadershipWatcher.stop();
leadershipWatcher.stop();
heartbeat.stop();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.google.common.annotations.VisibleForTesting;
import com.pinterest.kafka.tieredstorage.common.discovery.StorageServiceEndpointProvider;
import com.pinterest.kafka.tieredstorage.uploader.leadership.LeadershipWatcher;
import org.apache.commons.configuration2.ex.ConfigurationException;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
Expand All @@ -21,7 +22,7 @@ public class KafkaSegmentUploader {
private final StorageServiceEndpointProvider endpointProvider;
private final SegmentUploaderConfiguration config;

public KafkaSegmentUploader(String configDirectory) throws IOException, InterruptedException, KeeperException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public KafkaSegmentUploader(String configDirectory) throws Exception {
Utils.acquireLock();
KafkaEnvironmentProvider environmentProvider = getEnvironmentProvider();
environmentProvider.load();
Expand All @@ -33,10 +34,15 @@ public KafkaSegmentUploader(String configDirectory) throws IOException, Interrup

multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider);
directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider);

LeadershipWatcher leadershipWatcher = getLeadershipWatcherFromConfigs(directoryTreeWatcher, config, environmentProvider);
DirectoryTreeWatcher.setLeadershipWatcher(leadershipWatcher);

directoryTreeWatcher.initialize();
}

@VisibleForTesting
protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws IOException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException, InterruptedException, KeeperException {
protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider environmentProvider) throws Exception {
Utils.acquireLock();
environmentProvider.load();
config = new SegmentUploaderConfiguration(configDirectory, environmentProvider.clusterId());
Expand All @@ -46,6 +52,11 @@ protected KafkaSegmentUploader(String configDirectory, KafkaEnvironmentProvider

multiThreadedS3FileUploader = new MultiThreadedS3FileUploader(endpointProvider, config, environmentProvider);
directoryTreeWatcher = new DirectoryTreeWatcher(multiThreadedS3FileUploader, config, environmentProvider);

LeadershipWatcher leadershipWatcher = getLeadershipWatcherFromConfigs(directoryTreeWatcher, config, environmentProvider);
DirectoryTreeWatcher.setLeadershipWatcher(leadershipWatcher);

directoryTreeWatcher.initialize();
}

public void start() {
Expand All @@ -69,6 +80,13 @@ private KafkaEnvironmentProvider getEnvironmentProvider() throws ClassNotFoundEx
return environmentProviderConstructor.newInstance();
}

private static LeadershipWatcher getLeadershipWatcherFromConfigs(DirectoryTreeWatcher directoryTreeWatcher, SegmentUploaderConfiguration config, KafkaEnvironmentProvider kafkaEnvironmentProvider) throws InvocationTargetException, InstantiationException, IllegalAccessException, ClassNotFoundException, NoSuchMethodException {
String leadershipWatcherClassName = config.getLeadershipWatcherClassName();
Constructor<? extends LeadershipWatcher> leadershipWatcherConstructor = Class.forName(leadershipWatcherClassName)
.asSubclass(LeadershipWatcher.class).getConstructor(DirectoryTreeWatcher.class, SegmentUploaderConfiguration.class, KafkaEnvironmentProvider.class);
return leadershipWatcherConstructor.newInstance(directoryTreeWatcher, config, kafkaEnvironmentProvider);
}

@VisibleForTesting
protected StorageServiceEndpointProvider getEndpointProvider() {
return endpointProvider;
Expand All @@ -79,13 +97,13 @@ protected SegmentUploaderConfiguration getSegmentUploaderConfiguration() {
return config;
}

private StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
private static StorageServiceEndpointProvider getEndpointProviderFromConfigs(SegmentUploaderConfiguration config) throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, InstantiationException, IllegalAccessException {
Constructor<? extends StorageServiceEndpointProvider> endpointProviderConstructor = Class.forName(config.getStorageServiceEndpointProviderClassName())
.asSubclass(StorageServiceEndpointProvider.class).getConstructor();
return endpointProviderConstructor.newInstance();
}

public static void main(String[] args) throws IOException, InterruptedException, KeeperException, ConfigurationException, ClassNotFoundException, InvocationTargetException, NoSuchMethodException, InstantiationException, IllegalAccessException {
public static void main(String[] args) throws Exception {
if (args.length != 1) {
LOG.error("configDirectory is required as an argument");
System.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,38 @@ public class SegmentUploaderConfiguration {
private static final Logger LOG = LogManager.getLogger(SegmentUploaderConfiguration.class);
private static final String TS_SEGMENT_UPLOADER_PREFIX = "ts.segment.uploader";
private static final String KAFKA_PREFIX = TS_SEGMENT_UPLOADER_PREFIX + "." + "kafka";

// Topic inclusion / exclusion
private static final String TOPICS_INCLUDE_PREFIX = KAFKA_PREFIX + "." + "topics.include";
private static final String TOPICS_EXCLUDE_PREFIX = KAFKA_PREFIX + "." + "topics.exclude";

// Storage service endpoint provider
private static final String STORAGE_SERVICE_ENDPOINT_PROVIDER_PREFIX = "storage.service.endpoint.provider";
private static final String STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY = STORAGE_SERVICE_ENDPOINT_PROVIDER_PREFIX + "." + "class";

// Offset reset strategy
private static final String OFFSET_RESET_STRATEGY_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "offset.reset.strategy";

// Upload configurations
private static final String UPLOADER_THREAD_COUNT_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.thread.count";
private static final String ZK_WATCHER_POLL_INTERVAL_SECONDS = TS_SEGMENT_UPLOADER_PREFIX + "." + "zk.watcher.poll.interval.seconds";
private static final String S3_PREFIX_ENTROPY_BITS = TS_SEGMENT_UPLOADER_PREFIX + "." + "s3.prefix.entropy.bits";
private static final String UPLOAD_TIMEOUT_MS = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.timeout.ms";
private static final String UPLOAD_MAX_RETRIES = TS_SEGMENT_UPLOADER_PREFIX + "." + "upload.max.retries";

// Leadership watcher
private static final String LEADERSHIP_WATCHER_CLASS_KEY = TS_SEGMENT_UPLOADER_PREFIX + "." + "leadership.watcher.class";
private static final String LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS = TS_SEGMENT_UPLOADER_PREFIX + "." + "leadership.watcher.poll.interval.seconds";

// Prefix entropy
private static final String S3_PREFIX_ENTROPY_BITS = TS_SEGMENT_UPLOADER_PREFIX + "." + "s3.prefix.entropy.bits";

// Internal structures
private final Properties properties = new Properties();
private final Set<Pattern> includeRegexes = ConcurrentHashMap.newKeySet();
private final Set<Pattern> excludeRegexes = ConcurrentHashMap.newKeySet();
private final Set<String> includeTopicsCache = ConcurrentHashMap.newKeySet();
private final Set<String> excludeTopicsCache = ConcurrentHashMap.newKeySet();
private final String storageServiceEndpointProviderClassName;
private final String leadershipWatcherClassName;
private final MetricsConfiguration metricsConfiguration;

public SegmentUploaderConfiguration(String configDirectory, String clusterId) throws IOException {
Expand All @@ -50,11 +66,12 @@ public SegmentUploaderConfiguration(String configDirectory, String clusterId) th
loadPatterns(includeRegexes, TOPICS_INCLUDE_PREFIX);
loadPatterns(excludeRegexes, TOPICS_EXCLUDE_PREFIX);

if (!properties.containsKey(STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY)) {
throw new RuntimeException(String.format("Configuration %s must be provided", STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY));
}
checkConfigExists(properties, STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY);
storageServiceEndpointProviderClassName = properties.getProperty(STORAGE_SERVICE_ENDPOINT_PROVIDER_CLASS_KEY);

checkConfigExists(properties, LEADERSHIP_WATCHER_CLASS_KEY);
leadershipWatcherClassName = properties.getProperty(LEADERSHIP_WATCHER_CLASS_KEY);

metricsConfiguration = MetricsConfiguration.getMetricsConfiguration(properties);

LOG.info(String.format("Loaded SegmentUploaderConfiguration from file: %s", filename));
Expand All @@ -66,6 +83,12 @@ public SegmentUploaderConfiguration(String configDirectory, String clusterId) th
}
}

private static void checkConfigExists(Properties properties, String key) {
if (!properties.containsKey(key)) {
throw new RuntimeException(String.format("Configuration %s must be provided", key));
}
}

public boolean deleteTopic(String topicName) {
return excludeTopicsCache.remove(topicName) || includeTopicsCache.remove(topicName);
}
Expand Down Expand Up @@ -127,11 +150,15 @@ public int getUploadThreadCount() {
return Defaults.DEFAULT_UPLOADER_THREAD_POOL_SIZE;
}

public int getZkWatcherPollIntervalSeconds() {
if (properties.containsKey(ZK_WATCHER_POLL_INTERVAL_SECONDS)) {
return Integer.parseInt(properties.getProperty(ZK_WATCHER_POLL_INTERVAL_SECONDS));
public String getLeadershipWatcherClassName() {
return this.leadershipWatcherClassName;
}

public int getLeadershipWatcherPollIntervalSeconds() {
if (properties.containsKey(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS)) {
return Integer.parseInt(properties.getProperty(LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS));
}
return Defaults.DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS;
return Defaults.DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS;
}

public OffsetResetStrategy getOffsetResetStrategy() {
Expand Down Expand Up @@ -170,7 +197,7 @@ public enum OffsetResetStrategy {
public static class Defaults {
private static final String DEFAULT_OFFSET_RESET_STRATEGY = "EARLIEST";
private static final int DEFAULT_UPLOADER_THREAD_POOL_SIZE = 3;
private static final int DEFAULT_ZK_WATCHER_POLL_INTERVAL_SECONDS = 60;
private static final int DEFAULT_LEADERSHIP_WATCHER_POLL_INTERVAL_SECONDS = 60;
private static final int DEFAULT_S3_PREFIX_ENTROPY_BITS = -1;
private static final int DEFAULT_UPLOAD_TIMEOUT_MS = 60000;
private static final int DEFAULT_UPLOAD_MAX_RETRIES = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,9 @@ public class UploaderMetrics {
public static final String KAFKA_LEADER_UNSET_METRIC = UPLOADER_METRIC_PREFIX + "." + "kafka.leader.unset";
public static final String KAFKA_LEADER_COUNT_METRIC = UPLOADER_METRIC_PREFIX + "." + "kafka.leader.count";
public static final String WATCHER_ZK_RESET_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.zk.reset";
public static final String WATCHER_NOT_ADDED_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.not.added";
public static final String SKIPPED_ENTRY_MODIFY_EVENT_METRIC = UPLOADER_METRIC_PREFIX + "." + "skipped.entry.modify";
public static final String ADD_WATCHER_FAILED_METRIC = UPLOADER_METRIC_PREFIX + "." + "add.watcher.failed";
public static final String ENQUEUE_TO_UPLOAD_LATENCY_MS_METRIC = UPLOADER_METRIC_PREFIX + "." + "enqueue.to.upload.latency.ms";
public static final String RETRY_MARKED_FOR_DELETION_COUNT_METRIC = UPLOADER_METRIC_PREFIX + "." + "retry.marked.for.deletion.count";
public static final String WATCHER_ZK_EXCEPTION_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.zk.exception";
public static final String WATCHER_LEADERSHIP_EXCEPTION_METRIC = UPLOADER_METRIC_PREFIX + "." + "watcher.leadership.exception";
}
Loading

0 comments on commit fa14866

Please sign in to comment.