Skip to content
Open
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
6e298ef
support redis queue expiration
ffje Feb 26, 2025
43b5ff5
fix: linter
ffje Feb 26, 2025
e5f552b
fix: docs
ffje Feb 26, 2025
570fecb
fix: docs [2]
ffje Feb 26, 2025
d371800
Merge branch 'main' into redis-q-expiration
ffje Mar 4, 2025
229b910
Merge branch 'main' into redis-q-expiration
ffje Mar 9, 2025
f12d5dc
clean-up: remove integration docker-compose
ffje Mar 9, 2025
2e7a18c
Merge branch 'redis-q-expiration' of https://github.com/ffje/kombu in…
ffje Mar 9, 2025
2a41482
Merge branch 'main' into redis-q-expiration
ffje Mar 13, 2025
f576a31
Merge branch 'main' into redis-q-expiration
ffje Mar 15, 2025
dd94bac
Merge branch 'main' into redis-q-expiration
ffje Mar 18, 2025
5c34e7e
Merge branch 'main' into redis-q-expiration
auvipy Mar 19, 2025
dd18d80
tests: unit
ffje Mar 19, 2025
407cdea
fix: lint
ffje Mar 19, 2025
d8e6baa
Merge branch 'main' into redis-q-expiration
ffje Mar 20, 2025
7f5e585
Update kombu/transport/redis.py
auvipy Mar 20, 2025
c03bd4c
Update kombu/transport/redis.py
auvipy Mar 20, 2025
c5c4f80
docs: add support queue TTL doc
ffje Mar 23, 2025
5811be2
Merge branch 'redis-q-expiration' of https://github.com/ffje/kombu in…
ffje Mar 23, 2025
536ca7a
Merge branch 'main' into redis-q-expiration
auvipy Mar 25, 2025
5c925e0
Merge branch 'main' into redis-q-expiration
ffje Mar 27, 2025
6a72bcf
Merge branch 'main' into redis-q-expiration
auvipy Mar 30, 2025
1daf444
Update kombu/transport/redis.py
auvipy Mar 30, 2025
2d148e3
Update kombu/transport/redis.py
auvipy Mar 30, 2025
ac38051
Update kombu/transport/redis.py
auvipy Mar 30, 2025
080cf5d
Update kombu/transport/redis.py
auvipy Mar 30, 2025
f81998f
Update kombu/transport/redis.py
auvipy Mar 30, 2025
1d4900c
Update kombu/transport/redis.py
auvipy Mar 30, 2025
9633638
Update kombu/transport/redis.py
auvipy Mar 30, 2025
19ac320
Update kombu/transport/redis.py
auvipy Mar 30, 2025
beef48d
Merge branch 'main' into redis-q-expiration
Nusnus Apr 15, 2025
0779fa6
Merge branch 'main' into redis-q-expiration
ffje Apr 17, 2025
0770b62
fix: unit testing
ffje Apr 25, 2025
3f56546
lint
ffje Apr 25, 2025
6ae9a96
fix: integartion tests
ffje Apr 27, 2025
1d2caf3
Merge branch 'main' into redis-q-expiration
ffje May 2, 2025
3e478df
Merge branch 'main' into redis-q-expiration
ffje May 5, 2025
6014b37
Merge branch 'main' into redis-q-expiration
ffje May 8, 2025
d6d39e1
Merge branch 'main' into redis-q-expiration
ffje May 21, 2025
0168756
Merge branch 'main' into redis-q-expiration
ffje Jun 4, 2025
d80d746
Merge branch 'main' into redis-q-expiration
auvipy Jun 14, 2025
d24af21
Merge branch 'main' into redis-q-expiration
auvipy Jun 27, 2025
1cf6a2d
Merge branch 'main' into redis-q-expiration
ffje Jun 30, 2025
9cb30f6
Merge branch 'main' into redis-q-expiration
auvipy Jul 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions AUTHORS
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ Corentin Ardeois <[email protected]>
Dan LaMotte <[email protected]>
Dan McGee <[email protected]>
Dane Guempel <[email protected]>
Danila Shiman <[email protected]>
Davanum Srinivas <[email protected]>
David Clymer <[email protected]>
David Gelvin <[email protected]>
Expand Down
4 changes: 2 additions & 2 deletions kombu/entity.py
Original file line number Diff line number Diff line change
Expand Up @@ -468,8 +468,8 @@ class Queue(MaybeChannelBound):

See https://www.rabbitmq.com/ttl.html#queue-ttl

**RabbitMQ extension**: Only available when using RabbitMQ.

**RabbitMQ extension**: Available when using RabbitMQ.
**Redis extension**: Available when using Redis.
message_ttl (float): Message time to live in seconds.

This setting controls how long messages can stay in the queue
Expand Down
51 changes: 50 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
* Supports Topic: Yes
* Supports Fanout: Yes
* Supports Priority: Yes
* Supports TTL: No
* Supports Queue TTL: Yes
* Supports Message TTL: No

Connection String
=================
Expand Down Expand Up @@ -49,6 +50,8 @@
* ``health_check_interval``
* ``retry_on_timeout``
* ``priority_steps``
* ``x-expires``: (int) Time in milliseconds for queues to expire if there's no activity.
The queue will be automatically deleted after this period of inactivity.
"""

from __future__ import annotations
Expand All @@ -66,6 +69,7 @@

from kombu.exceptions import InconsistencyError, VersionMismatch
from kombu.log import get_logger
from kombu.transport.base import to_rabbitmq_queue_arguments
from kombu.utils.compat import register_after_fork
from kombu.utils.encoding import bytes_to_str
from kombu.utils.eventio import ERR, READ, poll
Expand Down Expand Up @@ -206,6 +210,7 @@
"ZADD",
"ZREM",
"ZREVRANGEBYSCORE",
"PEXPIRE",
]

PREFIXED_COMPLEX_COMMANDS = {
Expand Down Expand Up @@ -690,6 +695,8 @@
_async_pool = None
_pool = None

_expires = {}

from_transport_options = (
virtual.Channel.from_transport_options +
('sep',
Expand Down Expand Up @@ -994,7 +1001,10 @@
for pri in self.priority_steps:
item = client.rpop(self._q_for_pri(queue, pri))
if item:
self._maybe_update_queues_expire(queue)
return loads(bytes_to_str(item))

self._maybe_update_queues_expire(queue)
raise Empty()

def _size(self, queue):
Expand Down Expand Up @@ -1023,6 +1033,8 @@
with self.conn_or_acquire() as client:
client.lpush(self._q_for_pri(queue, pri), dumps(message))

self._maybe_update_queues_expire(queue)

def _put_fanout(self, exchange, message, routing_key, **kwargs):
"""Deliver fanout message."""
with self.conn_or_acquire() as client:
Expand All @@ -1035,6 +1047,10 @@
if auto_delete:
self.auto_delete_queues.add(queue)

expire = self._get_queue_expire(kwargs)
if expire is not None:
self._expires[queue] = expire

Check warning on line 1052 in kombu/transport/redis.py

View check run for this annotation

Codecov / codecov/patch

kombu/transport/redis.py#L1052

Added line #L1052 was not covered by tests

def _queue_bind(self, exchange, routing_key, pattern, queue):
if self.typeof(exchange).type == 'fanout':
# Mark exchange as fanout.
Expand All @@ -1047,6 +1063,39 @@
pattern or '',
queue or '']))

def _maybe_update_queues_expire(self, queue):
"""Update expiration on queue keys.

For each queue, set expiration time in milliseconds.
Will only be set if x-expires argument was provided when creating the queue.
"""
if not self._expires or queue not in self._expires:
return

with self.conn_or_acquire() as client:
with client.pipeline() as pipe:
for priority in self.priority_steps:
pipe = pipe.pexpire(self._q_for_pri(queue, priority), self._expires[queue])
pipe.execute()

def _get_queue_expire(self, args):
"""Get expiration header named `x-expires` of queue definition.

Returns expiration time in milliseconds or None if not set.

Arguments:
---------
args (dict): Queue arguments dictionary
"""
try:
value = args['arguments']['x-expires']
return int(value)
except (KeyError, TypeError, ValueError):
return None

def prepare_queue_arguments(self, arguments, **kwargs):
return to_rabbitmq_queue_arguments(arguments, **kwargs)

def _delete(self, queue, exchange, routing_key, pattern, *args, **kwargs):
self.auto_delete_queues.discard(queue)
with self.conn_or_acquire(client=kwargs.get('client')) as client:
Expand Down
169 changes: 169 additions & 0 deletions t/integration/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ def connection(request):
)


@pytest.fixture()
def redis_client(connection):
"""Direct Redis client for verification."""
conn_info = connection.info()
host = conn_info['hostname'] or 'localhost'
port = conn_info['port'] or 6379

return redis.Redis(
host=host,
port=port,
decode_responses=True
)


@pytest.fixture()
def invalid_connection():
return kombu.Connection('redis://localhost:12345')
Expand Down Expand Up @@ -243,3 +257,158 @@ def connect_check_certificate(self):
# note the host/port here is irrelevant because
# connect will raise a CertificateError due to hostname mismatch
kombu.Connection('rediss://localhost:12345?ssl_check_hostname=true').connect()


@pytest.mark.env('redis')
@pytest.mark.flaky(reruns=5, reruns_delay=2)
class test_RedisQueueExpiration:
"""Integration tests for Redis queue expiration feature."""

def test_queue_expiration_set(self, connection, redis_client):
"""Test that expiration is set correctly on queue using direct expires parameter."""
expires_ms = 2000
expires_sec = expires_ms / 1000

test_queue = kombu.Queue(
'expire_test_queue',
routing_key='expire_test_queue',
expires=expires_sec
)

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
producer.publish(
{'msg': 'test message'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='json'
)

# Check if expiration was set on queue key
keyprefix = connection.transport_options.get('global_keyprefix', '')
queue_key = f"{keyprefix}{test_queue.name}"
ttl = redis_client.pttl(queue_key)
assert ttl > 0 and ttl <= expires_ms, f"Expected TTL to be set but got {ttl}"

def test_expiration_gets_reset_on_put(self, connection, redis_client):
"""Test that expiration gets reset when putting new message to queue."""
expires_ms = 5000
expires_sec = expires_ms / 1000

test_queue = kombu.Queue(
'expire_reset_test_queue',
routing_key='expire_reset_test_queue',
expires=expires_sec
)

keyprefix = connection.transport_options.get('global_keyprefix', '')
queue_key = f"{keyprefix}{test_queue.name}"

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
producer.publish(
{'msg': 'first message'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='json'
)

sleep(expires_ms / 2000) # Wait for half the TTL
producer.publish(
{'msg': 'second message'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='json'
)

ttl = redis_client.pttl(queue_key)
assert ttl > 0, "TTL should be updated after publishing a new message"

def test_expiration_gets_reset_on_get(self, connection, redis_client):
"""Test that expiration gets reset when getting a message from queue."""
expires_ms = 5000
expires_sec = expires_ms / 1000

test_queue = kombu.Queue(
'expire_get_test_queue',
routing_key='expire_get_test_queue',
expires=expires_sec
)

keyprefix = connection.transport_options.get('global_keyprefix', '')
queue_key = f"{keyprefix}{test_queue.name}"

received_messages = []

def callback(body, message):
received_messages.append(body)
message.ack()

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
consumer = kombu.Consumer(
conn, [test_queue], accept=['json']
)
consumer.register_callback(callback)

for i in range(3):
producer.publish(
{'msg': f'message {i}'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='json'
)

# Wait for some time but not enough for queue to expire
sleep(expires_ms / 2000) # Wait for half the TTL
with consumer:
conn.drain_events(timeout=1)

assert len(received_messages) == 1, "Should have received one message"
ttl = redis_client.pttl(queue_key)
assert ttl > 0, "TTL should be updated after consuming a message"

def test_queue_expires_for_all_priorities(self, connection, redis_client):
"""Test that expiration is set for all priority queues."""
expires_ms = 2000
expires_sec = expires_ms / 1000

test_queue = kombu.Queue(
'expire_priority_test_queue',
routing_key='expire_priority_test_queue',
expires=expires_sec,
max_priority=10
)

with connection as conn:
with conn.channel() as channel:
producer = kombu.Producer(channel)
for priority in [0, 3, 6, 9]:
producer.publish(
{'msg': f'priority {priority} message'},
retry=True,
exchange=test_queue.exchange,
routing_key=test_queue.routing_key,
declare=[test_queue],
serializer='json',
priority=priority
)

# Check if expiration was set on all priority queue keys
priority_keys = [key for key in redis_client.keys("*") if test_queue.name in key]
assert priority_keys, "Expected to find queue keys with priorities"

for key in priority_keys:
ttl = redis_client.pttl(key)
assert ttl > 0 and ttl <= expires_ms, f"Expected TTL for {key} to be set but got {ttl}"
63 changes: 63 additions & 0 deletions t/unit/transport/test_redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -1338,6 +1338,69 @@ def pipeline(transaction=True, shard_hint=None):
('HDEL', 'foo_unacked', 'test-tag')
]

def test_get_queue_expire_valid_string(self):
"""Test _get_queue_expire with valid string value."""
args = {"arguments": {"x-expires": "5000"}}
result = self.channel._get_queue_expire(args)
assert result == 5000

def test_get_queue_expire_valid_int(self):
"""Test _get_queue_expire with valid integer value."""
args = {"arguments": {"x-expires": 5000}}
result = self.channel._get_queue_expire(args)
assert result == 5000

def test_get_queue_expire_missing_arguments(self):
"""Test _get_queue_expire with empty args dictionary."""
args = {}
result = self.channel._get_queue_expire(args)
assert result is None

def test_get_queue_expire_missing_x_expires(self):
"""Test _get_queue_expire with missing x-expires key."""
args = {"arguments": {}}
result = self.channel._get_queue_expire(args)
assert result is None

def test_get_queue_expire_non_numeric(self):
"""Test _get_queue_expire with non-numeric value."""
args = {"arguments": {"x-expires": "invalid"}}
result = self.channel._get_queue_expire(args)
assert result is None

def test_get_queue_expire_none(self):
"""Test _get_queue_expire with None args."""
result = self.channel._get_queue_expire(None)
assert result is None

def test_maybe_update_queues_expire(self):
with Connection(transport=Transport) as conn:
channel = conn.channel()
channel._expires = {'test_queue': 5000}

client_mock = Mock()
client_mock.__enter__ = lambda self: client_mock
client_mock.__exit__ = lambda self, *args: None

pipeline_mock = Mock()
pipeline_mock.__enter__ = lambda self: pipeline_mock
pipeline_mock.__exit__ = lambda self, *args: None

client_mock.pipeline.return_value = pipeline_mock

channel.conn_or_acquire = Mock(return_value=client_mock)

channel._maybe_update_queues_expire('test_queue')

expected_calls = [
call.pexpire('test_queue', 5000)
]

actual_calls = pipeline_mock.method_calls

for expected_call in expected_calls:
assert expected_call in actual_calls


class test_Redis:

Expand Down