From 26a42d3b054508b98e6b1dd0b205d13931d68213 Mon Sep 17 00:00:00 2001 From: Mikhail Bigun Date: Tue, 29 Oct 2024 13:36:39 +0300 Subject: [PATCH 1/2] add graceful shutdown example --- examples/graceful_shutdown.py | 82 +++++++++++++++++++++++++++++++++++ 1 file changed, 82 insertions(+) create mode 100644 examples/graceful_shutdown.py diff --git a/examples/graceful_shutdown.py b/examples/graceful_shutdown.py new file mode 100644 index 00000000..079f2496 --- /dev/null +++ b/examples/graceful_shutdown.py @@ -0,0 +1,82 @@ +import asyncio +import signal +import sys + +import nats + + +def sig_cb(shutdown_event: asyncio.Event): + print("received shutdown signal") + # Setting shutdown event to true, so other listeners can close + shutdown_event.set() + + +async def nats_consumer(shutdown_event: asyncio.Event): + # Connect to NATS! + try: + nc = await nats.connect( + servers=["nats://127.0.0.1:4222"], + # Setting max rc attempts in order to prevent termination block + max_reconnect_attempts=3, + ) + except Exception as e: + print(e) + return + + # Callback for sub + async def cb(msg): + print("received: ", msg) + + # Receive messages on 'foo' + await nc.subscribe("foo", cb=cb) + + # Wait until shutdown event is set to release resources + await shutdown_event.wait() + + await nc.drain() + await nc.close() + print("gracefully closed nats consumer") + + +async def nats_producer(shutdown_event: asyncio.Event): + # Connect to NATS! + try: + nc = await nats.connect( + servers=["nats://127.0.0.1:4222"], + # Setting max rc attempts in order to prevent termination block + max_reconnect_attempts=3, + ) + except Exception as e: + print(e) + return + + while not shutdown_event.is_set(): + await nc.publish("foo", b"bar") + await asyncio.sleep(1) + + await nc.drain() + await nc.close() + print("gracefully closed nats producer") + + +async def main(shutdown_event: asyncio.Event): + nats_consumer_task = asyncio.create_task(nats_consumer(shutdown_event)) + nats_producer_task = asyncio.create_task(nats_producer(shutdown_event)) + + await asyncio.gather(*[nats_consumer_task, nats_producer_task]) + + sys.exit(0) + + +if __name__ == "__main__": + # Creating shutdown event to notify child routines of a shutdown + shutdown_event = asyncio.Event() + try: + loop = asyncio.new_event_loop() + # Setting signals handlers to catch termination and interruption events + loop.add_signal_handler(signal.SIGTERM, sig_cb, shutdown_event) + loop.add_signal_handler(signal.SIGINT, sig_cb, shutdown_event) + loop.run_until_complete(main(shutdown_event)) + loop.close() + except KeyboardInterrupt: + print("interrupted by ctrl+c") From e46b5bb126356d73bfa57c8bb346462016b0f262 Mon Sep 17 00:00:00 2001 From: Mikhail Bigun Date: Tue, 29 Oct 2024 15:00:23 +0300 Subject: [PATCH 2/2] rm unnecessary conn close eval --- examples/graceful_shutdown.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/examples/graceful_shutdown.py b/examples/graceful_shutdown.py index 079f2496..d67e67ff 100644 --- a/examples/graceful_shutdown.py +++ b/examples/graceful_shutdown.py @@ -34,7 +34,6 @@ async def cb(msg): await shutdown_event.wait() await nc.drain() - await nc.close() print("gracefully closed nats consumer") @@ -55,7 +54,6 @@ async def nats_producer(shutdown_event: asyncio.Event): await asyncio.sleep(1) await nc.drain() - await nc.close() print("gracefully closed nats producer")