Skip to content

Commit

Permalink
add chain_future to send method
Browse files Browse the repository at this point in the history
  • Loading branch information
anexplore committed Jan 7, 2022
1 parent f0a57a6 commit 5c7807c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 4 deletions.
6 changes: 4 additions & 2 deletions kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
return LegacyRecordBatchBuilder.estimate_size_in_bytes(
magic, self.config['compression_type'], key, value)

def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None, chain_future=None):
"""Publish a message to a topic.
Arguments:
Expand All @@ -563,6 +563,7 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
are tuples of str key and bytes value.
timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
to use as the message timestamp. Defaults to current time.
chain_future (Future, optional): chained success and failure method
Returns:
FutureRecordMetadata: resolves to RecordMetadata
Expand Down Expand Up @@ -603,7 +604,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
result = self._accumulator.append(tp, timestamp_ms,
key_bytes, value_bytes, headers,
self.config['max_block_ms'],
estimated_size=message_size)
estimated_size=message_size,
chain_future=chain_future)
future, batch_is_full, new_batch_created = result
if batch_is_full or new_batch_created:
log.debug("Waking up the sender since %s is either full or"
Expand Down
13 changes: 11 additions & 2 deletions kafka/producer/record_accumulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import time

import kafka.errors as Errors
from kafka.future import Future
from kafka.producer.buffer import SimpleBufferPool
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.record.memory_records import MemoryRecordsBuilder
Expand Down Expand Up @@ -198,7 +199,7 @@ def __init__(self, **configs):
self._drain_index = 0

def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
estimated_size=0):
estimated_size=0, chain_future=None):
"""Add a record to the accumulator, return the append result.
The append result will contain the future metadata, and flag for
Expand All @@ -213,12 +214,14 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
headers (List[Tuple[str, bytes]]): The header fields for the record
max_time_to_block_ms (int): The maximum time in milliseconds to
block for buffer memory to be available
chain_future (Future): chain future
Returns:
tuple: (future, batch_is_full, new_batch_created)
"""
assert isinstance(tp, TopicPartition), 'not TopicPartition'
assert not self._closed, 'RecordAccumulator is closed'
if chain_future is not None:
assert isinstance(chain_future, Future), 'not Future'
# We keep track of the number of appending thread to make sure we do
# not miss batches in abortIncompleteBatches().
self._appends_in_progress.increment()
Expand All @@ -235,6 +238,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
last = dq[-1]
future = last.try_append(timestamp_ms, key, value, headers)
if future is not None:
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

Expand All @@ -253,6 +258,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
# Somebody else found us a batch, return the one we
# waited for! Hopefully this doesn't happen often...
self._free.deallocate(buf)
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

Expand All @@ -269,6 +276,8 @@ def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,

dq.append(batch)
self._incomplete.add(batch)
if chain_future:
future.chain(chain_future)
batch_is_full = len(dq) > 1 or batch.records.is_full()
return future, batch_is_full, True
finally:
Expand Down

0 comments on commit 5c7807c

Please sign in to comment.