diff --git a/tests/brokers/redis/test_autoclaim.py b/tests/brokers/redis/test_autoclaim.py index 00db1de8f7..69cacff2b8 100644 --- a/tests/brokers/redis/test_autoclaim.py +++ b/tests/brokers/redis/test_autoclaim.py @@ -1,12 +1,14 @@ import asyncio from contextlib import suppress -from unittest.mock import MagicMock +from unittest.mock import MagicMock, patch import pytest +from redis.asyncio import Redis from faststream.exceptions import NackMessage from faststream.redis import StreamSub from tests.brokers.base.consume import BrokerRealConsumeTestcase +from tests.tools import spy_decorator from .basic import RedisTestcaseConfig @@ -20,10 +22,9 @@ async def test_consume_stream_with_min_idle_time( self, queue: str, mock: MagicMock, + event: asyncio.Event, ) -> None: - """Test consuming messages using XAUTOCLAIM with min_idle_time.""" - event = asyncio.Event() - + """Verify that subscribers with min_idle_time use XAUTOCLAIM to reclaim pending messages.""" consume_broker = self.get_broker(apply_types=True) @consume_broker.subscriber( @@ -49,26 +50,38 @@ async def retry(msg: str) -> None: event.set() async with self.patch_broker(consume_broker) as br: - await br.start() + with ( + patch.object( + Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) + ) as xautoclaim, + patch.object( + Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) + ) as xreadgroup, + ): + await br.start() + + # First, publish a message and let it become pending + await br.publish("pending_message", stream=queue) + + # The subscriber with XAUTOCLAIM should reclaim it + await asyncio.wait( + (asyncio.create_task(event.wait()),), + timeout=3, + ) - # First, publish a message and let it become pending - await br.publish("pending_message", stream=queue) + assert event.is_set() + mock.assert_called_once_with("pending_message") - # The subscriber with XAUTOCLAIM should reclaim it - await asyncio.wait( - (asyncio.create_task(event.wait()),), - timeout=3, - ) - - assert event.is_set() - mock.assert_called_once_with("pending_message") + # Verify that XAUTOCLAIM was used, not XREADGROUP + assert xautoclaim.mock.called + assert xreadgroup.mock.called # regular subscriber uses xreadgroup @pytest.mark.slow() async def test_get_one_with_min_idle_time( self, queue: str, ) -> None: - """Test get_one() with min_idle_time.""" + """Verify that get_one() method uses XAUTOCLAIM when min_idle_time is configured.""" broker = self.get_broker(apply_types=True) async with self.patch_broker(broker) as br: @@ -102,11 +115,22 @@ async def test_get_one_with_min_idle_time( ) ) - message = await subscriber.get_one(timeout=3) - - assert message is not None - decoded = await message.decode() - assert decoded == {"data": "pending"} + with ( + patch.object( + Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) + ) as xautoclaim, + patch.object( + Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) + ) as xreadgroup, + ): + message = await subscriber.get_one(timeout=3) + + assert message is not None + decoded = await message.decode() + assert decoded == {"data": "pending"} + # Should use XAUTOCLAIM, not XREADGROUP + assert xautoclaim.mock.called + assert not xreadgroup.mock.called @pytest.mark.slow() async def test_get_one_with_min_idle_time_no_pending( @@ -114,7 +138,7 @@ async def test_get_one_with_min_idle_time_no_pending( queue: str, mock: MagicMock, ) -> None: - """Test get_one() with min_idle_time when no pending messages.""" + """Verify that get_one() returns None when no pending messages are available for claiming.""" broker = self.get_broker(apply_types=True) subscriber = broker.subscriber( @@ -141,7 +165,7 @@ async def test_iterator_with_min_idle_time( queue: str, mock: MagicMock, ) -> None: - """Test async iterator with min_idle_time.""" + """Verify that async iterator uses XAUTOCLAIM when min_idle_time is configured.""" broker = self.get_broker(apply_types=True) async with self.patch_broker(broker) as br: @@ -175,27 +199,37 @@ async def test_iterator_with_min_idle_time( ) ) - count = 0 - async for msg in subscriber: - decoded = await msg.decode() - mock(decoded) - count += 1 - if count >= 2: - break - - assert count == 2 - mock.assert_any_call({"data": "msg1"}) - mock.assert_any_call({"data": "msg2"}) + with ( + patch.object( + Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) + ) as xautoclaim, + patch.object( + Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) + ) as xreadgroup, + ): + count = 0 + async for msg in subscriber: + decoded = await msg.decode() + mock(decoded) + count += 1 + if count >= 2: + break + + assert count == 2 + mock.assert_any_call({"data": "msg1"}) + mock.assert_any_call({"data": "msg2"}) + # Should use XAUTOCLAIM, not XREADGROUP + assert xautoclaim.mock.called + assert not xreadgroup.mock.called @pytest.mark.slow() async def test_consume_stream_batch_with_min_idle_time( self, queue: str, mock: MagicMock, + event: asyncio.Event, ) -> None: - """Test batch consuming with min_idle_time.""" - event = asyncio.Event() - + """Verify that batch subscribers use XAUTOCLAIM when min_idle_time is configured.""" consume_broker = self.get_broker(apply_types=True) @consume_broker.subscriber( @@ -212,9 +246,7 @@ async def handler(msg: list) -> None: event.set() async with self.patch_broker(consume_broker) as br: - await br.start() - - # Create a pending message first + # Create a pending message first (before starting subscriber) await br.publish({"data": "batch_msg"}, stream=queue) with suppress(Exception): @@ -222,7 +254,7 @@ async def handler(msg: list) -> None: queue, "batch_group", id="0", mkstream=True ) - # Read but don't ack + # Read but don't ack (before starting subscriber) await br._connection.xreadgroup( groupname="batch_group", consumername="temp", @@ -232,18 +264,32 @@ async def handler(msg: list) -> None: await asyncio.sleep(0.1) - # Now the subscriber should reclaim it - await asyncio.wait( - (asyncio.create_task(event.wait()),), - timeout=3, - ) + # Now start subscriber and track calls + with ( + patch.object( + Redis, "xautoclaim", spy_decorator(Redis.xautoclaim) + ) as xautoclaim, + patch.object( + Redis, "xreadgroup", spy_decorator(Redis.xreadgroup) + ) as xreadgroup, + ): + await br.start() + + # Now the subscriber should reclaim it + await asyncio.wait( + (asyncio.create_task(event.wait()),), + timeout=3, + ) - assert event.is_set() - # In batch mode, should receive list - assert mock.call_count == 1 - called_with = mock.call_args[0][0] - assert isinstance(called_with, list) - assert len(called_with) > 0 + assert event.is_set() + # In batch mode, should receive list + assert mock.call_count == 1 + called_with = mock.call_args[0][0] + assert isinstance(called_with, list) + assert len(called_with) > 0 + # Should use XAUTOCLAIM, not XREADGROUP + assert xautoclaim.mock.called + assert not xreadgroup.mock.called @pytest.mark.slow() async def test_xautoclaim_with_deleted_messages( @@ -251,7 +297,7 @@ async def test_xautoclaim_with_deleted_messages( queue: str, mock: MagicMock, ) -> None: - """Test XAUTOCLAIM behavior when messages are deleted from stream.""" + """Verify that XAUTOCLAIM handles deleted messages gracefully without errors.""" consume_broker = self.get_broker(apply_types=True) async with self.patch_broker(consume_broker) as br: @@ -301,7 +347,7 @@ async def test_xautoclaim_circular_scanning_with_idle_timeout( queue: str, mock: MagicMock, ) -> None: - """Test that XAUTOCLAIM scans circularly and claims messages as they become idle.""" + """Verify that XAUTOCLAIM performs circular scanning and claims messages as they become idle.""" consume_broker = self.get_broker(apply_types=True) async with self.patch_broker(consume_broker) as br: @@ -350,7 +396,9 @@ async def test_xautoclaim_circular_scanning_with_idle_timeout( # Should have claimed all 5 messages in order assert len(claimed_messages_first_pass) == 5 - assert claimed_messages_first_pass == [{"data": f"msg{i}"} for i in range(5)] + assert claimed_messages_first_pass == [ + {"data": f"msg{i}"} for i in range(5) + ] # After reaching the end, XAUTOCLAIM should restart from "0-0" # and scan circularly - messages are still pending since we didn't ACK them