Skip to content

Commit

Permalink
KAFKA-15448: Streams Standby Update Listener (KIP-988) (apache#14735)
Browse files Browse the repository at this point in the history
Implementation for KIP-988, adds the new StandbyUpdateListener interface

Reviewers: Anna Sophie Blee-Goldman <[email protected]>, Colt McNealy <[email protected]>
  • Loading branch information
eduwercamacaro authored Dec 6, 2023
1 parent 6df192b commit 83110e2
Show file tree
Hide file tree
Showing 17 changed files with 544 additions and 181 deletions.
74 changes: 74 additions & 0 deletions streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.kafka.streams.errors.UnknownStateStoreException;
import org.apache.kafka.streams.internals.ClientInstanceIdsImpl;
import org.apache.kafka.streams.internals.metrics.ClientMetrics;
import org.apache.kafka.streams.processor.StandbyUpdateListener;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StreamPartitioner;
Expand Down Expand Up @@ -176,6 +177,7 @@ public class KafkaStreams implements AutoCloseable {
private final KafkaClientSupplier clientSupplier;
protected final TopologyMetadata topologyMetadata;
private final QueryableStoreProvider queryableStoreProvider;
private final DelegatingStandbyUpdateListener delegatingStandbyUpdateListener;

GlobalStreamThread globalStreamThread;
private KafkaStreams.StateListener stateListener;
Expand Down Expand Up @@ -581,6 +583,23 @@ public void setGlobalStateRestoreListener(final StateRestoreListener globalState
}
}

/**
* Set the listener which is triggered whenever a standby task is updated
*
* @param standbyListener The listener triggered when a standby task is updated.
* @throws IllegalStateException if this {@code KafkaStreams} instance has already been started.
*/
public void setStandbyUpdateListener(final StandbyUpdateListener standbyListener) {
synchronized (stateLock) {
if (state.hasNotStarted()) {
this.delegatingStandbyUpdateListener.setUserStandbyListener(standbyListener);
} else {
throw new IllegalStateException("Can only set StandbyUpdateListener before calling start(). " +
"Current state is: " + state);
}
}
}

/**
* Get read-only handle on global metrics registry, including streams client's own metrics plus
* its embedded producer, consumer and admin clients' metrics.
Expand Down Expand Up @@ -743,6 +762,59 @@ public void onRestoreSuspended(final TopicPartition topicPartition, final String
}
}

final static class DelegatingStandbyUpdateListener implements StandbyUpdateListener {
private StandbyUpdateListener userStandbyListener;

private void throwOnFatalException(final Exception fatalUserException,
final TopicPartition topicPartition,
final String storeName) {
throw new StreamsException(
String.format("Fatal user code error in standby update listener for store %s, partition %s.",
storeName,
topicPartition),
fatalUserException);
}

@Override
public void onUpdateStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset) {
if (userStandbyListener != null) {
try {
userStandbyListener.onUpdateStart(topicPartition, storeName, startingOffset);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}

@Override
public void onBatchLoaded(final TopicPartition topicPartition, final String storeName, final TaskId taskId, final long batchEndOffset, final long batchSize, final long currentEndOffset) {
if (userStandbyListener != null) {
try {
userStandbyListener.onBatchLoaded(topicPartition, storeName, taskId, batchEndOffset, batchSize, currentEndOffset);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}

@Override
public void onUpdateSuspended(final TopicPartition topicPartition, final String storeName, final long storeOffset, final long currentEndOffset, final SuspendReason reason) {
if (userStandbyListener != null) {
try {
userStandbyListener.onUpdateSuspended(topicPartition, storeName, storeOffset, currentEndOffset, reason);
} catch (final Exception fatalUserException) {
throwOnFatalException(fatalUserException, topicPartition, storeName);
}
}
}

void setUserStandbyListener(final StandbyUpdateListener userStandbyListener) {
this.userStandbyListener = userStandbyListener;
}
}

/**
* Create a {@code KafkaStreams} instance.
* <p>
Expand Down Expand Up @@ -940,6 +1012,7 @@ private KafkaStreams(final TopologyMetadata topologyMetadata,
oldHandler = false;
streamsUncaughtExceptionHandler = this::defaultStreamsUncaughtExceptionHandler;
delegatingStateRestoreListener = new DelegatingStateRestoreListener();
delegatingStandbyUpdateListener = new DelegatingStandbyUpdateListener();

totalCacheSize = getTotalCacheSize(applicationConfigs);
final int numStreamThreads = topologyMetadata.getNumStreamThreads(applicationConfigs);
Expand Down Expand Up @@ -995,6 +1068,7 @@ private StreamThread createAndAddStreamThread(final long cacheSizePerThread, fin
cacheSizePerThread,
stateDirectory,
delegatingStateRestoreListener,
delegatingStandbyUpdateListener,
threadIdx,
KafkaStreams.this::closeToError,
streamsUncaughtExceptionHandler
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.processor;

import org.apache.kafka.common.TopicPartition;

public interface StandbyUpdateListener {

enum SuspendReason {
MIGRATED,
PROMOTED
}

/**
* A callback that will be invoked after registering the changelogs for each state store in a standby
* task. It is guaranteed to always be invoked before any records are loaded into the standby store.
*
* @param topicPartition the changelog TopicPartition for this standby task
* @param storeName the name of the store being loaded
* @param startingOffset the offset from which the standby task begins consuming from the changelog
*/
void onUpdateStart(final TopicPartition topicPartition,
final String storeName,
final long startingOffset);

/**
* Method called after loading a batch of records. In this case the maximum size of the batch is whatever
* the value of the MAX_POLL_RECORDS is set to.
* <n>
* This method is called after loading each batch and it is advised to keep processing to a minimum.
* Any heavy processing will block the state updater thread and slow down the rate of standby task
* loading. Therefore, if you need to do any extended processing or connect to an external service,
* consider doing so asynchronously.
*
* @param topicPartition the changelog TopicPartition for this standby task
* @param storeName the name of the store being loaded
* @param batchEndOffset batchEndOffset the changelog end offset (inclusive) of the batch that was just loaded
* @param batchSize the total number of records in the batch that was just loaded
* @param currentEndOffset the current end offset of the changelog topic partition.
*/
void onBatchLoaded(final TopicPartition topicPartition,
final String storeName,
final TaskId taskId,
final long batchEndOffset,
final long batchSize,
final long currentEndOffset);

/**
* This method is called when the corresponding standby task stops updating, for the provided reason.
* <p>
* If the task was {@code MIGRATED} to another instance, this callback will be invoked after this
* state store (and the task itself) are closed (in which case the data will be cleaned up after
* state.cleanup.delay.ms).
* If the task was {@code PROMOTED} to an active task, the state store will not be closed, and the
* callback will be invoked after unregistering it as a standby task but before re-registering it as an active task
* and beginning restoration. In other words, this will always called before the corresponding
* {@link StateRestoreListener#onRestoreStart} call is made.
*
* @param topicPartition the changelog TopicPartition for this standby task
* @param storeName the name of the store being loaded
* @param storeOffset is the offset of the last changelog record that was read and put into the store at the time
* of suspension.
* @param currentEndOffset the current end offset of the changelog topic partition.
* @param reason is the reason why the standby task was suspended.
*/
void onUpdateSuspended(final TopicPartition topicPartition,
final String storeName,
final long storeOffset,
final long currentEndOffset,
final SuspendReason reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,9 @@ public void revive() {

final void transitionTo(final Task.State newState) {
final State oldState = state();

if (oldState.isValidTransition(newState)) {
state = newState;
stateMgr.transitionTaskState(newState);
} else {
throw new IllegalStateException("Invalid transition from " + oldState + " to " + newState);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.Set;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -97,9 +98,12 @@ public static class StateStoreMetadata {
// update blindly with the given offset
private Long offset;

// Will be updated on batch restored
private Long endOffset;
// corrupted state store should not be included in checkpointing
private boolean corrupted;


private StateStoreMetadata(final StateStore stateStore,
final CommitCallback commitCallback) {
this.stateStore = stateStore;
Expand Down Expand Up @@ -137,6 +141,14 @@ Long offset() {
return this.offset;
}

Long endOffset() {
return this.endOffset;
}

public void setEndOffset(final Long endOffset) {
this.endOffset = endOffset;
}

TopicPartition changelogPartition() {
return this.changelogPartition;
}
Expand All @@ -157,6 +169,7 @@ public String toString() {
private String logPrefix;

private final TaskId taskId;
private Task.State taskState;
private final boolean eosEnabled;
private final ChangelogRegister changelogReader;
private final Collection<TopicPartition> sourcePartitions;
Expand Down Expand Up @@ -412,6 +425,14 @@ TaskId taskId() {
return taskId;
}

void transitionTaskState(final Task.State taskState) {
this.taskState = taskState;
}

Task.State taskState() {
return taskState;
}

// used by the changelog reader only
boolean changelogAsSource(final TopicPartition partition) {
return sourcePartitions.contains(partition);
Expand All @@ -433,7 +454,7 @@ StateStoreMetadata storeMetadata(final TopicPartition partition) {
}

// used by the changelog reader only
void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<byte[], byte[]>> restoreRecords) {
void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<byte[], byte[]>> restoreRecords, final OptionalLong optionalLag) {
if (!stores.containsValue(storeMetadata)) {
throw new IllegalStateException("Restoring " + storeMetadata + " which is not registered in this state manager, " +
"this should not happen.");
Expand All @@ -457,6 +478,10 @@ void restore(final StateStoreMetadata storeMetadata, final List<ConsumerRecord<b
}

storeMetadata.setOffset(batchEndOffset);
// If null means the lag for this partition is not known yet
if (optionalLag.isPresent()) {
storeMetadata.setEndOffset(optionalLag.getAsLong() + batchEndOffset);
}
}
}

Expand Down
Loading

0 comments on commit 83110e2

Please sign in to comment.