Skip to content

Commit ca3c9e6

Browse files
author
DanielePalaia
committed
better management of arguments
1 parent 078b70d commit ca3c9e6

File tree

7 files changed

+153
-39
lines changed

7 files changed

+153
-39
lines changed

examples/getting_started/main.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@
33
Connection,
44
ExchangeSpecification,
55
Message,
6-
QueueSpecification,
76
QueueType,
7+
QuorumQueueSpecification,
8+
StreamSpecification,
89
exchange_address,
910
)
1011

@@ -24,7 +25,7 @@ def main() -> None:
2425
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
2526

2627
management.declare_queue(
27-
QueueSpecification(name=queue_name, queue_type=QueueType.quorum)
28+
StreamSpecification(name=queue_name, queue_type=QueueType.stream)
2829
)
2930

3031
print("binding queue to exchange")

poetry.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rabbitmq_amqp_python_client/__init__.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
from .entities import (
77
BindingSpecification,
88
ExchangeSpecification,
9-
QueueSpecification,
109
)
1110
from .publisher import Publisher
1211
from .qpid.proton._message import Message
12+
from .queues import (
13+
ClassicQueueSpecification,
14+
QuorumQueueSpecification,
15+
StreamSpecification,
16+
)
1317

1418
try:
1519
__version__ = metadata.version(__package__)
@@ -23,7 +27,9 @@
2327
__all__ = [
2428
"Connection",
2529
"ExchangeSpecification",
26-
"QueueSpecification",
30+
"QuorumQueueSpecification",
31+
"ClassicQueueSpecification",
32+
"StreamSpecification",
2733
"BindingSpecification",
2834
"QueueType",
2935
"Publisher",

rabbitmq_amqp_python_client/entities.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,23 +14,6 @@ class ExchangeSpecification:
1414
is_durable: bool = True
1515

1616

17-
@dataclass
18-
class QueueSpecification:
19-
name: str
20-
queue_type: QueueType = QueueType.quorum
21-
dead_letter_routing_key: Optional[str] = None
22-
is_exclusive: Optional[bool] = None
23-
max_len: Optional[int] = None
24-
max_len_bytes: Optional[int] = None
25-
message_ttl: Optional[int] = None
26-
expires: Optional[int] = None
27-
dead_letter_exchange: Optional[str] = ""
28-
is_auto_delete: bool = False
29-
is_durable: bool = True
30-
overflow: Optional[str] = None
31-
single_active_consumer: Optional[bool] = None
32-
33-
3417
@dataclass
3518
class QueueInfo:
3619
name: str

rabbitmq_amqp_python_client/management.py

Lines changed: 92 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
BindingSpecification,
1515
ExchangeSpecification,
1616
QueueInfo,
17-
QueueSpecification,
1817
)
1918
from .exceptions import ValidationCodeException
2019
from .options import ReceiverOption, SenderOption
@@ -24,6 +23,11 @@
2423
BlockingReceiver,
2524
BlockingSender,
2625
)
26+
from .queues import (
27+
ClassicQueueSpecification,
28+
QuorumQueueSpecification,
29+
StreamSpecification,
30+
)
2731

2832
logger = logging.getLogger(__name__)
2933

@@ -125,9 +129,41 @@ def declare_exchange(
125129
return exchange_specification
126130

127131
def declare_queue(
128-
self, queue_specification: QueueSpecification
129-
) -> QueueSpecification:
132+
self,
133+
queue_specification: (
134+
ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification
135+
),
136+
) -> ClassicQueueSpecification | QuorumQueueSpecification | StreamSpecification:
130137
logger.debug("declare_queue operation called")
138+
139+
if (
140+
type(queue_specification) is ClassicQueueSpecification
141+
or type(queue_specification) is QuorumQueueSpecification
142+
):
143+
body = self._declare_queue(queue_specification)
144+
145+
elif type(queue_specification) is StreamSpecification:
146+
body = self._declare_stream(queue_specification)
147+
148+
path = queue_address(queue_specification.name)
149+
150+
self.request(
151+
body,
152+
path,
153+
CommonValues.command_put.value,
154+
[
155+
CommonValues.response_code_200.value,
156+
CommonValues.response_code_201.value,
157+
CommonValues.response_code_409.value,
158+
],
159+
)
160+
161+
return queue_specification
162+
163+
def _declare_queue(
164+
self, queue_specification: ClassicQueueSpecification | QuorumQueueSpecification
165+
) -> dict[str, Any]:
166+
131167
body = {}
132168
args: dict[str, Any] = {}
133169

@@ -155,22 +191,63 @@ def declare_queue(
155191
queue_specification.single_active_consumer
156192
)
157193

194+
if type(queue_specification) is ClassicQueueSpecification:
195+
if queue_specification.maximum_priority is not None:
196+
args["x-maximum-priority"] = queue_specification.maximum_priority
197+
198+
if type(queue_specification) is QuorumQueueSpecification:
199+
if queue_specification.deliver_limit is not None:
200+
args["x-deliver-limit"] = queue_specification.deliver_limit
201+
202+
if queue_specification.dead_letter_strategy is not None:
203+
args["x-dead-letter-strategy"] = (
204+
queue_specification.dead_letter_strategy
205+
)
206+
207+
if queue_specification.quorum_initial_group_size is not None:
208+
args["x-initial-quorum-group-size"] = (
209+
queue_specification.quorum_initial_group_size
210+
)
211+
212+
if queue_specification.cluster_target_size is not None:
213+
args["cluster_target_size"] = queue_specification.cluster_target_size
214+
158215
body["arguments"] = args # type: ignore
159216

160-
path = queue_address(queue_specification.name)
217+
return body
161218

162-
self.request(
163-
body,
164-
path,
165-
CommonValues.command_put.value,
166-
[
167-
CommonValues.response_code_200.value,
168-
CommonValues.response_code_201.value,
169-
CommonValues.response_code_409.value,
170-
],
171-
)
219+
def _declare_stream(
220+
self, stream_specification: StreamSpecification
221+
) -> dict[str, Any]:
172222

173-
return queue_specification
223+
body = {}
224+
args: dict[str, Any] = {}
225+
226+
args["x-queue-type"] = stream_specification.queue_type.value
227+
228+
if stream_specification.max_len_bytes is not None:
229+
args["x-max-length-bytes"] = stream_specification.max_len_bytes
230+
231+
if stream_specification.max_time_retention is not None:
232+
args["x-max-time-retention"] = stream_specification.max_time_retention
233+
234+
if stream_specification.max_segment_size_in_bytes is not None:
235+
args["x-max-segment-size-in-bytes"] = (
236+
stream_specification.max_segment_size_in_bytes
237+
)
238+
239+
if stream_specification.filter_size is not None:
240+
args["x-filter-size"] = stream_specification.filter_size
241+
242+
if stream_specification.initial_group_size is not None:
243+
args["x-initial-group-size"] = stream_specification.initial_group_size
244+
245+
if stream_specification.leader_locator is not None:
246+
args["x-leader-locator"] = stream_specification.leader_locator
247+
248+
body["arguments"] = args
249+
250+
return body
174251

175252
def delete_exchange(self, exchange_name: str) -> None:
176253
logger.debug("delete_exchange operation called")
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
from dataclasses import dataclass
2+
from typing import Optional
3+
4+
from .common import QueueType
5+
6+
7+
@dataclass
8+
class QueueSpecification:
9+
name: str
10+
expires: Optional[int] = None
11+
message_ttl: Optional[int] = None
12+
overflow: Optional[str] = None
13+
single_active_consumer: Optional[bool] = None
14+
dead_letter_exchange: Optional[str] = None
15+
dead_letter_routing_key: Optional[str] = None
16+
max_len: Optional[int] = None
17+
max_len_bytes: Optional[int] = None
18+
leader_locator: Optional[str] = None
19+
is_auto_delete: bool = False
20+
is_durable: bool = True
21+
22+
23+
@dataclass
24+
class ClassicQueueSpecification(QueueSpecification):
25+
queue_type: QueueType = QueueType.classic
26+
maximum_priority: Optional[int] = None
27+
28+
29+
@dataclass
30+
class QuorumQueueSpecification(QueueSpecification):
31+
queue_type: QueueType = QueueType.quorum
32+
deliver_limit: Optional[str] = None
33+
dead_letter_strategy: Optional[str] = None
34+
quorum_initial_group_size: Optional[int] = None
35+
cluster_target_size: Optional[int] = None
36+
37+
38+
@dataclass
39+
class StreamSpecification:
40+
name: str
41+
queue_type: QueueType = QueueType.stream
42+
max_len_bytes: Optional[str] = None
43+
max_time_retention: Optional[str] = None
44+
max_segment_size_in_bytes: Optional[str] = None
45+
filter_size: Optional[int] = None
46+
initial_group_size: Optional[int] = None
47+
leader_locator: Optional[str] = None

setup.cfg

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ exclude = .git, venv
1010
max-line-length = 120
1111

1212
[mypy]
13-
python_version = 3.9
13+
python_version = 3.10
1414
strict = True
1515
ignore_missing_imports = True
1616

0 commit comments

Comments
 (0)