Skip to content

Commit c57ee3f

Browse files
committed
CommitOffset feature
1 parent d980334 commit c57ee3f

File tree

5 files changed

+91
-0
lines changed

5 files changed

+91
-0
lines changed

Diff for: tests/topics/test_topic_reader.py

+40
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,26 @@ async def test_read_and_commit_with_ack(self, driver, topic_with_messages, topic
6464

6565
assert message != batch.messages[0]
6666

67+
async def test_commit_offset_works(self, driver, topic_with_messages, topic_consumer):
68+
for out in ["123", "456", "789", "0"]:
69+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
70+
message = await reader.receive_message()
71+
assert message.data.decode() == out
72+
73+
await driver.topic_client.commit_offset(
74+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
75+
)
76+
77+
async def test_reader_reconnect_after_commit_offset(self, driver, topic_with_messages, topic_consumer):
78+
async with driver.topic_client.reader(topic_with_messages, topic_consumer) as reader:
79+
for out in ["123", "456", "789", "0"]:
80+
message = await reader.receive_message()
81+
assert message.data.decode() == out
82+
83+
await driver.topic_client.commit_offset(
84+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
85+
)
86+
6787
async def test_read_compressed_messages(self, driver, topic_path, topic_consumer):
6888
async with driver.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
6989
await writer.write("123")
@@ -183,6 +203,26 @@ def test_read_and_commit_with_ack(self, driver_sync, topic_with_messages, topic_
183203

184204
assert message != batch.messages[0]
185205

206+
def test_commit_offset_works(self, driver_sync, topic_with_messages, topic_consumer):
207+
for out in ["123", "456", "789", "0"]:
208+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
209+
message = reader.receive_message()
210+
assert message.data.decode() == out
211+
212+
driver_sync.topic_client.commit_offset(
213+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
214+
)
215+
216+
def test_reader_reconnect_after_commit_offset(self, driver_sync, topic_with_messages, topic_consumer):
217+
with driver_sync.topic_client.reader(topic_with_messages, topic_consumer) as reader:
218+
for out in ["123", "456", "789", "0"]:
219+
message = reader.receive_message()
220+
assert message.data.decode() == out
221+
222+
driver_sync.topic_client.commit_offset(
223+
topic_with_messages, topic_consumer, message.partition_id, message.offset + 1
224+
)
225+
186226
def test_read_compressed_messages(self, driver_sync, topic_path, topic_consumer):
187227
with driver_sync.topic_client.writer(topic_path, codec=ydb.TopicCodec.GZIP) as writer:
188228
writer.write("123")

Diff for: ydb/_apis.py

+1
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class TopicService(object):
117117
StreamRead = "StreamRead"
118118
StreamWrite = "StreamWrite"
119119
UpdateOffsetsInTransaction = "UpdateOffsetsInTransaction"
120+
CommitOffset = "CommitOffset"
120121

121122

122123
class QueryService(object):

Diff for: ydb/_grpc/grpcwrapper/ydb_topic.py

+16
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,22 @@ def from_proto(msg: ydb_topic_pb2.UpdateTokenResponse) -> typing.Any:
137137
return UpdateTokenResponse()
138138

139139

140+
@dataclass
141+
class CommitOffsetRequest(IToProto):
142+
path: str
143+
consumer: str
144+
partition_id: int
145+
offset: int
146+
147+
def to_proto(self) -> ydb_topic_pb2.CommitOffsetRequest:
148+
return ydb_topic_pb2.CommitOffsetRequest(
149+
path=self.path,
150+
consumer=self.consumer,
151+
partition_id=self.partition_id,
152+
offset=self.offset,
153+
)
154+
155+
140156
########################################################################################################################
141157
# StreamWrite
142158
########################################################################################################################

Diff for: ydb/_topic_reader/datatypes.py

+4
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ def _commit_get_offsets_range(self) -> OffsetsRange:
5656
def alive(self) -> bool:
5757
return not self._partition_session.closed
5858

59+
@property
60+
def partition_id(self) -> int:
61+
return self._partition_session.partition_id
62+
5963

6064
@dataclass
6165
class PartitionSession:

Diff for: ydb/topic.py

+30
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,21 @@ def tx_writer(
317317

318318
return TopicTxWriterAsyncIO(tx=tx, driver=self._driver, settings=settings, _client=self)
319319

320+
async def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
321+
req = _ydb_topic.CommitOffsetRequest(
322+
path=path,
323+
consumer=consumer,
324+
partition_id=partition_id,
325+
offset=offset,
326+
)
327+
328+
await self._driver(
329+
req.to_proto(),
330+
_apis.TopicService.Stub,
331+
_apis.TopicService.CommitOffset,
332+
_wrap_operation,
333+
)
334+
320335
def close(self):
321336
if self._closed:
322337
return
@@ -563,6 +578,21 @@ def tx_writer(
563578

564579
return TopicTxWriter(tx, self._driver, settings, _parent=self)
565580

581+
def commit_offset(self, path: str, consumer: str, partition_id: int, offset: int) -> None:
582+
req = _ydb_topic.CommitOffsetRequest(
583+
path=path,
584+
consumer=consumer,
585+
partition_id=partition_id,
586+
offset=offset,
587+
)
588+
589+
self._driver(
590+
req.to_proto(),
591+
_apis.TopicService.Stub,
592+
_apis.TopicService.CommitOffset,
593+
_wrap_operation,
594+
)
595+
566596
def close(self):
567597
if self._closed:
568598
return

0 commit comments

Comments
 (0)