Skip to content

Commit 65902ca

Browse files
authored
#39 Fix empty threaded messages (#40)
* Handle empty messages in threaded consumer * Update version.py
1 parent e02965c commit 65902ca

File tree

2 files changed

+4
-5
lines changed

2 files changed

+4
-5
lines changed

src/ConfluentKafkaLibrary/consumer.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -361,10 +361,9 @@ def get_messages_from_thread(self, running_thread, decode_format=None):
361361
- ``decode_format`` (str) - If you need to decode data to specific format
362362
(See https://docs.python.org/3/library/codecs.html#standard-encodings). Default: None.
363363
"""
364-
records = self._decode_data(
365-
data=running_thread.get_messages(),
366-
decode_format=decode_format
367-
)
364+
records = running_thread.get_messages()
365+
if records:
366+
records = self._decode_data(records, decode_format)
368367
return records
369368

370369
def get_thread_group_id(self, running_thread):

src/ConfluentKafkaLibrary/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
VERSION = '2.0.2-2'
1+
VERSION = '2.0.2-3'

0 commit comments

Comments
 (0)