Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
158 changes: 103 additions & 55 deletions tests/brokers/redis/test_autoclaim.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import asyncio
from contextlib import suppress
from unittest.mock import MagicMock
from unittest.mock import MagicMock, patch

import pytest
from redis.asyncio import Redis

from faststream.exceptions import NackMessage
from faststream.redis import StreamSub
from tests.brokers.base.consume import BrokerRealConsumeTestcase
from tests.tools import spy_decorator

from .basic import RedisTestcaseConfig

Expand All @@ -20,10 +22,9 @@ async def test_consume_stream_with_min_idle_time(
self,
queue: str,
mock: MagicMock,
event: asyncio.Event,
) -> None:
"""Test consuming messages using XAUTOCLAIM with min_idle_time."""
event = asyncio.Event()

"""Verify that subscribers with min_idle_time use XAUTOCLAIM to reclaim pending messages."""
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(
Expand All @@ -49,26 +50,38 @@ async def retry(msg: str) -> None:
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()
with (
patch.object(
Redis, "xautoclaim", spy_decorator(Redis.xautoclaim)
) as xautoclaim,
patch.object(
Redis, "xreadgroup", spy_decorator(Redis.xreadgroup)
) as xreadgroup,
):
await br.start()

# First, publish a message and let it become pending
await br.publish("pending_message", stream=queue)

# The subscriber with XAUTOCLAIM should reclaim it
await asyncio.wait(
(asyncio.create_task(event.wait()),),
timeout=3,
)

# First, publish a message and let it become pending
await br.publish("pending_message", stream=queue)
assert event.is_set()
mock.assert_called_once_with("pending_message")

# The subscriber with XAUTOCLAIM should reclaim it
await asyncio.wait(
(asyncio.create_task(event.wait()),),
timeout=3,
)

assert event.is_set()
mock.assert_called_once_with("pending_message")
# Verify that XAUTOCLAIM was used, not XREADGROUP
assert xautoclaim.mock.called
assert xreadgroup.mock.called # regular subscriber uses xreadgroup

@pytest.mark.slow()
async def test_get_one_with_min_idle_time(
self,
queue: str,
) -> None:
"""Test get_one() with min_idle_time."""
"""Verify that get_one() method uses XAUTOCLAIM when min_idle_time is configured."""
broker = self.get_broker(apply_types=True)

async with self.patch_broker(broker) as br:
Expand Down Expand Up @@ -102,19 +115,30 @@ async def test_get_one_with_min_idle_time(
)
)

message = await subscriber.get_one(timeout=3)

assert message is not None
decoded = await message.decode()
assert decoded == {"data": "pending"}
with (
patch.object(
Redis, "xautoclaim", spy_decorator(Redis.xautoclaim)
) as xautoclaim,
patch.object(
Redis, "xreadgroup", spy_decorator(Redis.xreadgroup)
) as xreadgroup,
):
message = await subscriber.get_one(timeout=3)

assert message is not None
decoded = await message.decode()
assert decoded == {"data": "pending"}
# Should use XAUTOCLAIM, not XREADGROUP
assert xautoclaim.mock.called
assert not xreadgroup.mock.called

@pytest.mark.slow()
async def test_get_one_with_min_idle_time_no_pending(
self,
queue: str,
mock: MagicMock,
) -> None:
"""Test get_one() with min_idle_time when no pending messages."""
"""Verify that get_one() returns None when no pending messages are available for claiming."""
broker = self.get_broker(apply_types=True)

subscriber = broker.subscriber(
Expand All @@ -141,7 +165,7 @@ async def test_iterator_with_min_idle_time(
queue: str,
mock: MagicMock,
) -> None:
"""Test async iterator with min_idle_time."""
"""Verify that async iterator uses XAUTOCLAIM when min_idle_time is configured."""
broker = self.get_broker(apply_types=True)

async with self.patch_broker(broker) as br:
Expand Down Expand Up @@ -175,27 +199,37 @@ async def test_iterator_with_min_idle_time(
)
)

count = 0
async for msg in subscriber:
decoded = await msg.decode()
mock(decoded)
count += 1
if count >= 2:
break

assert count == 2
mock.assert_any_call({"data": "msg1"})
mock.assert_any_call({"data": "msg2"})
with (
patch.object(
Redis, "xautoclaim", spy_decorator(Redis.xautoclaim)
) as xautoclaim,
patch.object(
Redis, "xreadgroup", spy_decorator(Redis.xreadgroup)
) as xreadgroup,
):
count = 0
async for msg in subscriber:
decoded = await msg.decode()
mock(decoded)
count += 1
if count >= 2:
break

assert count == 2
mock.assert_any_call({"data": "msg1"})
mock.assert_any_call({"data": "msg2"})
# Should use XAUTOCLAIM, not XREADGROUP
assert xautoclaim.mock.called
assert not xreadgroup.mock.called

@pytest.mark.slow()
async def test_consume_stream_batch_with_min_idle_time(
self,
queue: str,
mock: MagicMock,
event: asyncio.Event,
) -> None:
"""Test batch consuming with min_idle_time."""
event = asyncio.Event()

"""Verify that batch subscribers use XAUTOCLAIM when min_idle_time is configured."""
consume_broker = self.get_broker(apply_types=True)

@consume_broker.subscriber(
Expand All @@ -212,17 +246,15 @@ async def handler(msg: list) -> None:
event.set()

async with self.patch_broker(consume_broker) as br:
await br.start()

# Create a pending message first
# Create a pending message first (before starting subscriber)
await br.publish({"data": "batch_msg"}, stream=queue)

with suppress(Exception):
await br._connection.xgroup_create(
queue, "batch_group", id="0", mkstream=True
)

# Read but don't ack
# Read but don't ack (before starting subscriber)
await br._connection.xreadgroup(
groupname="batch_group",
consumername="temp",
Expand All @@ -232,26 +264,40 @@ async def handler(msg: list) -> None:

await asyncio.sleep(0.1)

# Now the subscriber should reclaim it
await asyncio.wait(
(asyncio.create_task(event.wait()),),
timeout=3,
)
# Now start subscriber and track calls
with (
patch.object(
Redis, "xautoclaim", spy_decorator(Redis.xautoclaim)
) as xautoclaim,
patch.object(
Redis, "xreadgroup", spy_decorator(Redis.xreadgroup)
) as xreadgroup,
):
await br.start()

# Now the subscriber should reclaim it
await asyncio.wait(
(asyncio.create_task(event.wait()),),
timeout=3,
)

assert event.is_set()
# In batch mode, should receive list
assert mock.call_count == 1
called_with = mock.call_args[0][0]
assert isinstance(called_with, list)
assert len(called_with) > 0
assert event.is_set()
# In batch mode, should receive list
assert mock.call_count == 1
called_with = mock.call_args[0][0]
assert isinstance(called_with, list)
assert len(called_with) > 0
# Should use XAUTOCLAIM, not XREADGROUP
assert xautoclaim.mock.called
assert not xreadgroup.mock.called

@pytest.mark.slow()
async def test_xautoclaim_with_deleted_messages(
self,
queue: str,
mock: MagicMock,
) -> None:
"""Test XAUTOCLAIM behavior when messages are deleted from stream."""
"""Verify that XAUTOCLAIM handles deleted messages gracefully without errors."""
consume_broker = self.get_broker(apply_types=True)

async with self.patch_broker(consume_broker) as br:
Expand Down Expand Up @@ -301,7 +347,7 @@ async def test_xautoclaim_circular_scanning_with_idle_timeout(
queue: str,
mock: MagicMock,
) -> None:
"""Test that XAUTOCLAIM scans circularly and claims messages as they become idle."""
"""Verify that XAUTOCLAIM performs circular scanning and claims messages as they become idle."""
consume_broker = self.get_broker(apply_types=True)

async with self.patch_broker(consume_broker) as br:
Expand Down Expand Up @@ -350,7 +396,9 @@ async def test_xautoclaim_circular_scanning_with_idle_timeout(

# Should have claimed all 5 messages in order
assert len(claimed_messages_first_pass) == 5
assert claimed_messages_first_pass == [{"data": f"msg{i}"} for i in range(5)]
assert claimed_messages_first_pass == [
{"data": f"msg{i}"} for i in range(5)
]

# After reaching the end, XAUTOCLAIM should restart from "0-0"
# and scan circularly - messages are still pending since we didn't ACK them
Expand Down
Loading