Skip to content

Commit 0ca4313

Browse files
hnousiainenjeffwidman
authored andcommitted
Expose record headers in ConsumerRecords
1 parent 9d30ab8 commit 0ca4313

File tree

3 files changed

+13
-6
lines changed

3 files changed

+13
-6
lines changed

Diff for: README.rst

+5
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,11 @@ that expose basic message attributes: topic, partition, offset, key, and value:
7070
>>> for msg in consumer:
7171
... assert isinstance(msg.value, dict)
7272

73+
>>> # Access record headers. The returned value is a list of tuples
74+
>>> # with str, bytes for key and value
75+
>>> for msg in consumer:
76+
... print (msg.headers)
77+
7378
>>> # Get consumer metrics
7479
>>> metrics = consumer.metrics()
7580

Diff for: kafka/consumer/fetcher.py

+5-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929

3030
ConsumerRecord = collections.namedtuple("ConsumerRecord",
3131
["topic", "partition", "offset", "timestamp", "timestamp_type",
32-
"key", "value", "checksum", "serialized_key_size", "serialized_value_size"])
32+
"key", "value", "headers", "checksum", "serialized_key_size", "serialized_value_size", "serialized_header_size"])
3333

3434

3535
CompletedFetch = collections.namedtuple("CompletedFetch",
@@ -456,10 +456,12 @@ def _unpack_message_set(self, tp, records):
456456
value = self._deserialize(
457457
self.config['value_deserializer'],
458458
tp.topic, record.value)
459+
headers = record.headers
460+
header_size = sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1
459461
yield ConsumerRecord(
460462
tp.topic, tp.partition, record.offset, record.timestamp,
461-
record.timestamp_type, key, value, record.checksum,
462-
key_size, value_size)
463+
record.timestamp_type, key, value, headers, record.checksum,
464+
key_size, value_size, header_size)
463465

464466
batch = records.next_batch()
465467

Diff for: test/test_fetcher.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -509,7 +509,7 @@ def test_partition_records_offset():
509509
fetch_offset = 123
510510
tp = TopicPartition('foo', 0)
511511
messages = [ConsumerRecord(tp.topic, tp.partition, i,
512-
None, None, 'key', 'value', 'checksum', 0, 0)
512+
None, None, 'key', 'value', [], 'checksum', 0, 0, -1)
513513
for i in range(batch_start, batch_end)]
514514
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
515515
assert len(records) > 0
@@ -534,7 +534,7 @@ def test_partition_records_no_fetch_offset():
534534
fetch_offset = 123
535535
tp = TopicPartition('foo', 0)
536536
messages = [ConsumerRecord(tp.topic, tp.partition, i,
537-
None, None, 'key', 'value', 'checksum', 0, 0)
537+
None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
538538
for i in range(batch_start, batch_end)]
539539
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
540540
assert len(records) == 0
@@ -549,7 +549,7 @@ def test_partition_records_compacted_offset():
549549
fetch_offset = 42
550550
tp = TopicPartition('foo', 0)
551551
messages = [ConsumerRecord(tp.topic, tp.partition, i,
552-
None, None, 'key', 'value', 'checksum', 0, 0)
552+
None, None, 'key', 'value', None, 'checksum', 0, 0, -1)
553553
for i in range(batch_start, batch_end) if i != fetch_offset]
554554
records = Fetcher.PartitionRecords(fetch_offset, None, messages)
555555
assert len(records) == batch_end - fetch_offset - 1

0 commit comments

Comments
 (0)