From 5514e2a87326aba54c16a38ac82e6fda4fe67e77 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 11 Mar 2025 22:33:49 -0600 Subject: [PATCH 01/16] Implement basic NATS JetStream transport --- examples/nats_receive.py | 25 ++ examples/nats_send.py | 13 + kombu/transport/__init__.py | 1 + kombu/transport/nats_jetstream.py | 435 ++++++++++++++++++++++++++++++ requirements/extras/nats.txt | 1 + 5 files changed, 475 insertions(+) create mode 100644 examples/nats_receive.py create mode 100644 examples/nats_send.py create mode 100644 kombu/transport/nats_jetstream.py create mode 100644 requirements/extras/nats.txt diff --git a/examples/nats_receive.py b/examples/nats_receive.py new file mode 100644 index 0000000000..9f5d0f541d --- /dev/null +++ b/examples/nats_receive.py @@ -0,0 +1,25 @@ +from __future__ import annotations + +from pprint import pformat + +from kombu import Connection, Consumer, Exchange, Queue, eventloop + +exchange = Exchange("exchange", "direct", durable=False) +msg_queue = Queue("queue", exchange=exchange, routing_key="messages") + + +def pretty(obj): + return pformat(obj, indent=4) + + +def process_msg(body, message): + print(f"Received message: {body!r}") + print(f" properties:\n{pretty(message.properties)}") + print(f" delivery_info:\n{pretty(message.delivery_info)}") + message.ack() + + +with Connection("nats://localhost:4222") as connection: + with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer: + for msg in eventloop(connection): + pass diff --git a/examples/nats_send.py b/examples/nats_send.py new file mode 100644 index 0000000000..fdf440a9c1 --- /dev/null +++ b/examples/nats_send.py @@ -0,0 +1,13 @@ +from __future__ import annotations + +from kombu import Connection, Exchange, Queue + +exchange = Exchange("exchange", "direct", durable=False) +msg_queue = Queue("queue", exchange=exchange, routing_key="messages") + + +with Connection("nats://localhost:4222") as conn: + producer = conn.Producer() + producer.publish( + "hello world", exchange=exchange, routing_key="messages", declare=[msg_queue] + ) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 180a27b4b6..23a9b1e41a 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -45,6 +45,7 @@ def supports_librabbitmq() -> bool | None: 'azureservicebus': 'kombu.transport.azureservicebus:Transport', 'pyro': 'kombu.transport.pyro:Transport', 'gcpubsub': 'kombu.transport.gcpubsub:Transport', + 'nats': 'kombu.transport.nats_jetstream:Transport', } _transport_cache = {} diff --git a/kombu/transport/nats_jetstream.py b/kombu/transport/nats_jetstream.py new file mode 100644 index 0000000000..5703d39998 --- /dev/null +++ b/kombu/transport/nats_jetstream.py @@ -0,0 +1,435 @@ +"""NATS JetStream transport module for Kombu. + +NATS JetStream transport using nats-py library. + +**References** + +- https://github.com/nats-io/nats.py +- https://docs.nats.io/nats-concepts/jetstream + +Features +======== +* Type: Virtual +* Supports Direct: Yes +* Supports Topic: Yes +* Supports Fanout: Yes +* Supports Priority: No +* Supports TTL: Yes + +Connection String +================= +Connection string has the following format: + +.. code-block:: + + nats://[USER:PASSWORD@]NATS_ADDRESS[:PORT] + +Transport Options +================= +* ``connection_wait_time_seconds`` - Time in seconds to wait for connection + to succeed. Default ``5`` +* ``wait_time_seconds`` - Time in seconds to wait to receive messages. + Default ``5`` +* ``stream_config`` - Stream configuration. Must be a dict whose key-value pairs + correspond with attributes in the NATS JetStream stream configuration. +* ``consumer_config`` - Consumer configuration. Must be a dict whose key-value pairs + correspond with attributes in the NATS JetStream consumer configuration. +""" + +from __future__ import annotations + +import asyncio +from queue import Empty +from typing import TYPE_CHECKING + +from kombu.transport import virtual +from kombu.utils import cached_property +from kombu.utils.encoding import str_to_bytes +from kombu.utils.json import dumps, loads + +try: + import nats.aio.client + import nats.aio.errors + import nats.errors + from nats.aio.client import Client + from nats.js.api import ( + AckPolicy, + ConsumerConfig, + DeliverPolicy, + DiscardPolicy, + RetentionPolicy, + StorageType, + StreamConfig, + ) + from nats.js.client import JetStreamContext + from nats.js.errors import NotFoundError + + NATS_CONNECTION_ERRORS = ( + nats.aio.errors.ErrConnectionClosed, + nats.aio.errors.ErrTimeout, + nats.aio.errors.ErrNoServers, + ) + NATS_CHANNEL_ERRORS = (NotFoundError,) + +except ImportError: + Client = None + NATS_CONNECTION_ERRORS = NATS_CHANNEL_ERRORS = () + +from kombu.log import get_logger + +logger = get_logger(__name__) + +DEFAULT_PORT = 4222 +DEFAULT_HOST = "localhost" + + +class Message(virtual.Message): + """Message object.""" + + def __init__(self, payload, channel=None, **kwargs): + self.subject = payload["subject"] + self.nats_ack = payload["ack"] + self.nats_nak = payload["nak"] + self.nats_term = payload["term"] + super().__init__(payload, channel=channel, **kwargs) + + +class QoS(virtual.QoS): + """Quality of Service guarantees.""" + + _not_yet_acked = {} + + def can_consume(self): + """Return true if the channel can be consumed from.""" + return not self.prefetch_count or len(self._not_yet_acked) < self.prefetch_count + + def can_consume_max_estimate(self): + if self.prefetch_count: + return self.prefetch_count - len(self._not_yet_acked) + return 1 + + def append(self, message, delivery_tag): + self._not_yet_acked[delivery_tag] = message + + def get(self, delivery_tag): + return self._not_yet_acked[delivery_tag] + + def ack(self, delivery_tag): + if delivery_tag not in self._not_yet_acked: + return + message = self._not_yet_acked.pop(delivery_tag) + self.channel.ack_msg(message) + + def reject(self, delivery_tag, requeue=False): + """Reject a message by delivery tag.""" + if delivery_tag not in self._not_yet_acked: + return + message = self._not_yet_acked.pop(delivery_tag) + if requeue: + self.channel.nak_msg(message) + else: + self.channel.term_msg(message) + + def restore_unacked_once(self, stderr=None): + pass + + +class Channel(virtual.Channel): + """NATS JetStream Channel.""" + + QoS = QoS + Message = Message + + default_wait_time_seconds = 5 + default_connection_wait_time_seconds = 5 + + if TYPE_CHECKING: + _nats_client: Client + _js: JetStreamContext + + def __init__(self, *args, **kwargs): + if Client is None: + raise ImportError("nats-py is not installed") + + super().__init__(*args, **kwargs) + + port = self.connection.client.port or self.connection.default_port + host = self.connection.client.hostname or DEFAULT_HOST + + logger.debug("Host: %s Port: %s", host, port) + + self._event_loop = asyncio.new_event_loop() + + self._nats_client = None + self._js = None + + self._streams = set() + + # Evaluate connection + self.client + + def _get_stream_name(self, queue): + """Get the stream name for a queue.""" + return f"STREAM_{queue}" + + def _get_consumer_name(self, queue): + """Get the consumer name for a queue.""" + return f"CONSUMER_{queue}" + + def _ensure_stream(self, queue): + """Ensure a stream exists for the queue.""" + stream_name = self._get_stream_name(queue) + if stream_name in self._streams: + return + + try: + self._event_loop.run_until_complete(self._js.stream_info(stream_name)) + self._streams.add(stream_name) + return + except NotFoundError: + pass + + stream_config = StreamConfig( + name=stream_name, + # subjects=[f"{queue}.>"], + subjects=[queue], + retention=RetentionPolicy.WORK_QUEUE, + max_consumers=-1, + max_msgs_per_subject=1, + max_msgs=-1, + max_bytes=-1, + max_age=0, + max_msg_size=-1, + storage=StorageType.MEMORY, + discard=DiscardPolicy.OLD, + num_replicas=1, + duplicate_window=120.0, # 2 minutes in seconds + allow_direct=True, # for debugging with nats cli + ) + + # Update with user-provided config + user_cfg = self.options.get("stream_config") or {} + + self._event_loop.run_until_complete( + self._js.add_stream(stream_config, **user_cfg) + ) + self._streams.add(stream_name) + + def _ensure_consumer(self, queue): + """Ensure a consumer exists for the queue.""" + consumer_name = self._get_consumer_name(queue) + if consumer_name in self._consumers: + return + + name = self._get_stream_name(queue) + + consumer_config = ConsumerConfig( + durable_name=consumer_name, + deliver_policy=DeliverPolicy.ALL, + ack_policy=AckPolicy.EXPLICIT, + # filter_subject=f"{queue}.>", + filter_subject=queue, + ) + + # Update with user-provided config + user_cfg = self.options.get("consumer_config") or {} + + self._event_loop.run_until_complete( + self._js.add_consumer(name, consumer_config, **user_cfg) + ) + self._consumers.add(consumer_name) + + def _put(self, queue, message, **kwargs): + """Put a message on a queue.""" + self._ensure_stream(queue) + # subject = f"{queue}.{message.get('id', '')}" + subject = queue + self._event_loop.run_until_complete( + self._js.publish(subject, str_to_bytes(dumps(message))) + ) + + def _get(self, queue, **kwargs): + """Get a message from a queue.""" + self._ensure_stream(queue) + self._ensure_consumer(queue) + + try: + pull_sub = self._event_loop.run_until_complete( + self._js.pull_subscribe( + # f"{queue}.>", + queue, + self._get_consumer_name(queue), + stream=self._get_stream_name(queue), + ) + ) + msg = self._event_loop.run_until_complete( + pull_sub.fetch(1, timeout=self.wait_time_seconds) + )[0] + + body = loads(msg.data.decode()) + body["subject"] = msg.subject + body["ack"] = msg.ack + body["nak"] = msg.nak + body["term"] = msg.term + return body + except (IndexError, nats.errors.TimeoutError): + pass + raise Empty() + + def _delete(self, queue, *args, **kwargs): + """Delete a queue.""" + stream_name = self._get_stream_name(queue) + if stream_name in self._streams: + try: + self._event_loop.run_until_complete(self._js.delete_stream(stream_name)) + self._streams.remove(stream_name) + except NotFoundError: + pass + + def _size(self, queue): + """Return the number of messages in a queue.""" + try: + info = self._event_loop.run_until_complete( + self._js.stream_info(self._get_stream_name(queue)) + ) + return info.state.messages + except NotFoundError: + return 0 + + def _new_queue(self, queue, **kwargs): + """Declare a new queue.""" + self._ensure_stream(queue) + return queue + + def _has_queue(self, queue, **kwargs): + """Check if a queue exists.""" + try: + self._event_loop.run_until_complete( + self._js.stream_info(self._get_stream_name(queue)) + ) + return True + except NotFoundError: + return False + + def _open(self): + """Open a new connection to NATS.""" + if self._nats_client is None: + self._nats_client = Client() + self._event_loop.run_until_complete( + self._nats_client.connect( + f"nats://{self.conninfo.hostname}:{self.conninfo.port or DEFAULT_PORT}", + user=self.conninfo.userid, + password=self.conninfo.password, + connect_timeout=self.connection_wait_time_seconds, + ) + ) + self._js = self._nats_client.jetstream() + return self._nats_client + + @cached_property + def client(self): + """Get the NATS client.""" + return self._open() + + @property + def options(self): + """Get the transport options.""" + return self.connection.client.transport_options + + @property + def conninfo(self): + """Get the connection info.""" + return self.connection.client + + @cached_property + def wait_time_seconds(self): + """Get the wait time in seconds.""" + return float( + self.options.get("wait_time_seconds", self.default_wait_time_seconds) + ) + + @cached_property + def connection_wait_time_seconds(self): + """Get the connection wait time in seconds.""" + return float( + self.options.get( + "connection_wait_time_seconds", + self.default_connection_wait_time_seconds, + ) + ) + + def close(self): + """Close the channel.""" + if self._nats_client is not None: + self._event_loop.run_until_complete(self._nats_client.drain()) + self._event_loop.run_until_complete(self._nats_client.close()) + self._nats_client = None + self._js = None + + pending = asyncio.all_tasks(loop=self._event_loop) + group = asyncio.gather(*pending) + if not group.done(): + self._event_loop.run_until_complete(group) + + self._event_loop.close() + + def ack_msg(self, msg): + self._event_loop.run_until_complete(msg.nats_ack()) + + def nak_msg(self, msg): + self._event_loop.run_until_complete(msg.nats_nak()) + + def term_msg(self, msg): + self._event_loop.run_until_complete(msg.nats_term()) + + +class Transport(virtual.Transport): + """NATS JetStream Transport.""" + + Channel = Channel + + default_port = DEFAULT_PORT + + driver_type = "nats" + driver_name = "nats" + + connection_errors = NATS_CONNECTION_ERRORS + channel_errors = NATS_CHANNEL_ERRORS + + def __init__(self, client, **kwargs): + if Client is None: + raise ImportError("nats-py is not installed") + super().__init__(client, **kwargs) + self._event_loop = asyncio.new_event_loop() + + def drain_events(self, connection, **kwargs): + return super().drain_events(connection, **kwargs) + + def driver_version(self): + """Get the NATS driver version.""" + return nats.aio.client.__version__ + + def establish_connection(self): + """Establish a connection to NATS.""" + return super().establish_connection() + + def close_connection(self, connection): + """Close the connection to NATS.""" + return super().close_connection(connection) + + def verify_connection(self, connection): + """Verify the connection works.""" + port = connection.client.port or self.default_port + host = connection.client.hostname or DEFAULT_HOST + + logger.debug("Verify NATS connection to nats://%s:%s", host, port) + + client = Client() + try: + self._event_loop.run_until_complete(client.connect(f"nats://{host}:{port}")) + self._event_loop.run_until_complete(client.close()) + return True + except ValueError: + pass + + return False diff --git a/requirements/extras/nats.txt b/requirements/extras/nats.txt new file mode 100644 index 0000000000..cc447d6c2f --- /dev/null +++ b/requirements/extras/nats.txt @@ -0,0 +1 @@ +nats-py[nkeys]==2.9.0 From 5a43ee2cfba52461579b1c849b4c9e6146b7588b Mon Sep 17 00:00:00 2001 From: joeriddles Date: Fri, 16 May 2025 09:11:29 -0600 Subject: [PATCH 02/16] Add NATS demo server to examples --- examples/nats_receive.py | 14 ++++++++++++-- examples/nats_send.py | 21 +++++++++++++++++++-- kombu/transport/nats_jetstream.py | 4 ---- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/examples/nats_receive.py b/examples/nats_receive.py index 9f5d0f541d..9ed104f068 100644 --- a/examples/nats_receive.py +++ b/examples/nats_receive.py @@ -1,11 +1,21 @@ from __future__ import annotations +import sys from pprint import pformat from kombu import Connection, Consumer, Exchange, Queue, eventloop +LOCAL_SERVER = "localhost" +DEMO_SERVER = "demo.nats.io" + +server = LOCAL_SERVER +use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" +if use_demo_server: + server = DEMO_SERVER + + exchange = Exchange("exchange", "direct", durable=False) -msg_queue = Queue("queue", exchange=exchange, routing_key="messages") +msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") def pretty(obj): @@ -19,7 +29,7 @@ def process_msg(body, message): message.ack() -with Connection("nats://localhost:4222") as connection: +with Connection(f"nats://{server}:4222") as connection: with Consumer(connection, msg_queue, callbacks=[process_msg]) as consumer: for msg in eventloop(connection): pass diff --git a/examples/nats_send.py b/examples/nats_send.py index fdf440a9c1..e798ec927c 100644 --- a/examples/nats_send.py +++ b/examples/nats_send.py @@ -1,12 +1,29 @@ from __future__ import annotations +import sys + +from nats.js.api import StorageType + from kombu import Connection, Exchange, Queue +LOCAL_SERVER = "localhost" +DEMO_SERVER = "demo.nats.io" + +server = LOCAL_SERVER +use_demo_server = len(sys.argv) > 1 and sys.argv[1] == "--demo" +if use_demo_server: + server = DEMO_SERVER + + exchange = Exchange("exchange", "direct", durable=False) -msg_queue = Queue("queue", exchange=exchange, routing_key="messages") +msg_queue = Queue("kombu_demo", exchange=exchange, routing_key="messages") -with Connection("nats://localhost:4222") as conn: +with Connection(f"nats://{server}:4222", transport_options={ + "stream_config": { + "storage": StorageType.FILE, + } +}) as conn: producer = conn.Producer() producer.publish( "hello world", exchange=exchange, routing_key="messages", declare=[msg_queue] diff --git a/kombu/transport/nats_jetstream.py b/kombu/transport/nats_jetstream.py index 5703d39998..2ee5d47e5f 100644 --- a/kombu/transport/nats_jetstream.py +++ b/kombu/transport/nats_jetstream.py @@ -191,7 +191,6 @@ def _ensure_stream(self, queue): stream_config = StreamConfig( name=stream_name, - # subjects=[f"{queue}.>"], subjects=[queue], retention=RetentionPolicy.WORK_QUEUE, max_consumers=-1, @@ -227,7 +226,6 @@ def _ensure_consumer(self, queue): durable_name=consumer_name, deliver_policy=DeliverPolicy.ALL, ack_policy=AckPolicy.EXPLICIT, - # filter_subject=f"{queue}.>", filter_subject=queue, ) @@ -242,7 +240,6 @@ def _ensure_consumer(self, queue): def _put(self, queue, message, **kwargs): """Put a message on a queue.""" self._ensure_stream(queue) - # subject = f"{queue}.{message.get('id', '')}" subject = queue self._event_loop.run_until_complete( self._js.publish(subject, str_to_bytes(dumps(message))) @@ -256,7 +253,6 @@ def _get(self, queue, **kwargs): try: pull_sub = self._event_loop.run_until_complete( self._js.pull_subscribe( - # f"{queue}.>", queue, self._get_consumer_name(queue), stream=self._get_stream_name(queue), From 943f310e5cb02227d858616ae72224cb94c8d54c Mon Sep 17 00:00:00 2001 From: joeriddles Date: Mon, 2 Jun 2025 21:22:58 -0500 Subject: [PATCH 03/16] Rename module to nats.py --- kombu/transport/__init__.py | 2 +- kombu/transport/{nats_jetstream.py => nats.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename kombu/transport/{nats_jetstream.py => nats.py} (100%) diff --git a/kombu/transport/__init__.py b/kombu/transport/__init__.py index 23a9b1e41a..1a122f34bc 100644 --- a/kombu/transport/__init__.py +++ b/kombu/transport/__init__.py @@ -45,7 +45,7 @@ def supports_librabbitmq() -> bool | None: 'azureservicebus': 'kombu.transport.azureservicebus:Transport', 'pyro': 'kombu.transport.pyro:Transport', 'gcpubsub': 'kombu.transport.gcpubsub:Transport', - 'nats': 'kombu.transport.nats_jetstream:Transport', + 'nats': 'kombu.transport.nats:Transport', } _transport_cache = {} diff --git a/kombu/transport/nats_jetstream.py b/kombu/transport/nats.py similarity index 100% rename from kombu/transport/nats_jetstream.py rename to kombu/transport/nats.py From 793fb97b518a79e5a61befacc2fbedcf8d3b3c19 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Mon, 2 Jun 2025 21:26:19 -0500 Subject: [PATCH 04/16] Use range for nats dependency --- requirements/extras/nats.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements/extras/nats.txt b/requirements/extras/nats.txt index cc447d6c2f..f92a4733f0 100644 --- a/requirements/extras/nats.txt +++ b/requirements/extras/nats.txt @@ -1 +1 @@ -nats-py[nkeys]==2.9.0 +nats-py[nkeys]>=2.9.0,<3.0.0 From 9daa8347b6219d298141e481d85a755341960919 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Tue, 3 Jun 2025 02:23:21 +0000 Subject: [PATCH 05/16] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- kombu/transport/nats.py | 12 +++--------- 1 file changed, 3 insertions(+), 9 deletions(-) diff --git a/kombu/transport/nats.py b/kombu/transport/nats.py index 2ee5d47e5f..33b207712f 100644 --- a/kombu/transport/nats.py +++ b/kombu/transport/nats.py @@ -52,15 +52,9 @@ import nats.aio.errors import nats.errors from nats.aio.client import Client - from nats.js.api import ( - AckPolicy, - ConsumerConfig, - DeliverPolicy, - DiscardPolicy, - RetentionPolicy, - StorageType, - StreamConfig, - ) + from nats.js.api import (AckPolicy, ConsumerConfig, DeliverPolicy, + DiscardPolicy, RetentionPolicy, StorageType, + StreamConfig) from nats.js.client import JetStreamContext from nats.js.errors import NotFoundError From b147b4598848c60007f95d66cf850a5bfb2cd877 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 20:47:36 -0600 Subject: [PATCH 06/16] Add NATS integration tests --- t/integration/test_nats.py | 149 +++++++++++++++++++++++++++++++++++++ tox.ini | 15 ++++ 2 files changed, 164 insertions(+) create mode 100644 t/integration/test_nats.py diff --git a/t/integration/test_nats.py b/t/integration/test_nats.py new file mode 100644 index 0000000000..dc504a9658 --- /dev/null +++ b/t/integration/test_nats.py @@ -0,0 +1,149 @@ +from __future__ import annotations + +from unittest.mock import AsyncMock, Mock, patch + +import pytest + +from kombu import Connection +from kombu.transport import nats + +pytest.importorskip('nats') + + +class test_Channel: + def setup_method(self): + self.connection = self.create_connection() + self.channel = self.connection.default_channel + + def create_connection(self, **kwargs): + return Connection(transport=nats.Transport, connect_timeout=60, **kwargs) + + def teardown_method(self): + self.connection.close() + + def test_get_returns_message(self): + message = {'body': 'test message'} + self.channel._put('test_queue', message) + result = self.channel._get('test_queue') + assert result['body'] == 'test message' + + def test_delete_removes_queue(self): + self.channel._put('test_queue', {'body': 'test'}) + self.channel._delete('test_queue') + assert not self.channel._has_queue('test_queue') + + def test_size_returns_queue_size(self): + self.channel._put('test_queue', {'body': 'test1'}) + self.channel._put('test_queue', {'body': 'test2'}) + assert self.channel._size('test_queue') == 2 + + def test_new_queue_creates_queue(self): + queue = self.channel._new_queue('test_queue') + assert queue == 'test_queue' + assert self.channel._has_queue('test_queue') + + def test_has_queue_returns_true_for_existing_queue(self): + self.channel._new_queue('test_queue') + assert self.channel._has_queue('test_queue') + + def test_has_queue_returns_false_for_nonexistent_queue(self): + assert not self.channel._has_queue('nonexistent_queue') + + def test_ack_msg_acknowledges_message(self): + message = AsyncMock() + self.channel.ack_msg(message) + message.nats_ack.assert_called_once() + + def test_nak_msg_negatively_acknowledges_message(self): + message = AsyncMock() + self.channel.nak_msg(message) + message.nats_nak.assert_called_once() + + def test_term_msg_terminates_message(self): + message = AsyncMock() + self.channel.term_msg(message) + message.nats_term.assert_called_once() + + def test_custom_stream_config(self): + stream_config = { + 'max_msgs': 1000, + 'max_bytes': 1024 * 1024, + 'max_age': 3600, + } + conn = self.create_connection(transport_options={'stream_config': stream_config}) + channel = conn.default_channel + assert channel.options['stream_config'] == stream_config + + def test_custom_consumer_config(self): + consumer_config = { + 'ack_policy': 'explicit', + 'deliver_policy': 'all', + } + conn = self.create_connection(transport_options={'consumer_config': consumer_config}) + channel = conn.default_channel + assert channel.options['consumer_config'] == consumer_config + + def test_custom_wait_time(self): + wait_time = 10 + conn = self.create_connection(transport_options={'wait_time_seconds': wait_time}) + channel = conn.default_channel + assert channel.wait_time_seconds == wait_time + + def test_custom_connection_wait_time(self): + wait_time = 15 + conn = self.create_connection(transport_options={'connection_wait_time_seconds': wait_time}) + channel = conn.default_channel + assert channel.connection_wait_time_seconds == wait_time + + +class test_Transport: + def setup_method(self): + self.client = Mock() + self.client.transport_options = {} + self.transport = nats.Transport(self.client) + + def test_driver_version(self): + assert self.transport.driver_version() + + @patch('kombu.transport.nats.Client') + def test_verify_connection(self, mock_client_class): + mock_client = AsyncMock() + mock_client_class.return_value = mock_client + mock_client.connect.return_value = None + mock_client.close.return_value = None + + connection = Mock() + connection.client.port = None + connection.client.hostname = None + + assert self.transport.verify_connection(connection) + mock_client.connect.assert_called_once() + mock_client.close.assert_called_once() + + @patch('kombu.transport.nats.Client') + def test_verify_connection_fails(self, mock_client_class): + mock_client = AsyncMock() + mock_client_class.return_value = mock_client + mock_client.connect.side_effect = ValueError + + connection = Mock() + connection.client.port = None + connection.client.hostname = None + + assert not self.transport.verify_connection(connection) + mock_client.connect.assert_called_once() + + def test_connection_errors(self): + assert self.transport.connection_errors == nats.NATS_CONNECTION_ERRORS + + def test_channel_errors(self): + assert self.transport.channel_errors == nats.NATS_CHANNEL_ERRORS + + def test_default_port(self): + assert self.transport.default_port == nats.DEFAULT_PORT + + def test_driver_type(self): + assert self.transport.driver_type == 'nats' + + def test_driver_name(self): + assert self.transport.driver_name == 'nats' diff --git a/tox.ini b/tox.ini index b2154cc435..d9bb34e8ad 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ envlist = {pypy3.10,3.9,3.10,3.11,3.12,3.13}-linux-integration-redis {pypy3.10,3.9,3.10,3.11,3.12,3.13}-linux-integration-mongodb {pypy3.10,3.9,3.10,3.11,3.12,3.13}-linux-integration-kafka + {pypy3.10,3.9,3.10,3.11,3.12,3.13}-linux-integration-nats flake8 apicheck pydocstyle @@ -36,6 +37,7 @@ deps= apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13: -r{toxinidir}/requirements/test.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13: -r{toxinidir}/requirements/test-ci.txt apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux: -r{toxinidir}/requirements/extras/nats.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt flake8,pydocstyle,mypy: -r{toxinidir}/requirements/pkgutils.txt integration: -r{toxinidir}/requirements/test-integration.txt @@ -46,6 +48,7 @@ commands = integration-redis: pytest -xv -E redis t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} integration-mongodb: pytest -xv -E mongodb t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} integration-kafka: pytest -xv -E kafka t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} + integration-nats: pytest -xv -E nats t/integration -n auto --reruns 2 --reruns-delay 1 {posargs} basepython = pypy3: pypy3 @@ -64,6 +67,7 @@ docker = integration-mongodb: mongodb integration-kafka: zookeeper integration-kafka: kafka + integration-nats: nats dockerenv = PYAMQP_INTEGRATION_INSTANCE=1 @@ -123,6 +127,16 @@ environment = KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 ALLOW_PLAINTEXT_LISTENER=yes +[docker:nats] +image = nats:alpine +command = nats-server -js +ports = 4222:4222/tcp +healthcheck_cmd = wget http://localhost:8222/healthz -q -S -O - +healthcheck_interval = 5 +healthcheck_timeout = 10 +healthcheck_retries = 3s +healthcheck_start_period = 3 + [testenv:apicheck] commands = pip install -U -r{toxinidir}/requirements/dev.txt sphinx-build -j2 -b apicheck -d {envtmpdir}/doctrees docs docs/_build/apicheck @@ -167,4 +181,5 @@ commands = 3.13-linux-integration-redis,\ 3.13-linux-integration-mongodb,\ 3.13-linux-integration-kafka \ + 3.13-linux-integration-nats \ -p -o -- --exitfirst {posargs} From 0b21e60e5354e0a9cf8f91354f4781899352f169 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 20:48:14 -0600 Subject: [PATCH 07/16] Fix default NATS max_msgs_per_subject --- kombu/transport/nats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kombu/transport/nats.py b/kombu/transport/nats.py index 33b207712f..ee1eca195d 100644 --- a/kombu/transport/nats.py +++ b/kombu/transport/nats.py @@ -188,7 +188,7 @@ def _ensure_stream(self, queue): subjects=[queue], retention=RetentionPolicy.WORK_QUEUE, max_consumers=-1, - max_msgs_per_subject=1, + max_msgs_per_subject=-1, max_msgs=-1, max_bytes=-1, max_age=0, From 1bacb070d781a419d98b8a4a4ab9241482b35512 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 20:51:42 -0600 Subject: [PATCH 08/16] Fix NATS healthcheck_retries --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index d9bb34e8ad..831efa20e7 100644 --- a/tox.ini +++ b/tox.ini @@ -134,7 +134,7 @@ ports = 4222:4222/tcp healthcheck_cmd = wget http://localhost:8222/healthz -q -S -O - healthcheck_interval = 5 healthcheck_timeout = 10 -healthcheck_retries = 3s +healthcheck_retries = 3 healthcheck_start_period = 3 [testenv:apicheck] From 1a320abfa95892c4db6fb278f3025341b2fa5fa9 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 21:25:21 -0600 Subject: [PATCH 09/16] Fix NATS integration test Dockerfile --- t/integration/docker/Dockerfile.nats | 2 ++ tox.ini | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 t/integration/docker/Dockerfile.nats diff --git a/t/integration/docker/Dockerfile.nats b/t/integration/docker/Dockerfile.nats new file mode 100644 index 0000000000..c47451b12d --- /dev/null +++ b/t/integration/docker/Dockerfile.nats @@ -0,0 +1,2 @@ +FROM nats:2.11-alpine +CMD ["nats-server", "--jetstream", "--http_port", "8222", "--debug"] diff --git a/tox.ini b/tox.ini index 831efa20e7..d4e8a1c13f 100644 --- a/tox.ini +++ b/tox.ini @@ -128,8 +128,8 @@ environment = ALLOW_PLAINTEXT_LISTENER=yes [docker:nats] -image = nats:alpine -command = nats-server -js +dockerfile = {toxinidir}/t/integration/docker/Dockerfile.nats +command = nats-server --jetstream --http_port 8222 --debug ports = 4222:4222/tcp healthcheck_cmd = wget http://localhost:8222/healthz -q -S -O - healthcheck_interval = 5 From 722111a933fe8263028d3cb1715c9f5d7c4b9339 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 21:28:38 -0600 Subject: [PATCH 10/16] Improve NATS event loop logic --- kombu/transport/nats.py | 166 +++++++++++++++++++++++++++++----------- 1 file changed, 120 insertions(+), 46 deletions(-) diff --git a/kombu/transport/nats.py b/kombu/transport/nats.py index ee1eca195d..096e6250f2 100644 --- a/kombu/transport/nats.py +++ b/kombu/transport/nats.py @@ -40,7 +40,6 @@ import asyncio from queue import Empty -from typing import TYPE_CHECKING from kombu.transport import virtual from kombu.utils import cached_property @@ -51,19 +50,19 @@ import nats.aio.client import nats.aio.errors import nats.errors + import nats.js.errors from nats.aio.client import Client from nats.js.api import (AckPolicy, ConsumerConfig, DeliverPolicy, DiscardPolicy, RetentionPolicy, StorageType, StreamConfig) from nats.js.client import JetStreamContext - from nats.js.errors import NotFoundError NATS_CONNECTION_ERRORS = ( nats.aio.errors.ErrConnectionClosed, nats.aio.errors.ErrTimeout, nats.aio.errors.ErrNoServers, ) - NATS_CHANNEL_ERRORS = (NotFoundError,) + NATS_CHANNEL_ERRORS = (nats.js.errors.NotFoundError,) except ImportError: Client = None @@ -76,6 +75,17 @@ DEFAULT_PORT = 4222 DEFAULT_HOST = "localhost" +_event_loop: asyncio.AbstractEventLoop | None = None + + +def get_event_loop() -> asyncio.AbstractEventLoop: + """Get or create the global event loop.""" + global _event_loop + if _event_loop is None: + _event_loop = asyncio.new_event_loop() + asyncio.set_event_loop(_event_loop) + return _event_loop + class Message(virtual.Message): """Message object.""" @@ -137,10 +147,6 @@ class Channel(virtual.Channel): default_wait_time_seconds = 5 default_connection_wait_time_seconds = 5 - if TYPE_CHECKING: - _nats_client: Client - _js: JetStreamContext - def __init__(self, *args, **kwargs): if Client is None: raise ImportError("nats-py is not installed") @@ -152,11 +158,8 @@ def __init__(self, *args, **kwargs): logger.debug("Host: %s Port: %s", host, port) - self._event_loop = asyncio.new_event_loop() - - self._nats_client = None - self._js = None - + self._nats_client: Client | None = None + self._js: JetStreamContext | None = None self._streams = set() # Evaluate connection @@ -176,13 +179,25 @@ def _ensure_stream(self, queue): if stream_name in self._streams: return + if self._js is None: + raise RuntimeError("JetStream context not initialized") + + # First try to get stream info with a shorter timeout try: - self._event_loop.run_until_complete(self._js.stream_info(stream_name)) + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.stream_info(stream_name), + timeout=1.0 # Use a shorter timeout for the check + ) + ) self._streams.add(stream_name) return - except NotFoundError: + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + # Stream doesn't exist or timed out, we'll create it pass + # Create the stream with a longer timeout stream_config = StreamConfig( name=stream_name, subjects=[queue], @@ -203,10 +218,27 @@ def _ensure_stream(self, queue): # Update with user-provided config user_cfg = self.options.get("stream_config") or {} - self._event_loop.run_until_complete( - self._js.add_stream(stream_config, **user_cfg) - ) - self._streams.add(stream_name) + try: + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.add_stream(stream_config, **user_cfg), + timeout=5.0 # Use a longer timeout for creation + ) + ) + self._streams.add(stream_name) + except nats.errors.TimeoutError: + # If we timeout creating the stream, check if it was actually created + try: + loop.run_until_complete( + asyncio.wait_for( + self._js.stream_info(stream_name), + timeout=1.0 + ) + ) + self._streams.add(stream_name) + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + raise RuntimeError(f"Failed to create stream {stream_name}") def _ensure_consumer(self, queue): """Ensure a consumer exists for the queue.""" @@ -214,6 +246,9 @@ def _ensure_consumer(self, queue): if consumer_name in self._consumers: return + if self._js is None: + raise RuntimeError("JetStream context not initialized") + name = self._get_stream_name(queue) consumer_config = ConsumerConfig( @@ -226,16 +261,36 @@ def _ensure_consumer(self, queue): # Update with user-provided config user_cfg = self.options.get("consumer_config") or {} - self._event_loop.run_until_complete( - self._js.add_consumer(name, consumer_config, **user_cfg) - ) - self._consumers.add(consumer_name) + try: + loop = get_event_loop() + loop.run_until_complete( + asyncio.wait_for( + self._js.add_consumer(name, consumer_config, **user_cfg), + timeout=5.0 # Use a longer timeout for consumer creation + ) + ) + self._consumers.add(consumer_name) + except nats.errors.TimeoutError: + # If we timeout creating the consumer, check if it was actually created + try: + loop.run_until_complete( + asyncio.wait_for( + self._js.consumer_info(name, consumer_name), + timeout=1.0 + ) + ) + self._consumers.add(consumer_name) + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): + raise RuntimeError(f"Failed to create consumer {consumer_name} for stream {name}") def _put(self, queue, message, **kwargs): """Put a message on a queue.""" self._ensure_stream(queue) + if self._js is None: + raise RuntimeError("JetStream context not initialized") + subject = queue - self._event_loop.run_until_complete( + get_event_loop().run_until_complete( self._js.publish(subject, str_to_bytes(dumps(message))) ) @@ -244,15 +299,18 @@ def _get(self, queue, **kwargs): self._ensure_stream(queue) self._ensure_consumer(queue) + if self._js is None: + raise RuntimeError("JetStream context not initialized") + try: - pull_sub = self._event_loop.run_until_complete( + pull_sub = get_event_loop().run_until_complete( self._js.pull_subscribe( queue, self._get_consumer_name(queue), stream=self._get_stream_name(queue), ) ) - msg = self._event_loop.run_until_complete( + msg = get_event_loop().run_until_complete( pull_sub.fetch(1, timeout=self.wait_time_seconds) )[0] @@ -270,20 +328,26 @@ def _delete(self, queue, *args, **kwargs): """Delete a queue.""" stream_name = self._get_stream_name(queue) if stream_name in self._streams: + if self._js is None: + raise RuntimeError("JetStream context not initialized") + try: - self._event_loop.run_until_complete(self._js.delete_stream(stream_name)) + get_event_loop().run_until_complete(self._js.delete_stream(stream_name)) self._streams.remove(stream_name) - except NotFoundError: + except (nats.js.errors.NotFoundError): pass def _size(self, queue): """Return the number of messages in a queue.""" + if self._js is None: + raise RuntimeError("JetStream context not initialized") + try: - info = self._event_loop.run_until_complete( + info = get_event_loop().run_until_complete( self._js.stream_info(self._get_stream_name(queue)) ) return info.state.messages - except NotFoundError: + except nats.js.errors.NotFoundError: return 0 def _new_queue(self, queue, **kwargs): @@ -293,19 +357,25 @@ def _new_queue(self, queue, **kwargs): def _has_queue(self, queue, **kwargs): """Check if a queue exists.""" + if self._js is None: + raise RuntimeError("JetStream context not initialized") + try: - self._event_loop.run_until_complete( + get_event_loop().run_until_complete( self._js.stream_info(self._get_stream_name(queue)) ) return True - except NotFoundError: + except (nats.js.errors.NotFoundError, nats.errors.TimeoutError): return False def _open(self): """Open a new connection to NATS.""" if self._nats_client is None: self._nats_client = Client() - self._event_loop.run_until_complete( + if self._nats_client is None: + raise RuntimeError("Failed to create NATS client") + + get_event_loop().run_until_complete( self._nats_client.connect( f"nats://{self.conninfo.hostname}:{self.conninfo.port or DEFAULT_PORT}", user=self.conninfo.userid, @@ -351,26 +421,30 @@ def connection_wait_time_seconds(self): def close(self): """Close the channel.""" if self._nats_client is not None: - self._event_loop.run_until_complete(self._nats_client.drain()) - self._event_loop.run_until_complete(self._nats_client.close()) + loop = get_event_loop() + loop.run_until_complete(self._nats_client.drain()) + loop.run_until_complete(self._nats_client.close()) self._nats_client = None self._js = None - pending = asyncio.all_tasks(loop=self._event_loop) - group = asyncio.gather(*pending) - if not group.done(): - self._event_loop.run_until_complete(group) - - self._event_loop.close() + # Cancel any pending tasks + loop = get_event_loop() + for task in asyncio.all_tasks(loop): + if not task.done(): + task.cancel() + try: + loop.run_until_complete(task) + except asyncio.CancelledError: + pass def ack_msg(self, msg): - self._event_loop.run_until_complete(msg.nats_ack()) + get_event_loop().run_until_complete(msg.nats_ack()) def nak_msg(self, msg): - self._event_loop.run_until_complete(msg.nats_nak()) + get_event_loop().run_until_complete(msg.nats_nak()) def term_msg(self, msg): - self._event_loop.run_until_complete(msg.nats_term()) + get_event_loop().run_until_complete(msg.nats_term()) class Transport(virtual.Transport): @@ -390,7 +464,6 @@ def __init__(self, client, **kwargs): if Client is None: raise ImportError("nats-py is not installed") super().__init__(client, **kwargs) - self._event_loop = asyncio.new_event_loop() def drain_events(self, connection, **kwargs): return super().drain_events(connection, **kwargs) @@ -416,8 +489,9 @@ def verify_connection(self, connection): client = Client() try: - self._event_loop.run_until_complete(client.connect(f"nats://{host}:{port}")) - self._event_loop.run_until_complete(client.close()) + loop = get_event_loop() + loop.run_until_complete(client.connect(f"nats://{host}:{port}")) + loop.run_until_complete(client.close()) return True except ValueError: pass From e78076cd9a0cda06a88f288e408d9108e049af4e Mon Sep 17 00:00:00 2001 From: joeriddles Date: Tue, 10 Jun 2025 21:34:03 -0600 Subject: [PATCH 11/16] Add NATS doc --- docs/reference/kombu.transport.nats.rst | 31 +++++++++++++++++++++++++ 1 file changed, 31 insertions(+) create mode 100644 docs/reference/kombu.transport.nats.rst diff --git a/docs/reference/kombu.transport.nats.rst b/docs/reference/kombu.transport.nats.rst new file mode 100644 index 0000000000..2341614b15 --- /dev/null +++ b/docs/reference/kombu.transport.nats.rst @@ -0,0 +1,31 @@ +================================================ + NATS Transport - ``kombu.transport.nats`` +================================================ + +.. currentmodule:: kombu.transport.nats + +.. automodule:: kombu.transport.nats + + .. contents:: + :local: + + Transport + --------- + + .. autoclass:: Transport + :members: + :undoc-members: + + Channel + ------- + + .. autoclass:: Channel + :members: + :undoc-members: + + Message + ------- + + .. autoclass:: Message + :members: + :undoc-members: From a12fac67d5df12a503e3bca9cab286a7789742f2 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Fri, 17 Oct 2025 14:26:56 -0600 Subject: [PATCH 12/16] Fix codespell --- t/unit/transport/test_azureservicebus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index 111eab68ec..73dbefeece 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -362,7 +362,7 @@ def test_custom_entity_name(): assert channel.entity_name('test-celery') == 'test-celery' assert channel.entity_name('test.celery') == 'test-celery' - # all other punctuations replaced by underscores + # all other punctuation replaced by underscores assert channel.entity_name('test_celery') == 'test_celery' assert channel.entity_name('test:celery') == 'test_celery' assert channel.entity_name('test+celery') == 'test_celery' From f45fbfc5e50f9a32dda8526d41595fe3564f6727 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Fri, 17 Oct 2025 14:54:48 -0600 Subject: [PATCH 13/16] Fix nats integration test not running in tox --- t/integration/test_nats.py | 4 ++++ tox.ini | 4 ++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/t/integration/test_nats.py b/t/integration/test_nats.py index dc504a9658..9a928bd731 100644 --- a/t/integration/test_nats.py +++ b/t/integration/test_nats.py @@ -10,6 +10,8 @@ pytest.importorskip('nats') +@pytest.mark.env('nats') +@pytest.mark.flaky(reruns=5, reruns_delay=2) class test_Channel: def setup_method(self): self.connection = self.create_connection() @@ -96,6 +98,8 @@ def test_custom_connection_wait_time(self): assert channel.connection_wait_time_seconds == wait_time +@pytest.mark.env('nats') +@pytest.mark.flaky(reruns=5, reruns_delay=2) class test_Transport: def setup_method(self): self.client = Mock() diff --git a/tox.ini b/tox.ini index d4e8a1c13f..82dbdfc879 100644 --- a/tox.ini +++ b/tox.ini @@ -36,8 +36,8 @@ deps= apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13: -r{toxinidir}/requirements/default.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13: -r{toxinidir}/requirements/test.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13: -r{toxinidir}/requirements/test-ci.txt - apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt - apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux: -r{toxinidir}/requirements/extras/nats.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt -r{toxinidir}/requirements/extras/nats.txt + integration-nats: -r{toxinidir}/requirements/extras/nats.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt flake8,pydocstyle,mypy: -r{toxinidir}/requirements/pkgutils.txt integration: -r{toxinidir}/requirements/test-integration.txt From 0428d5aeb37563150942da01bb2fff1a72521ed5 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Thu, 23 Oct 2025 09:53:20 -0600 Subject: [PATCH 14/16] Add NATS back to tox --- tox.ini | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 75287a130c..8cf8d78c9c 100644 --- a/tox.ini +++ b/tox.ini @@ -37,7 +37,7 @@ deps= apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/default.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test-ci.txt - apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt -r{toxinidir}/requirements/extras/nats.txt integration-nats: -r{toxinidir}/requirements/extras/nats.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt From ee076ade349ee022e01177748916f565e4a526b5 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Thu, 23 Oct 2025 09:54:57 -0600 Subject: [PATCH 15/16] Fix flake8 blank line contains whitespace --- t/unit/transport/test_azureservicebus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/unit/transport/test_azureservicebus.py b/t/unit/transport/test_azureservicebus.py index c505090642..69ea61b90d 100644 --- a/t/unit/transport/test_azureservicebus.py +++ b/t/unit/transport/test_azureservicebus.py @@ -361,7 +361,7 @@ def test_custom_entity_name(): # dashes allowed and dots replaced by dashes assert channel.entity_name('test-celery') == 'test-celery' assert channel.entity_name('test.celery') == 'test-celery' - + # all other punctuation is replaced by underscores assert channel.entity_name('test_celery') == 'test_celery' assert channel.entity_name('test:celery') == 'test_celery' From b254371579af5c59e26cb976d87c2ceaec3bd2a6 Mon Sep 17 00:00:00 2001 From: joeriddles Date: Thu, 23 Oct 2025 10:44:18 -0600 Subject: [PATCH 16/16] fix tox --- tox.ini | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tox.ini b/tox.ini index 8cf8d78c9c..f183e79e85 100644 --- a/tox.ini +++ b/tox.ini @@ -37,7 +37,8 @@ deps= apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/default.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test.txt apicheck,pypy3.10,3.9,3.10,3.11,3.12,3.13,3.14,3.14t: -r{toxinidir}/requirements/test-ci.txt - apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt -r{toxinidir}/requirements/extras/nats.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/confluentkafka.txt + apicheck,3.9-linux,3.10-linux,3.11-linux,3.12-linux,3.13-linux,3.14-linux,3.14t-linux: -r{toxinidir}/requirements/extras/nats.txt integration-nats: -r{toxinidir}/requirements/extras/nats.txt apicheck,linkcheck: -r{toxinidir}/requirements/docs.txt