Skip to content

Commit 5a0cff8

Browse files
committed
Removed pydantic.
1 parent 6b4ce80 commit 5a0cff8

File tree

5 files changed

+13
-15
lines changed

5 files changed

+13
-15
lines changed

poetry.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pyproject.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ packages = [{ include = "taskiq_aio_kafka" }]
2727
python = "^3.7"
2828
taskiq = "^0"
2929
aiokafka = "^0.8.0"
30-
pydantic = "^1.10.7"
3130

3231
[tool.poetry.group.dev.dependencies]
3332
pytest = "^7.1.2"

taskiq_aio_kafka/broker.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,8 @@ def __init__( # noqa: WPS211
115115

116116
self._delay_kick_tasks: Set[asyncio.Task[None]] = set()
117117

118-
self._is_started = False
118+
self._is_producer_started = False
119+
self._is_consumer_started = False
119120

120121
async def startup(self) -> None:
121122
"""Setup AIOKafkaProducer, AIOKafkaConsumer and kafka topics.
@@ -127,7 +128,10 @@ async def startup(self) -> None:
127128
"""
128129
await super().startup()
129130

130-
if self._kafka_topic.name not in self._kafka_admin_client.list_topics():
131+
is_topic_available: bool = bool(
132+
self._kafka_admin_client.describe_topics([self._kafka_topic.name]),
133+
)
134+
if not is_topic_available:
131135
self._kafka_admin_client.create_topics(
132136
new_topics=[self._kafka_topic],
133137
validate_only=False,
@@ -136,8 +140,9 @@ async def startup(self) -> None:
136140
await self._aiokafka_producer.start()
137141
if self.is_worker_process:
138142
await self._aiokafka_consumer.start()
143+
self._is_consumer_started = True
139144

140-
self._is_started = True
145+
self._is_producer_started = True
141146

142147
async def shutdown(self) -> None:
143148
"""Close all connections on shutdown."""
@@ -175,7 +180,7 @@ async def kick(self, message: BrokerMessage) -> None:
175180
:raises ValueError: if startup wasn't called.
176181
:param message: message to send.
177182
"""
178-
if not self._is_started:
183+
if not self._is_producer_started:
179184
raise ValueError("Please run startup before kicking.")
180185

181186
kafka_message: bytes = pickle.dumps(message)
@@ -197,7 +202,7 @@ async def listen(
197202
:yields: parsed broker message.
198203
:raises ValueError: if no aiokafka_consumer or startup wasn't called.
199204
"""
200-
if not self._is_started:
205+
if not self._is_consumer_started:
201206
raise ValueError("Please run startup before listening.")
202207

203208
async for raw_kafka_message in self._aiokafka_consumer:

tests/conftest.py

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -87,20 +87,19 @@ async def test_kafka_consumer(
8787
@pytest.fixture()
8888
async def broker_without_arguments(
8989
kafka_url: str,
90-
base_topic_name: str,
9190
) -> AsyncGenerator[AioKafkaBroker, None]:
9291
"""Return AioKafkaBroker default realization.
9392
9493
In this fixture we don't pass custom topic, AIOKafkaProducer
9594
and AIOKafkaConsumer.
9695
9796
:param kafka_url: url to kafka.
98-
:param base_topic_name: name of the topic.
9997
10098
:yields: AioKafkaBroker.
10199
"""
102100
broker = AioKafkaBroker(
103101
bootstrap_servers=kafka_url,
102+
delete_topic_on_shutdown=True,
104103
)
105104
broker.is_worker_process = True
106105

@@ -114,10 +113,8 @@ async def broker_without_arguments(
114113
@pytest.fixture()
115114
async def broker(
116115
kafka_url: str,
117-
base_topic: NewTopic,
118116
test_kafka_producer: AIOKafkaProducer,
119117
test_kafka_consumer: AIOKafkaConsumer,
120-
base_topic_name: str,
121118
) -> AsyncGenerator[AioKafkaBroker, None]:
122119
"""Yield new broker instance.
123120
@@ -126,18 +123,16 @@ async def broker(
126123
and shutdown after test.
127124
128125
:param kafka_url: url to kafka.
129-
:param base_topic: custom topic.
130126
:param test_kafka_producer: custom AIOKafkaProducer.
131127
:param test_kafka_consumer: custom AIOKafkaConsumer.
132-
:param base_topic_name: name of the topic.
133128
134129
:yields: broker.
135130
"""
136131
broker = AioKafkaBroker(
137132
bootstrap_servers=kafka_url,
138-
kafka_topic=base_topic,
139133
aiokafka_producer=test_kafka_producer,
140134
aiokafka_consumer=test_kafka_consumer,
135+
delete_topic_on_shutdown=True,
141136
)
142137
broker.is_worker_process = True
143138

tests/test_broker.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@ async def test_kick_success(broker: AioKafkaBroker) -> None:
5353
received_message: BrokerMessage = pickle.loads(
5454
received_message_bytes,
5555
)
56-
5756
assert message_to_send == received_message
5857

5958

0 commit comments

Comments
 (0)