Skip to content

Commit 3e626da

Browse files
committed
add 3 lab files
1 parent 810d0cc commit 3e626da

File tree

3 files changed

+78
-0
lines changed

3 files changed

+78
-0
lines changed

emit_log.py

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
import sys
3+
4+
from aio_pika import DeliveryMode, ExchangeType, Message, connect
5+
6+
7+
async def main() -> None:
8+
# Perform connection
9+
connection = await connect("amqp://guest:guest@localhost/")
10+
11+
async with connection:
12+
channel = await connection.channel()
13+
14+
logs_exchange = await channel.declare_exchange(
15+
"logs",
16+
ExchangeType.FANOUT,
17+
)
18+
19+
message_body = (
20+
b" ".join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"
21+
)
22+
23+
message = Message(
24+
message_body,
25+
delivery_mode=DeliveryMode.PERSISTENT,
26+
)
27+
28+
# Sending the message
29+
await logs_exchange.publish(message, routing_key="info")
30+
31+
print(f" [x] Sent {message!r}")
32+
33+
34+
if __name__ == "__main__":
35+
asyncio.run(main())

logs_from_rabbit.log

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
[*] Waiting for logs. To exit press CTRL+C
2+
[x] b'Hello World!'
3+
[x] b'Hello World!'

receive_logs.py

+40
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import asyncio
2+
3+
from aio_pika import ExchangeType, connect
4+
from aio_pika.abc import AbstractIncomingMessage
5+
6+
7+
async def on_message(message: AbstractIncomingMessage) -> None:
8+
async with message.process():
9+
print(f"[x] {message.body!r}")
10+
11+
12+
async def main() -> None:
13+
# Perform connection
14+
connection = await connect("amqp://guest:guest@localhost/")
15+
16+
async with connection:
17+
# Creating a channel
18+
channel = await connection.channel()
19+
await channel.set_qos(prefetch_count=1)
20+
21+
logs_exchange = await channel.declare_exchange(
22+
"logs",
23+
ExchangeType.FANOUT,
24+
)
25+
26+
# Declaring queue
27+
queue = await channel.declare_queue(exclusive=True)
28+
29+
# Binding the queue to the exchange
30+
await queue.bind(logs_exchange)
31+
32+
# Start listening the queue
33+
await queue.consume(on_message)
34+
35+
print(" [*] Waiting for logs. To exit press CTRL+C")
36+
await asyncio.Future()
37+
38+
39+
if __name__ == "__main__":
40+
asyncio.run(main())

0 commit comments

Comments
 (0)