Skip to content

Commit 7e92630

Browse files
DanielePalaiaDanielePalaia
andauthored
preparing for first release (#21)
* preparing for first release * check destination url for publishers and consumers --------- Co-authored-by: DanielePalaia <daniele985@@gmail.com>
1 parent 80f9f93 commit 7e92630

File tree

9 files changed

+152
-4
lines changed

9 files changed

+152
-4
lines changed

README.md

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,97 @@ This library is in early stages of development. It is meant to be used with Rabb
1111

1212
## Getting Started
1313

14-
An example is provide in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
14+
An example is provided in ./getting_started_main.py you can run it after starting a RabbitMQ 4.0 broker with:
1515

1616
poetry run python ./examples/getting_started/main.py
1717

18+
### Creating a connection
19+
20+
A connection to the RabbitMQ AMQP 1.0 server can be established using the Connection object.
21+
22+
For example:
23+
24+
```python
25+
connection = Connection("amqp://guest:guest@localhost:5672/")
26+
connection.dial()
27+
```
28+
29+
### Managing resources
30+
31+
Once we have a Connection object we can get a Management object in order to submit to the server management operations
32+
(es: declare/delete queues and exchanges, purging queues, binding/unbinding objects ecc...)
33+
34+
For example (this code is declaring an exchange and a queue:
35+
36+
```python
37+
management = connection.management()
38+
39+
print("declaring exchange and queue")
40+
management.declare_exchange(ExchangeSpecification(name=exchange_name, arguments={}))
41+
42+
management.declare_queue(
43+
QuorumQueueSpecification(name=queue_name)
44+
)
45+
```
46+
47+
### Publishing messages
48+
49+
Once we have a Connection object we can get a Publisher object in order to send messages to the server (to an exchange or queue)
50+
51+
For example:
52+
53+
```python
54+
addr_queue = AddressHelper.queue_address(queue_name)
55+
publisher = connection.publisher(addr)
56+
57+
# publish messages
58+
for i in range(messages_to_publish):
59+
publisher.publish(Message(body="test"))
60+
61+
publisher.close()
62+
```
63+
64+
### Consuming messages
65+
66+
Once we have a Connection object we can get a Consumer object in order to consumer messages from the server (queue).
67+
68+
Messages are received through a callback
69+
70+
For example:
71+
72+
Create a class which extends AMQPMessagingHandler which defines at minimum the on_consumer method, that will receive the
73+
messages consumed:
74+
75+
```python
76+
class MyMessageHandler(AMQPMessagingHandler):
77+
78+
def __init__(self):
79+
super().__init__()
80+
self._count = 0
81+
82+
def on_message(self, event: Event):
83+
print("received message: " + str(event.message.body))
84+
85+
# accepting
86+
self.delivery_context.accept(event)
87+
```
88+
89+
Then from connection get a consumer object:
90+
91+
```python
92+
addr_queue = AddressHelper.queue_address(queue_name)
93+
consumer = connection.consumer(addr_queue, handler=MyMessageHandler())
94+
95+
try:
96+
consumer.run()
97+
except KeyboardInterrupt:
98+
pass
99+
100+
consumer.close()
101+
```
102+
103+
The consumer will run indefinitively waiting for messages to arrive.
104+
105+
106+
18107

examples/getting_started/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ def __init__(self):
2020
self._count = 0
2121

2222
def on_message(self, event: Event):
23-
print("received message: " + str(event.message.annotations))
23+
print("received message: " + str(event.message.body))
2424

2525
# accepting
2626
self.delivery_context.accept(event)

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[tool.poetry]
22
name = "rabbitmq-amqp-python-client"
3-
version = "0.1.0"
3+
version = "0.1.0-alpha.0"
44
description = "Python RabbitMQ client for AMQP 1.0 protocol"
55
authors = ["RabbitMQ team"]
66
license = "Apache-2.0 license"

rabbitmq_amqp_python_client/address_helper.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,9 @@ def binding_path_with_exchange_queue(
7272
+ ";args="
7373
)
7474
return binding_path_wth_exchange_queue_key
75+
76+
77+
def validate_address(address: str) -> bool:
78+
if address.startswith("/queues") or address.startswith("/exchanges"):
79+
return True
80+
return False

rabbitmq_amqp_python_client/connection.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
import logging
22
from typing import Optional
33

4+
from .address_helper import validate_address
45
from .consumer import Consumer
6+
from .exceptions import ArgumentOutOfRangeException
57
from .management import Management
68
from .publisher import Publisher
79
from .qpid.proton._handlers import MessagingHandler
@@ -35,11 +37,19 @@ def close(self) -> None:
3537
self._conn.close()
3638

3739
def publisher(self, destination: str) -> Publisher:
40+
if validate_address(destination) is False:
41+
raise ArgumentOutOfRangeException(
42+
"destination address must start with /queues or /exchanges"
43+
)
3844
publisher = Publisher(self._conn, destination)
3945
return publisher
4046

4147
def consumer(
4248
self, destination: str, handler: Optional[MessagingHandler] = None
4349
) -> Consumer:
50+
if validate_address(destination) is False:
51+
raise ArgumentOutOfRangeException(
52+
"destination address must start with /queues or /exchanges"
53+
)
4454
consumer = Consumer(self._conn, destination, handler)
4555
return consumer

rabbitmq_amqp_python_client/consumer.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,18 @@ def close(self) -> None:
4040
self._receiver.close()
4141

4242
def run(self) -> None:
43+
logger.debug("Running the consumer: starting to consume")
4344
if self._receiver is not None:
4445
self._receiver.container.run()
4546

4647
def stop(self) -> None:
48+
logger.debug("Stopping the consumer: starting to consume")
4749
if self._receiver is not None:
4850
self._receiver.container.stop_events()
4951
self._receiver.container.stop()
5052

5153
def _create_receiver(self, addr: str) -> BlockingReceiver:
54+
logger.debug("Creating the receiver")
5255
return self._conn.create_receiver(
5356
addr, options=ReceiverOptionUnsettled(addr), handler=self._handler
5457
)

rabbitmq_amqp_python_client/utils.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
def validate_annotations(annotations: []) -> bool: # type: ignore
22
validated = True
33
for annotation in annotations:
4-
if len(annotation) > 0 and annotation[:2] == "x-":
4+
if annotation.startswith("x-"):
55
pass
66
else:
77
validated = False

tests/test_consumer.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,24 @@ def test_consumer_sync_queue_accept(connection: Connection) -> None:
5353
assert consumed > 0
5454

5555

56+
def test_consumer_invalid_destination(connection: Connection) -> None:
57+
58+
queue_name = "test-queue-sync-invalid-accept"
59+
raised = False
60+
consumer = None
61+
try:
62+
consumer = connection.consumer("/invalid-destination/" + queue_name)
63+
except ArgumentOutOfRangeException:
64+
raised = True
65+
except Exception:
66+
raised = False
67+
68+
if consumer is not None:
69+
consumer.close()
70+
71+
assert raised is True
72+
73+
5674
def test_consumer_async_queue_accept(connection: Connection) -> None:
5775

5876
messages_to_send = 1000

tests/test_publisher.py

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from rabbitmq_amqp_python_client import (
22
AddressHelper,
3+
ArgumentOutOfRangeException,
34
BindingSpecification,
45
Connection,
56
ExchangeSpecification,
@@ -31,6 +32,27 @@ def test_publish_queue(connection: Connection) -> None:
3132
assert raised is False
3233

3334

35+
def test_publish_to_invalid_destination(connection: Connection) -> None:
36+
37+
queue_name = "test-queue"
38+
39+
raised = False
40+
41+
publisher = None
42+
try:
43+
publisher = connection.publisher("/invalid-destination/" + queue_name)
44+
publisher.publish(Message(body="test"))
45+
except ArgumentOutOfRangeException:
46+
raised = True
47+
except Exception:
48+
raised = False
49+
50+
if publisher is not None:
51+
publisher.close()
52+
53+
assert raised is True
54+
55+
3456
def test_publish_exchange(connection: Connection) -> None:
3557

3658
exchange_name = "test-exchange"

0 commit comments

Comments
 (0)