Skip to content

Commit 67c0cef

Browse files
committed
Non stream CommitOffset feature
1 parent d980334 commit 67c0cef

File tree

5 files changed

+83
-0
lines changed

5 files changed

+83
-0
lines changed

tests/topics/test_topic_reader.py

+20
Original file line numberDiff line numberDiff line change
@@ -183,6 +183,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183183

184184
assert message != batch.messages[0]
185185

186+
def test_reader_fine_with_no_stream_commits(self, driver_sync, topic_with_messages, topic_consumer):
187+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
188+
for out in ["123", "456", "789", "0"]:
189+
message = reader.receive_message()
190+
assert message.data.decode() == out
191+
192+
driver_sync.topic_client.commit_offsets(
193+
topic_with_messages, topic_consumer, message.partition_id, message.offset_to_commit
194+
)
195+
196+
def test_no_stream_commits_works(self, driver_sync, topic_with_messages, topic_consumer):
197+
for out in ["123", "456", "789", "0"]:
198+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
199+
message = reader.receive_message()
200+
assert message.data.decode() == out
201+
202+
driver_sync.topic_client.commit_offsets(
203+
topic_with_messages, topic_consumer, message.partition_id, message.offset_to_commit
204+
)
205+
186206
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
187207
with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
188208
writer.write("123")

ydb/_apis.py

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TopicService(object):
117117
StreamRead = "StreamRead"
118118
StreamWrite = "StreamWrite"
119119
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
120+
CommitOffset = "CommitOffset"
120121

121122

122123
class QueryService(object):

ydb/_grpc/grpcwrapper/ydb_topic.py

+16
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
137137
return UpdateTokenResponse()
138138

139139

140+
@dataclass
141+
class CommitOffsetRequest(IToProto):
142+
path: str
143+
consumer: str
144+
partition_id: int
145+
offset: int
146+
147+
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148+
return ydb_topic_pb2.CommitOffsetRequest(
149+
path=self.path,
150+
consumer=self.consumer,
151+
partition_id=self.partition_id,
152+
offset=self.offset,
153+
)
154+
155+
140156
########################################################################################################################
141157
# StreamWrite
142158
########################################################################################################################

ydb/_topic_reader/datatypes.py

+16
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
5656
def alive(self) -> bool:
5757
return not self._partition_session.closed
5858

59+
@property
60+
def partition_id(self) -> int:
61+
return self._partition_session.partition_id
62+
63+
@property
64+
def offset_to_commit(self) -> int:
65+
return self._commit_end_offset
66+
5967

6068
@dataclass
6169
class PartitionSession:
@@ -184,6 +192,14 @@ def empty(self) -> bool:
184192
def alive(self) -> bool:
185193
return not self._partition_session.closed
186194

195+
@property
196+
def partition_id(self) -> int:
197+
return self._partition_session.partition_id
198+
199+
@property
200+
def offset_to_commit(self) -> int:
201+
return self._commit_get_offsets_range().end
202+
187203
def pop_message(self) -> PublicMessage:
188204
return self.messages.pop(0)
189205

ydb/topic.py

+30
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,21 @@ def tx_writer(
317317

318318
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
319319

320+
async def commit_offsets(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
321+
req = _ydb_topic.CommitOffsetRequest(
322+
path=path,
323+
consumer=consumer,
324+
partition_id=partition_id,
325+
offset=offset,
326+
)
327+
328+
await self._driver(
329+
req.to_proto(),
330+
_apis.TopicService.Stub,
331+
_apis.TopicService.CommitOffset,
332+
_wrap_operation,
333+
)
334+
320335
def close(self):
321336
if self._closed:
322337
return
@@ -563,6 +578,21 @@ def tx_writer(
563578

564579
return TopicTxWriter(tx, self._driver, settings, _parent=self)
565580

581+
def commit_offsets(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
582+
req = _ydb_topic.CommitOffsetRequest(
583+
path=path,
584+
consumer=consumer,
585+
partition_id=partition_id,
586+
offset=offset,
587+
)
588+
589+
self._driver(
590+
req.to_proto(),
591+
_apis.TopicService.Stub,
592+
_apis.TopicService.CommitOffset,
593+
_wrap_operation,
594+
)
595+
566596
def close(self):
567597
if self._closed:
568598
return

0 commit comments

Comments
 (0)