Skip to content

Commit eba948e

Browse files
authored
Merge pull request #115 start topic integration tests
2 parents 117ad24 + 994eef3 commit eba948e

File tree

4 files changed

+58
-6
lines changed

4 files changed

+58
-6
lines changed

tests/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import pytest
55
import ydb
66
import time
7+
import subprocess
78

89

910
@pytest.fixture(autouse=True, scope="session")
@@ -96,3 +97,21 @@ async def driver(endpoint, database, event_loop):
9697
yield driver
9798

9899
await driver.stop(timeout=10)
100+
101+
102+
@pytest.fixture()
103+
def topic_path(endpoint) -> str:
104+
subprocess.run(
105+
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic drop /local/test-topic"""
106+
% endpoint,
107+
shell=True,
108+
)
109+
res = subprocess.run(
110+
"""docker-compose exec -T ydb /ydb -e grpc://%s -d /local topic create /local/test-topic"""
111+
% endpoint,
112+
shell=True,
113+
capture_output=True,
114+
)
115+
assert res.returncode == 0, res.stderr + res.stdout
116+
117+
return "/local/test-topic"

tests/topics/test_topic_writer.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
import pytest
2+
3+
import ydb.aio
4+
5+
6+
@pytest.mark.asyncio
7+
class TestTopicWriterAsyncIO:
8+
async def test_send_message(self, driver: ydb.aio.Driver, topic_path):
9+
writer = driver.topic_client.topic_writer(
10+
topic_path, producer_and_message_group_id="test"
11+
)
12+
writer.write(ydb.TopicWriterMessage(data="123".encode()))
13+
14+
async def test_wait_last_seqno(self, driver: ydb.aio.Driver, topic_path):
15+
async with driver.topic_client.topic_writer(
16+
topic_path,
17+
producer_and_message_group_id="test",
18+
auto_seqno=False,
19+
) as writer:
20+
await writer.write_with_ack(
21+
ydb.TopicWriterMessage(data="123".encode(), seqno=5)
22+
)
23+
24+
async with driver.topic_client.topic_writer(
25+
topic_path,
26+
producer_and_message_group_id="test",
27+
get_last_seqno=True,
28+
) as writer2:
29+
init_info = await writer2.wait_init()
30+
assert init_info.last_seqno == 5

ydb/_topic_writer/topic_writer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,9 @@ class Skipped:
165165
pass
166166

167167

168+
PublicWriteResultTypes = Union[PublicWriteResult.Written, PublicWriteResult.Skipped]
169+
170+
168171
class WriterSettings(PublicWriterSettings):
169172
def __init__(self, settings: PublicWriterSettings):
170173
self.__dict__ = settings.__dict__.copy()

ydb/_topic_writer/topic_writer_asyncio.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import asyncio
22
import datetime
3-
import threading
43
from collections import deque
54
from typing import Deque, AsyncIterator, Union, List, Optional, Callable
65

@@ -9,13 +8,13 @@
98
PublicWriterSettings,
109
WriterSettings,
1110
Writer,
12-
PublicWriteResult,
1311
PublicMessage,
1412
PublicWriterInitInfo,
1513
InternalMessage,
1614
TopicWriterStopped,
1715
TopicWriterError,
1816
messages_to_proto_requests,
17+
PublicWriteResultTypes,
1918
)
2019
from .. import (
2120
_apis,
@@ -35,21 +34,22 @@
3534
class WriterAsyncIO:
3635
_loop: asyncio.AbstractEventLoop
3736
_reconnector: "WriterAsyncIOReconnector"
38-
_lock: threading.Lock
37+
_lock: asyncio.Lock
3938
_closed: bool
4039

4140
@property
4241
def last_seqno(self) -> int:
4342
raise NotImplementedError()
4443

4544
def __init__(self, driver: SupportedDriverType, settings: PublicWriterSettings):
45+
self._lock = asyncio.Lock()
4646
self._loop = asyncio.get_running_loop()
4747
self._closed = False
4848
self._reconnector = WriterAsyncIOReconnector(
4949
driver=driver, settings=WriterSettings(settings)
5050
)
5151

52-
async def __aenter__(self):
52+
async def __aenter__(self) -> "WriterAsyncIO":
5353
return self
5454

5555
async def __aexit__(self, exc_type, exc_val, exc_tb):
@@ -62,7 +62,7 @@ def __del__(self):
6262
self._loop.call_soon(self.close)
6363

6464
async def close(self):
65-
with self._lock:
65+
async with self._lock:
6666
if self._closed:
6767
return
6868
self._closed = True
@@ -73,7 +73,7 @@ async def write_with_ack(
7373
self,
7474
messages: Union[Writer.MessageType, List[Writer.MessageType]],
7575
*args: Optional[Writer.MessageType],
76-
) -> Union[PublicWriteResult, List[PublicWriteResult]]:
76+
) -> Union[PublicWriteResultTypes, List[PublicWriteResultTypes]]:
7777
"""
7878
IT IS SLOWLY WAY. IT IS BAD CHOISE IN MOST CASES.
7979
It is recommended to use write with optionally flush or write_with_ack_futures and receive acks by wait futures.

0 commit comments

Comments
 (0)