
Commit ae771d7

KAFKA-8830 make Record Headers available in onAcknowledgement (#17099)
Two sets of tests are added:
1. KafkaProducerTest
   - when a send succeeds, both record.headers() and the headers passed to onAcknowledgement are read-only
   - when a send fails, record.headers() stays writable as before, while the headers passed to onAcknowledgement are read-only
2. ProducerInterceptorsTest
   - verifies that both the old and the new onAcknowledgement methods are called successfully

Reviewers: Lianet Magrans <[email protected]>, Omnia Ibrahim <[email protected]>, Matthias J. Sax <[email protected]>, Andrew Schofield <[email protected]>, Chia-Ping Tsai <[email protected]>
1 parent a04c2fe commit ae771d7

6 files changed (+185, -29 lines)

clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java

+8-1
@@ -1546,6 +1546,7 @@ private class AppendCallbacks implements RecordAccumulator.AppendCallbacks {
         private final String recordLogString;
         private volatile int partition = RecordMetadata.UNKNOWN_PARTITION;
         private volatile TopicPartition topicPartition;
+        private final Headers headers;

         private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
@@ -1554,6 +1555,12 @@ private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interc
             // whole lifetime of the batch.
             // We don't want to have an NPE here, because the interceptors would not be notified (see .doSend).
             topic = record != null ? record.topic() : null;
+            if (record != null) {
+                headers = record.headers();
+            } else {
+                headers = new RecordHeaders();
+                ((RecordHeaders) headers).setReadOnly();
+            }
             recordPartition = record != null ? record.partition() : null;
             recordLogString = log.isTraceEnabled() && record != null ? record.toString() : "";
         }
@@ -1563,7 +1570,7 @@ public void onCompletion(RecordMetadata metadata, Exception exception) {
             if (metadata == null) {
                 metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
             }
-            this.interceptors.onAcknowledgement(metadata, exception);
+            this.interceptors.onAcknowledgement(metadata, exception, headers);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }

clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java

+28-2
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.producer;

 import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.header.Headers;

 /**
  * A plugin interface that allows you to intercept (and possibly mutate) the records received by the producer before
@@ -83,12 +84,37 @@ public interface ProducerInterceptor<K, V> extends Configurable, AutoCloseable {
      * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
      *                 If an error occurred, metadata will contain only valid topic and maybe
      *                 partition. If partition is not given in ProducerRecord and an error occurs
-     *                 before partition gets assigned, then partition will be set to RecordMetadata.NO_PARTITION.
+     *                 before partition gets assigned, then partition will be set to {@link RecordMetadata#UNKNOWN_PARTITION}.
      *                 The metadata may be null if the client passed null record to
      *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. Null if no error occurred.
      */
-    void onAcknowledgement(RecordMetadata metadata, Exception exception);
+    default void onAcknowledgement(RecordMetadata metadata, Exception exception) {}
+
+    /**
+     * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
+     * it gets sent to the server.
+     * <p>
+     * This method is generally called just before the user callback is called, and in additional cases when <code>KafkaProducer.send()</code>
+     * throws an exception.
+     * <p>
+     * Any exception thrown by this method will be ignored by the caller.
+     * <p>
+     * This method will generally execute in the background I/O thread, so the implementation should be reasonably fast.
+     * Otherwise, sending of messages from other threads could be delayed.
+     *
+     * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
+     *                 If an error occurred, metadata will contain only valid topic and maybe
+     *                 partition. If partition is not given in ProducerRecord and an error occurs
+     *                 before partition gets assigned, then partition will be set to {@link RecordMetadata#UNKNOWN_PARTITION}.
+     *                 The metadata may be null if the client passed null record to
+     *                 {@link org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
+     * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     * @param headers   The headers for the record that was sent. It is read-only.
+     */
+    default void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
+        onAcknowledgement(metadata, exception);
+    }

     /**
      * This is called when interceptor is closed

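For illustration, here is a minimal sketch of an interceptor that uses the new three-argument hook added above. The class name, the "trace-id" header key, and the logging are illustrative only and not part of this commit; it relies on the fact that the two-argument onAcknowledgement now has a default, so only the new overload needs to be implemented.

import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

public class TraceIdLoggingInterceptor implements ProducerInterceptor<String, String> {

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        return record; // pass the record through unchanged
    }

    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
        // Per this commit's tests, headers is non-null and read-only here;
        // mutating it would throw IllegalStateException.
        Header traceId = headers.lastHeader("trace-id");
        if (exception != null && traceId != null) {
            System.err.println("send failed for trace-id="
                    + new String(traceId.value(), StandardCharsets.UTF_8) + ": " + exception);
        }
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}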
clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java

+17-6
@@ -22,6 +22,8 @@
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.Plugin;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.record.RecordBatch;
@@ -77,19 +79,20 @@ public ProducerRecord<K, V> onSend(ProducerRecord<K, V> record) {

     /**
      * This method is called when the record sent to the server has been acknowledged, or when sending the record fails before
-     * it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
+     * it gets sent to the server. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception, Headers)}
      * method for each interceptor.
      *
      * This method does not throw exceptions. Exceptions thrown by any of interceptor methods are caught and ignored.
      *
      * @param metadata The metadata for the record that was sent (i.e. the partition and offset).
      *                 If an error occurred, metadata will only contain valid topic and maybe partition.
      * @param exception The exception thrown during processing of this record. Null if no error occurred.
+     * @param headers The headers for the record that was sent
      */
-    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
+    public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
         for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
-                interceptorPlugin.get().onAcknowledgement(metadata, exception);
+                interceptorPlugin.get().onAcknowledgement(metadata, exception, headers);
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log
                 log.warn("Error executing interceptor onAcknowledgement callback", e);
@@ -99,7 +102,7 @@ public void onAcknowledgement(RecordMetadata metadata, Exception exception) {

     /**
      * This method is called when sending the record fails in {@link ProducerInterceptor#onSend
-     * (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
+     * (ProducerRecord)} method. This method calls {@link ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception, Headers)}
      * method for each interceptor
      *
      * @param record The record from client
@@ -110,14 +113,22 @@ public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
     public void onSendError(ProducerRecord<K, V> record, TopicPartition interceptTopicPartition, Exception exception) {
         for (Plugin<ProducerInterceptor<K, V>> interceptorPlugin : this.interceptorPlugins) {
             try {
+                Headers headers = record != null ? record.headers() : new RecordHeaders();
+                if (headers instanceof RecordHeaders && !((RecordHeaders) headers).isReadOnly()) {
+                    // make a copy of the headers to make sure we don't change the state of origin record's headers.
+                    // original headers are still writable because client might want to mutate them before retrying.
+                    RecordHeaders recordHeaders = (RecordHeaders) headers;
+                    headers = new RecordHeaders(recordHeaders);
+                    ((RecordHeaders) headers).setReadOnly();
+                }
                 if (record == null && interceptTopicPartition == null) {
-                    interceptorPlugin.get().onAcknowledgement(null, exception);
+                    interceptorPlugin.get().onAcknowledgement(null, exception, headers);
                 } else {
                     if (interceptTopicPartition == null) {
                         interceptTopicPartition = extractTopicPartition(record);
                     }
                     interceptorPlugin.get().onAcknowledgement(new RecordMetadata(interceptTopicPartition, -1, -1,
-                            RecordBatch.NO_TIMESTAMP, -1, -1), exception);
+                            RecordBatch.NO_TIMESTAMP, -1, -1), exception, headers);
                 }
             } catch (Exception e) {
                 // do not propagate interceptor exceptions, just log

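The copy-and-seal pattern that onSendError uses above can also be shown in isolation. This is a standalone sketch with illustrative names, not code from the commit: it copies a still-writable RecordHeaders and freezes only the copy, so observers cannot mutate the caller's original headers.

import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderSealingExample {

    // Return a frozen copy; the original stays writable, e.g. for a retry.
    static Headers sealedCopy(Headers original) {
        RecordHeaders copy = new RecordHeaders(original); // copies the Header entries
        copy.setReadOnly();                               // further add()/remove() on the copy throw IllegalStateException
        return copy;
    }

    public static void main(String[] args) {
        RecordHeaders original = new RecordHeaders();
        original.add("retry-count", new byte[] {1});

        Headers sealed = sealedCopy(original);
        original.add("retry-count", new byte[] {2});      // still allowed on the original

        for (Header h : sealed) {
            System.out.println(h.key() + " -> " + h.value()[0]);
        }
    }
}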
clients/src/main/java/org/apache/kafka/common/header/internals/RecordHeaders.java

+4
@@ -108,6 +108,10 @@ public void setReadOnly() {
         this.isReadOnly = true;
     }

+    public boolean isReadOnly() {
+        return isReadOnly;
+    }
+
     public Header[] toArray() {
         return headers.isEmpty() ? Record.EMPTY_HEADERS : headers.toArray(new Header[0]);
     }

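A small sketch of how the new isReadOnly() accessor can be used: a hypothetical helper that guards mutation attempts up front instead of relying on the IllegalStateException thrown by add() on a sealed collection.

import org.apache.kafka.common.header.internals.RecordHeaders;

public class ReadOnlyCheckExample {

    // Add a header only while the collection is still mutable.
    static boolean tryAddHeader(RecordHeaders headers, String key, byte[] value) {
        if (headers.isReadOnly()) {
            return false;            // sealed, e.g. after the record was handed to send()
        }
        headers.add(key, value);
        return true;
    }

    public static void main(String[] args) {
        RecordHeaders headers = new RecordHeaders();
        System.out.println(tryAddHeader(headers, "source", "svc-a".getBytes())); // true
        headers.setReadOnly();
        System.out.println(tryAddHeader(headers, "source", "svc-b".getBytes())); // false
    }
}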
clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java

+52-2
@@ -48,7 +48,9 @@
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.AddOffsetsToTxnResponseData;
 import org.apache.kafka.common.message.ApiVersionsResponseData;
@@ -1084,13 +1086,14 @@ public void testTopicExpiryInMetadata() throws InterruptedException {

     @SuppressWarnings("unchecked")
     @Test
-    public void testHeaders() {
+    public void testHeadersSuccess() {
         doTestHeaders(Serializer.class);
     }

     private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerClassToMock) {
         Map<String, Object> configs = new HashMap<>();
         configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        configs.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForHeaders.class.getName());
         Serializer<String> keySerializer = mock(serializerClassToMock);
         Serializer<String> valueSerializer = mock(serializerClassToMock);

@@ -1119,7 +1122,9 @@ private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerCla
         producer.send(record, null);

         //ensure headers are closed and cannot be mutated post send
-        assertThrows(IllegalStateException.class, () -> record.headers().add(new RecordHeader("test", "test".getBytes())));
+        RecordHeaders recordHeaders = (RecordHeaders) record.headers();
+        assertTrue(recordHeaders.isReadOnly());
+        assertThrows(IllegalStateException.class, () -> recordHeaders.add(new RecordHeader("test", "test".getBytes())));

         //ensure existing headers are not changed, and last header for key is still original value
         assertArrayEquals(record.headers().lastHeader("test").value(), "header2".getBytes());
@@ -1130,6 +1135,28 @@ private <T extends Serializer<String>> void doTestHeaders(Class<T> serializerCla
         producer.close(Duration.ofMillis(0));
     }

+    @Test
+    public void testHeadersFailure() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 5);
+        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, ProducerInterceptorForHeaders.class.getName());
+        Serializer<String> keySerializer = mock(StringSerializer.class);
+        Serializer<String> valueSerializer = mock(StringSerializer.class);
+
+        KafkaProducer<String, String> producer = new KafkaProducer<>(props, keySerializer, valueSerializer);
+        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
+        Future<RecordMetadata> future = producer.send(record, (recordMetadata, exception) -> { });
+        try {
+            TestUtils.assertFutureThrows(TimeoutException.class, future);
+            //ensure headers are writable if send failure
+            RecordHeaders recordHeaders = (RecordHeaders) record.headers();
+            assertFalse(recordHeaders.isReadOnly());
+        } finally {
+            producer.close(Duration.ofMillis(0));
+        }
+    }
+
     @Test
     public void closeShouldBeIdempotent() {
         Properties producerProps = new Properties();
@@ -2500,6 +2527,29 @@ public void configure(Map<String, ?> configs) {
         }
     }

+    public static class ProducerInterceptorForHeaders implements ProducerInterceptor<byte[], byte[]> {
+
+        @Override
+        public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
+            return record;
+        }
+
+        @Override
+        public void onAcknowledgement(RecordMetadata metadata, Exception exception, Headers headers) {
+            RecordHeaders recordHeaders = (RecordHeaders) headers;
+            // Ensure that the headers are read-only, no matter send success or send failure
+            assertTrue(recordHeaders.isReadOnly());
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
+
     public static class ProducerInterceptorForClientId implements ProducerInterceptor<byte[], byte[]> {

         @Override

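Outside of tests, an interceptor is wired in the same way these tests do it, through ProducerConfig.INTERCEPTOR_CLASSES_CONFIG. Below is a minimal sketch of that wiring; the broker address, topic name, header key, and the TraceIdLoggingInterceptor class from the earlier sketch are placeholders, not part of this commit.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class InterceptorWiringExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Register the interceptor so its onAcknowledgement(metadata, exception, headers) is invoked.
        props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TraceIdLoggingInterceptor.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            ProducerRecord<String, String> record = new ProducerRecord<>("demo-topic", "key", "value");
            record.headers().add("trace-id", "abc-123".getBytes()); // writable until send() is called
            producer.send(record);
        }
    }
}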