Skip to content

Commit 97978fb

Browse files
committed
update files 4 lesson
1 parent 3e626da commit 97978fb

File tree

2 files changed

+31
-26
lines changed

2 files changed

+31
-26
lines changed

Diff for: emit_log.py

+6-5
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,14 @@ async def main() -> None:
1111
async with connection:
1212
channel = await connection.channel()
1313

14-
logs_exchange = await channel.declare_exchange(
15-
"logs",
16-
ExchangeType.FANOUT,
14+
topic_logs_exchange = await channel.declare_exchange(
15+
"topic_logs",
16+
ExchangeType.TOPIC,
1717
)
18+
routing_key = sys.argv[1] if len(sys.argv) > 2 else "anonymous.info"
1819

1920
message_body = (
20-
b" ".join(arg.encode() for arg in sys.argv[1:]) or b"Hello World!"
21+
b" ".join(arg.encode() for arg in sys.argv[2:]) or b"Hello World!"
2122
)
2223

2324
message = Message(
@@ -26,7 +27,7 @@ async def main() -> None:
2627
)
2728

2829
# Sending the message
29-
await logs_exchange.publish(message, routing_key="info")
30+
await topic_logs_exchange.publish(message, routing_key=routing_key)
3031

3132
print(f" [x] Sent {message!r}")
3233

Diff for: receive_logs.py

+25-21
Original file line numberDiff line numberDiff line change
@@ -1,39 +1,43 @@
11
import asyncio
2+
import sys
23

34
from aio_pika import ExchangeType, connect
45
from aio_pika.abc import AbstractIncomingMessage
56

67

7-
async def on_message(message: AbstractIncomingMessage) -> None:
8-
async with message.process():
9-
print(f"[x] {message.body!r}")
10-
11-
128
async def main() -> None:
139
# Perform connection
1410
connection = await connect("amqp://guest:guest@localhost/")
1511

16-
async with connection:
17-
# Creating a channel
18-
channel = await connection.channel()
19-
await channel.set_qos(prefetch_count=1)
12+
channel = await connection.channel()
13+
await channel.set_qos(prefetch_count=1)
14+
15+
topic_logs_exchange = await channel.declare_exchange(
16+
"topic_logs",
17+
ExchangeType.TOPIC,
18+
)
19+
20+
queue = await channel.declare_queue(
21+
"task_queue",
22+
durable=True,
23+
)
2024

21-
logs_exchange = await channel.declare_exchange(
22-
"logs",
23-
ExchangeType.FANOUT,
24-
)
25+
binding_keys = sys.argv[1:]
2526

26-
# Declaring queue
27-
queue = await channel.declare_queue(exclusive=True)
27+
if not binding_keys:
28+
sys.stderr.write("Usage: %s [binding_key]...\n" % sys.argv[0])
29+
sys.exit(1)
2830

29-
# Binding the queue to the exchange
30-
await queue.bind(logs_exchange)
31+
for binding_key in binding_keys:
32+
await queue.bind(topic_logs_exchange, routing_key=binding_key)
3133

32-
# Start listening the queue
33-
await queue.consume(on_message)
34+
print(" [*] Waiting for messages. To exit press CTRL+C")
3435

35-
print(" [*] Waiting for logs. To exit press CTRL+C")
36-
await asyncio.Future()
36+
async with queue.iterator() as iterator:
37+
message: AbstractIncomingMessage
38+
async for message in iterator:
39+
async with message.process():
40+
print(f" [x] {message.routing_key!r}:{message.body!r}")
3741

3842

3943
if __name__ == "__main__":

0 commit comments

Comments
 (0)