Skip to content

Commit 3d250e9

Browse files
authored
Merge pull request #253 topic-writer: add set result for handle acks
2 parents 7580bb2 + 6f4f7e0 commit 3d250e9

File tree

2 files changed

+15
-3
lines changed

2 files changed

+15
-3
lines changed

tests/topics/test_topic_writer.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,8 @@ async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
1616
producer_id="test",
1717
auto_seqno=False,
1818
) as writer:
19-
await writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
19+
ret = await writer.write_with_ack(ydb.TopicWriterMessage(data="123".encode(), seqno=5))
20+
assert ret.offset == 0
2021

2122
async with driver.topic_client.writer(
2223
topic_path,
@@ -62,12 +63,14 @@ async def test_write_multi_message_with_ack(
6263
self, driver: ydb.aio.Driver, topic_path, topic_reader: ydb.TopicReaderAsyncIO
6364
):
6465
async with driver.topic_client.writer(topic_path) as writer:
65-
await writer.write_with_ack(
66+
res1, res2 = await writer.write_with_ack(
6667
[
6768
ydb.TopicWriterMessage(data="123".encode()),
6869
ydb.TopicWriterMessage(data="456".encode()),
6970
]
7071
)
72+
assert res1.offset == 0
73+
assert res2.offset == 1
7174

7275
batch = await topic_reader.receive_batch()
7376

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
TopicWriterStopped,
1919
TopicWriterError,
2020
messages_to_proto_requests,
21+
PublicWriteResult,
2122
PublicWriteResultTypes,
2223
Message,
2324
)
@@ -505,7 +506,15 @@ def _handle_receive_ack(self, ack):
505506
"internal error - receive unexpected ack. Expected seqno: %s, received seqno: %s"
506507
% (current_message.seq_no, ack.seq_no)
507508
)
508-
message_future.set_result(None) # todo - return result with offset or skip status
509+
write_ack_msg = StreamWriteMessage.WriteResponse.WriteAck
510+
status = ack.message_write_status
511+
if isinstance(status, write_ack_msg.StatusSkipped):
512+
result = PublicWriteResult.Skipped()
513+
elif isinstance(status, write_ack_msg.StatusWritten):
514+
result = PublicWriteResult.Written(offset=status.offset)
515+
else:
516+
raise TopicWriterError("internal error - receive unexpected ack message.")
517+
message_future.set_result(result)
509518

510519
async def _send_loop(self, writer: "WriterAsyncIOStream"):
511520
try:

0 commit comments

Comments
 (0)