Commit 08c7749

hnousiainen authored and jeffwidman committed
Support produce with Kafka record headers
1 parent 0ca4313 commit 08c7749

File tree

README.rst
kafka/producer/future.py
kafka/producer/kafka.py
kafka/producer/record_accumulator.py
test/test_producer.py

5 files changed: +40 -18 lines


README.rst (+4)

@@ -117,6 +117,10 @@ for more details.
 >>> for i in range(1000):
 ...     producer.send('foobar', b'msg %d' % i)
 
+>>> # Include record headers. The format is list of tuples with string key
+>>> # and bytes value.
+>>> producer.send('foobar', value=b'c29tZSB2YWx1ZQ==', headers=[('content-encoding', b'base64')])
+
 >>> # Get producer performance metrics
 >>> metrics = producer.metrics()
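
For readers trying this out, a minimal end-to-end sketch of the new parameter (assuming a local broker at localhost:9092 running Kafka 0.11 or newer, since older brokers do not support record headers, and a kafka-python build that includes this commit; the topic name follows the README example):

    from kafka import KafkaProducer

    # Header keys are strings; header values are raw bytes.
    producer = KafkaProducer(bootstrap_servers='localhost:9092')
    future = producer.send(
        'foobar',
        value=b'c29tZSB2YWx1ZQ==',
        headers=[('content-encoding', b'base64')],
    )
    metadata = future.get(timeout=10)
    # RecordMetadata now reports the serialized header size: 16 + 6 = 22 bytes.
    print(metadata.serialized_header_size)
    producer.flush()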

kafka/producer/future.py (+5 -5)

@@ -29,11 +29,11 @@ def wait(self, timeout=None):
 
 
 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
+    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         # packing args as a tuple is a minor speed optimization
-        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
+        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)
 
@@ -42,7 +42,7 @@ def _produce_success(self, offset_and_timestamp):
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
-         serialized_key_size, serialized_value_size) = self.args
+         serialized_key_size, serialized_value_size, serialized_header_size) = self.args
 
         # None is when Broker does not support the API (<0.10) and
         # -1 is when the broker is configured for CREATE_TIME timestamps
@@ -53,7 +53,7 @@ def _produce_success(self, offset_and_timestamp):
         tp = self._produce_future.topic_partition
         metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
                                   checksum, serialized_key_size,
-                                  serialized_value_size)
+                                  serialized_value_size, serialized_header_size)
         self.success(metadata)
 
     def get(self, timeout=None):
@@ -68,4 +68,4 @@ def get(self, timeout=None):
 
 RecordMetadata = collections.namedtuple(
     'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
-                       'checksum', 'serialized_key_size', 'serialized_value_size'])
+                       'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
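
To illustrate what the widened namedtuple looks like from a caller's perspective, here is a small standalone sketch; the metadata values are made up, and -1 is the sentinel used elsewhere in this commit when a record carries no headers:

    import collections

    # Mirrors the namedtuple defined above, with the new trailing field.
    RecordMetadata = collections.namedtuple(
        'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
                           'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])

    # Hypothetical values, just to show field access.
    md = RecordMetadata('foobar', 0, ('foobar', 0), 42, 1517000000000, None, 10, 12, 22)
    assert md.serialized_header_size == 22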

kafka/producer/kafka.py (+13 -5)

@@ -513,7 +513,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
         return LegacyRecordBatchBuilder.estimate_size_in_bytes(
             magic, self.config['compression_type'], key, value)
 
-    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+    def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
         Arguments:
@@ -534,6 +534,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 partition (but if key is None, partition is chosen randomly).
                 Must be type bytes, or be serializable to bytes via configured
                 key_serializer.
+            headers (optional): a list of header key value pairs. List items
+                are tuples of str key and bytes value.
             timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
                 to use as the message timestamp. Defaults to current time.
 
@@ -563,13 +565,18 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
         partition = self._partition(topic, partition, key, value,
                                     key_bytes, value_bytes)
 
-        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
+        if headers is None:
+            headers = []
+        assert type(headers) == list
+        assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+
+        message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
         self._ensure_valid_record_size(message_size)
 
         tp = TopicPartition(topic, partition)
-        log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
+        log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
         result = self._accumulator.append(tp, timestamp_ms,
-                                          key_bytes, value_bytes,
+                                          key_bytes, value_bytes, headers,
                                           self.config['max_block_ms'],
                                           estimated_size=message_size)
         future, batch_is_full, new_batch_created = result
@@ -588,7 +595,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 FutureProduceResult(TopicPartition(topic, partition)),
                 -1, None, None,
                 len(key_bytes) if key_bytes is not None else -1,
-                len(value_bytes) if value_bytes is not None else -1
+                len(value_bytes) if value_bytes is not None else -1,
+                sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
             ).failure(e)
 
     def flush(self, timeout=None):
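
Read in isolation, the validation and size accounting that send() now performs amounts to the following standalone sketch; validate_headers and headers_size are illustrative names, not functions that exist in kafka-python:

    def validate_headers(headers):
        # send() normalizes None to [] and insists on a list of (str, bytes) pairs.
        if headers is None:
            headers = []
        assert type(headers) == list
        assert all(type(item) == tuple and len(item) == 2
                   and type(item[0]) == str and type(item[1]) == bytes
                   for item in headers)
        return headers

    def headers_size(headers):
        # Same expression used for serialized_header_size: UTF-8 key bytes plus
        # raw value bytes, or -1 when there are no headers.
        return sum(len(k.encode("utf-8")) + len(v) for k, v in headers) if headers else -1

    assert validate_headers(None) == []
    assert headers_size([('content-encoding', b'base64')]) == 16 + 6
    assert headers_size([]) == -1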

kafka/producer/record_accumulator.py (+9 -7)

@@ -55,8 +55,8 @@ def __init__(self, tp, records, buffer):
     def record_count(self):
         return self.records.next_offset()
 
-    def try_append(self, timestamp_ms, key, value):
-        metadata = self.records.append(timestamp_ms, key, value)
+    def try_append(self, timestamp_ms, key, value, headers):
+        metadata = self.records.append(timestamp_ms, key, value, headers)
         if metadata is None:
             return None
 
@@ -65,7 +65,8 @@ def try_append(self, timestamp_ms, key, value):
         future = FutureRecordMetadata(self.produce_future, metadata.offset,
                                       metadata.timestamp, metadata.crc,
                                       len(key) if key is not None else -1,
-                                      len(value) if value is not None else -1)
+                                      len(value) if value is not None else -1,
+                                      sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
     def done(self, base_offset=None, timestamp_ms=None, exception=None):
@@ -196,7 +197,7 @@ def __init__(self, **configs):
        self.muted = set()
        self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
               estimated_size=0):
        """Add a record to the accumulator, return the append result.
 
@@ -209,6 +210,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
            timestamp_ms (int): The timestamp of the record (epoch ms)
            key (bytes): The key for the record
            value (bytes): The value for the record
+           headers (List[Tuple[str, bytes]]): The header fields for the record
            max_time_to_block_ms (int): The maximum time in milliseconds to
                block for buffer memory to be available
 
@@ -231,7 +233,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
                dq = self._batches[tp]
                if dq:
                    last = dq[-1]
-                   future = last.try_append(timestamp_ms, key, value)
+                   future = last.try_append(timestamp_ms, key, value, headers)
                    if future is not None:
                        batch_is_full = len(dq) > 1 or last.records.is_full()
                        return future, batch_is_full, False
@@ -246,7 +248,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
 
                if dq:
                    last = dq[-1]
-                   future = last.try_append(timestamp_ms, key, value)
+                   future = last.try_append(timestamp_ms, key, value, headers)
                    if future is not None:
                        # Somebody else found us a batch, return the one we
                        # waited for! Hopefully this doesn't happen often...
@@ -261,7 +263,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
                )
 
                batch = ProducerBatch(tp, records, buf)
-               future = batch.try_append(timestamp_ms, key, value)
+               future = batch.try_append(timestamp_ms, key, value, headers)
                if not future:
                    raise Exception()
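
The batching flow these hunks thread headers through can be summarized in a simplified sketch; append_simplified, batches, and new_batch are illustrative stand-ins, and the real RecordAccumulator.append additionally handles per-partition locks, buffer allocation, and close checks:

    def append_simplified(batches, tp, timestamp_ms, key, value, headers, new_batch):
        # Try the most recent open batch for this partition first.
        dq = batches.setdefault(tp, [])
        if dq:
            future = dq[-1].try_append(timestamp_ms, key, value, headers)
            if future is not None:
                return future, len(dq) > 1 or dq[-1].records.is_full(), False
        # Otherwise start a fresh batch; headers ride along on every append call.
        batch = new_batch(tp)
        future = batch.try_append(timestamp_ms, key, value, headers)
        dq.append(batch)
        return future, len(dq) > 1 or batch.records.is_full(), True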

test/test_producer.py (+9 -1)

@@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
                            compression_type=compression)
     magic = producer._max_usable_produce_magic()
 
+    # record headers are supported in 0.11.0
+    if version() < (0, 11, 0):
+        headers = None
+    else:
+        headers = [("Header Key", b"Header Value")]
+
     topic = random_string(5)
     future = producer.send(
         topic,
-        value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
         partition=0)
     record = future.get(timeout=5)
     assert record is not None
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
 
     assert record.serialized_key_size == 10
     assert record.serialized_value_size == 12
+    if headers:
+        assert record.serialized_header_size == 22
 
     # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
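
The expected value of 22 in the new assertion follows from the header size formula used on the producer side: UTF-8-encoded key bytes plus raw value bytes, for the single header the test sends:

    key, value = "Header Key", b"Header Value"
    assert len(key.encode("utf-8")) == 10    # "Header Key" is 10 bytes in UTF-8
    assert len(value) == 12                  # b"Header Value" is 12 bytes
    assert len(key.encode("utf-8")) + len(value) == 22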
