From cad2597df590f6d5c2b9cde569511eae0c6af197 Mon Sep 17 00:00:00 2001
From: Pierre Bonneau
Date: Fri, 7 Nov 2025 11:38:33 +0100
Subject: [PATCH 1/3] fix: Recreate group in case of Redis flush

- add a test
- properly reset the ID
- handle another rare case where the group is tampered with by another
  system and the IDs get desynchronized (AI recommendation)
---
 tests/brokers/redis/test_consume.py | 46 +++++++++++++++++++++++++++++
 uv.lock                             | 15 +++++----
 2 files changed, 54 insertions(+), 7 deletions(-)

diff --git a/tests/brokers/redis/test_consume.py b/tests/brokers/redis/test_consume.py
index 20d75e83fa..b0b6301847 100644
--- a/tests/brokers/redis/test_consume.py
+++ b/tests/brokers/redis/test_consume.py
@@ -844,6 +844,52 @@ async def handler(msg: RedisStreamMessage) -> None:
         assert queue_len == 0, (
             f"Redis stream must be empty here, found {queue_len} messages"
         )
+    async def test_consume_from_group(
+        self,
+        queue: str,
+    ) -> None:
+        event = asyncio.Event()
+
+        consume_broker = self.get_broker(apply_types=True)
+
+        @consume_broker.subscriber(
+            stream=StreamSub(queue, group="group", consumer=queue),
+        )
+        async def handler(msg: RedisMessage) -> None:
+            event.set()
+
+        async with self.patch_broker(consume_broker) as br:
+            await br.start()
+            redis_client = br._connection
+            with (
+                patch.object(redis_client, "xreadgroup", spy_decorator(redis_client.xreadgroup)) as m_readgroup,
+                patch.object(redis_client, "xgroup_create", spy_decorator(redis_client.xgroup_create)) as m_group_create
+            ):
+                await asyncio.wait(
+                    (
+                        asyncio.create_task(br.publish("hello", stream=queue)),
+                        asyncio.create_task(event.wait()),
+                    ),
+                    timeout=3,
+                )
+                await asyncio.sleep(0.1)
+                m_readgroup.mock.assert_called_once()
+                assert event.is_set()
+                await redis_client.flushall()
+                event.clear()
+                await asyncio.sleep(0.1)
+                await asyncio.wait(
+                    (
+                        asyncio.create_task(br.publish("hello again", stream=queue)),
+                        asyncio.create_task(event.wait()),
+                    ),
+                    timeout=3,
+                )
+
+                await asyncio.sleep(0.1)
+                m_group_create.mock.assert_called_once()
+
+                assert event.is_set()
 
     async def test_get_one(
         self,
diff --git a/uv.lock b/uv.lock
index ae93212f24..0c9056d547 100644
--- a/uv.lock
+++ b/uv.lock
@@ -1,5 +1,5 @@
 version = 1
-revision = 3
+revision = 2
 requires-python = ">=3.10"
 resolution-markers = [
     "python_full_version >= '3.13'",
@@ -397,6 +397,7 @@ wheels = [
 name = "confluent-kafka"
 version = "2.12.1"
 source = { registry = "https://pypi.org/simple" }
+sdist = { url = "https://files.pythonhosted.org/packages/d5/d6/10799cbc9ad757170eb466eaadfaf2abde1d832b6d67d7ff040179888272/confluent_kafka-2.12.1.tar.gz", hash = "sha256:cfc4ceeb3071a678f7c93cfe1e76216442e1621ee6192584cdf704db1d619894", size = 250074, upload-time = "2025-11-04T19:03:31.969Z" }
 wheels = [
     { url = "https://files.pythonhosted.org/packages/c9/ad/0d2b906aece9069fc0bead3bd7907a440442209ee3795954359555a51b91/confluent_kafka-2.12.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:418f52c52c3833a2481cb4f576f4bee5183776dd9411f5288286117d794d858c", size = 3593667, upload-time = "2025-10-21T20:49:58.317Z" },
     { url = "https://files.pythonhosted.org/packages/c4/ae/27220c24f96b5cb7d69b01035e3ebe1fc6d6cde54e8d9e6e69a6fd8fcd69/confluent_kafka-2.12.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:6308da6a1feddfc8a5ab757982ab546c4482e33c056e7b4f379deeddfd821c0c", size = 3153382, upload-time = "2025-10-21T20:50:01.627Z" },
@@ -732,7 +733,7 @@ pydantic = [
 
 [[package]]
 name = "fastapi"
-version = "0.120.0"
+version = "0.121.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "annotated-doc" },
@@ -740,14 +741,14 @@ dependencies = [
     { name = "starlette" },
     { name = "typing-extensions" },
 ]
-sdist = { url = "https://files.pythonhosted.org/packages/f7/0e/7f29e8f7219e4526747db182e1afb5a4b6abc3201768fb38d81fa2536241/fastapi-0.120.0.tar.gz", hash = "sha256:6ce2c1cfb7000ac14ffd8ddb2bc12e62d023a36c20ec3710d09d8e36fab177a0", size = 337603, upload-time = "2025-10-23T20:56:34.743Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/8c/e3/77a2df0946703973b9905fd0cde6172c15e0781984320123b4f5079e7113/fastapi-0.121.0.tar.gz", hash = "sha256:06663356a0b1ee93e875bbf05a31fb22314f5bed455afaaad2b2dad7f26e98fa", size = 342412, upload-time = "2025-11-03T10:25:54.818Z" }
 wheels = [
-    { url = "https://files.pythonhosted.org/packages/1d/60/7a639ceaba54aec4e1d5676498c568abc654b95762d456095b6cb529b1ca/fastapi-0.120.0-py3-none-any.whl", hash = "sha256:84009182e530c47648da2f07eb380b44b69889a4acfd9e9035ee4605c5cfc469", size = 108243, upload-time = "2025-10-23T20:56:33.281Z" },
+    { url = "https://files.pythonhosted.org/packages/dd/2c/42277afc1ba1a18f8358561eee40785d27becab8f80a1f945c0a3051c6eb/fastapi-0.121.0-py3-none-any.whl", hash = "sha256:8bdf1b15a55f4e4b0d6201033da9109ea15632cb76cf156e7b8b4019f2172106", size = 109183, upload-time = "2025-11-03T10:25:53.27Z" },
 ]
 
 [[package]]
 name = "faststream"
-version = "0.6.2"
+version = "0.6.3"
 source = { editable = "." }
 dependencies = [
     { name = "anyio" },
@@ -925,7 +926,7 @@ dev = [
     { name = "detect-secrets", specifier = "==1.5.0" },
     { name = "dirty-equals", specifier = "==0.10.0" },
     { name = "email-validator", specifier = "==2.3.0" },
-    { name = "fastapi", specifier = "==0.120.0" },
+    { name = "fastapi", specifier = "==0.121.0" },
     { name = "faststream", extras = ["rabbit", "kafka", "confluent", "nats", "redis", "otel", "cli", "prometheus"] },
     { name = "httpx", specifier = "==0.28.1" },
     { name = "mdx-include", specifier = "==1.4.2" },
@@ -1013,7 +1014,7 @@ testing = [
     { name = "covdefaults", specifier = ">=2.3.0" },
     { name = "dirty-equals", specifier = "==0.10.0" },
     { name = "email-validator", specifier = "==2.3.0" },
-    { name = "fastapi", specifier = "==0.120.0" },
+    { name = "fastapi", specifier = "==0.121.0" },
     { name = "httpx", specifier = "==0.28.1" },
     { name = "msgspec" },
     { name = "opentelemetry-sdk", specifier = ">=1.24.0,<2.0.0" },
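Note: the failure mode the test above covers can be reproduced outside the test suite with plain redis-py. The following is a minimal sketch, assuming redis-py 5+, a local Redis on the default port, and made-up stream/group/consumer names: after FLUSHALL, XREADGROUP keeps failing with NOGROUP until the group is recreated, which is what the subscriber now does on its own.

    import asyncio

    from redis.asyncio import Redis
    from redis.exceptions import ResponseError


    async def main() -> None:
        r = Redis()  # localhost:6379 by default
        await r.xgroup_create("demo-stream", "demo-group", id="$", mkstream=True)
        await r.xadd("demo-stream", {"data": "hello"})
        print(await r.xreadgroup("demo-group", "c1", {"demo-stream": ">"}, count=1))

        await r.flushall()  # both the stream and the group are gone now

        try:
            await r.xreadgroup("demo-group", "c1", {"demo-stream": ">"}, count=1)
        except ResponseError as e:
            assert "NOGROUP" in str(e)  # the error the subscriber must survive
            # recovery: recreate the group from the start, as the patch does
            await r.xgroup_create("demo-stream", "demo-group", id="0", mkstream=True)

        await r.xadd("demo-stream", {"data": "hello again"})
        print(await r.xreadgroup("demo-group", "c1", {"demo-stream": ">"}, count=1))
        await r.aclose()


    asyncio.run(main())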
From 2368b5f1ed410d04f3adcd325b5e6341e22991b3 Mon Sep 17 00:00:00 2001
From: Pierre Bonneau
Date: Mon, 10 Nov 2025 14:38:29 +0100
Subject: [PATCH 2/3] fix: Recreate group in case of Redis flush

- fix group creation in autoclaim, and in the get_one and iteration
  paths.
---
 .../subscriber/usecases/stream_subscriber.py | 174 +++++++++++++-----
 1 file changed, 123 insertions(+), 51 deletions(-)

diff --git a/faststream/redis/subscriber/usecases/stream_subscriber.py b/faststream/redis/subscriber/usecases/stream_subscriber.py
index 5ecb866e36..002b63bc38 100644
--- a/faststream/redis/subscriber/usecases/stream_subscriber.py
+++ b/faststream/redis/subscriber/usecases/stream_subscriber.py
@@ -85,6 +85,51 @@ async def _consume(self, *args: Any, start_signal: "Event") -> None:
         start_signal.set()
         await super()._consume(*args, start_signal=start_signal)
 
+    async def _create_group(self, reset_counter: bool = False) -> None:
+        if reset_counter:
+            group_create_id = "0"
+        else:
+            group_create_id = "$" if self.last_id == ">" else self.last_id
+        try:
+            await self._client.xgroup_create(
+                name=self.stream_sub.name,
+                id=group_create_id,
+                groupname=self.stream_sub.group,
+                mkstream=True,
+            )
+        except ResponseError as e:
+            if "already exists" not in str(e):
+                raise
+
+    def _protect_read_from_group_removal(
+        self,
+        read_func: Callable[[], Awaitable[ReadResponse]],
+        stream: "StreamSub",
+    ) -> Callable[[], Awaitable[ReadResponse]]:
+        async def _read_from_group_removal() -> ReadResponse:
+            try:
+                return await read_func()
+            except ResponseError as e:
+                err_msg = str(e)
+                known_error: bool = False
+                if "NOGROUP" in err_msg:
+                    # most likely Redis was flushed, so we need to recreate our group
+                    await self._create_group(reset_counter=True)
+                    # Important: reset our internal position too
+                    stream.last_id = ">"
+                    known_error = True
+                if (
+                    "smaller than the first available entry" in err_msg
+                    or "greater than the maximum id" in err_msg
+                ):
+                    # the group was modified by a third party, so reset our position to an existing ID
+                    stream.last_id = "$"
+                    known_error = True
+                if known_error:
+                    return await read_func()
+                raise e
+        return _read_from_group_removal
+
     @override
     async def start(self) -> None:
         client = self._client
@@ -112,10 +157,7 @@ async def start(self) -> None:
                     raise
 
             if stream.min_idle_time is None:
-
-                def read(
-                    _: str,
-                ) -> Awaitable[ReadResponse]:
+                def _xreadgroup_call() -> Awaitable[ReadResponse]:
                     return client.xreadgroup(
                         groupname=stream.group,
                         consumername=stream.consumer,
@@ -125,10 +167,18 @@ def read(
                         noack=stream.no_ack,
                     )
 
-            else:
+                protected_read_func = self._protect_read_from_group_removal(
+                    read_func=_xreadgroup_call,
+                    stream=stream,
+                )
 
-                async def read(_: str) -> ReadResponse:
-                    stream_message = await client.xautoclaim(
+                async def read(
+                    _: str,
+                ) -> ReadResponse:
+                    return await protected_read_func()
+            else:
+                def _xautoclaim_call() -> Awaitable[Any]:
+                    return client.xautoclaim(
                         name=self.stream_sub.name,
                         groupname=self.stream_sub.group,
                         consumername=self.stream_sub.consumer,
@@ -136,6 +186,15 @@ async def read(_: str) -> ReadResponse:
                         start_id=self.autoclaim_start_id,
                         count=1,
                     )
+
+                protected_autoclaim = self._protect_read_from_group_removal(
+                    read_func=_xautoclaim_call,
+                    stream=stream,
+                )
+
+                async def read(_: str) -> ReadResponse:
+                    stream_message = await protected_autoclaim()
+
                 stream_name = self.stream_sub.name.encode()
 
                 (next_id, messages, _) = stream_message
@@ -149,7 +208,6 @@ async def read(_: str) -> ReadResponse:
                 return ((stream_name, messages),)
 
         else:
-
             def read(
                 last_id: str,
             ) -> Awaitable[ReadResponse]:
@@ -161,6 +219,30 @@ def read(
 
         await super().start(read)
 
+    async def _get_one_message(self, timeout: float) -> None:
+        if self.stream_sub.group and self.stream_sub.consumer:
+            def _readgroup_call() -> Awaitable[ReadResponse]:
+                return self._client.xreadgroup(
+                    groupname=self.stream_sub.group,
+                    consumername=self.stream_sub.consumer,
+                    streams={self.stream_sub.name: self.last_id},
+                    block=math.ceil(timeout * 1000),
+                    count=1,
+                )
+
+            protected_read = self._protect_read_from_group_removal(
+                read_func=_readgroup_call,
+                stream=self.stream_sub,
+            )
+            stream_message = await protected_read()  # call and await the protected read
+        else:
+            stream_message = await self._client.xread(
+                {self.stream_sub.name: self.last_id},
+                block=math.ceil(timeout * 1000),
+                count=1,
+            )
+        return stream_message
+
     @override
     async def get_one(
         self,
@@ -170,34 +252,29 @@ async def get_one(
         assert not self.calls, (
             "You can't use `get_one` method if subscriber has registered handlers."
         )
-        if self.min_idle_time is None:
-            if self.stream_sub.group and self.stream_sub.consumer:
-                stream_message = await self._client.xreadgroup(
-                    groupname=self.stream_sub.group,
-                    consumername=self.stream_sub.consumer,
-                    streams={self.stream_sub.name: self.last_id},
-                    block=math.ceil(timeout * 1000),
-                    count=1,
-                )
-            else:
-                stream_message = await self._client.xread(
-                    {self.stream_sub.name: self.last_id},
-                    block=math.ceil(timeout * 1000),
-                    count=1,
-                )
+        if self.min_idle_time is None:  # uses the fixed _get_one_message above
+            stream_message = await self._get_one_message(timeout)
             if not stream_message:
                 return None
             ((stream_name, ((message_id, raw_message),)),) = stream_message
 
         else:
-            stream_message = await self._client.xautoclaim(
-                name=self.stream_sub.name,
-                groupname=self.stream_sub.group,
-                consumername=self.stream_sub.consumer,
-                min_idle_time=self.min_idle_time,
-                start_id=self.autoclaim_start_id,
-                count=1,
+            def _autoclaim_call() -> Awaitable[Any]:
+                return self._client.xautoclaim(
+                    name=self.stream_sub.name,
+                    groupname=self.stream_sub.group,
+                    consumername=self.stream_sub.consumer,
+                    min_idle_time=self.min_idle_time,
+                    start_id=self.autoclaim_start_id,
+                    count=1,
+                )
+
+            protected_autoclaim = self._protect_read_from_group_removal(
+                read_func=_autoclaim_call,
+                stream=self.stream_sub,
             )
+            stream_message = await protected_autoclaim()
+
             (next_id, messages, _) = stream_message
             # Update start_id for next call
             self.autoclaim_start_id = next_id
@@ -241,33 +318,28 @@ async def __aiter__(self) -> AsyncIterator["RedisStreamMessage"]:  # type: ignor
 
         while True:
             if self.min_idle_time is None:
-                if self.stream_sub.group and self.stream_sub.consumer:
-                    stream_message = await self._client.xreadgroup(
-                        groupname=self.stream_sub.group,
-                        consumername=self.stream_sub.consumer,
-                        streams={self.stream_sub.name: self.last_id},
-                        block=math.ceil(timeout * 1000),
-                        count=1,
-                    )
-                else:
-                    stream_message = await self._client.xread(
-                        {self.stream_sub.name: self.last_id},
-                        block=math.ceil(timeout * 1000),
-                        count=1,
-                    )
+                stream_message = await self._get_one_message(timeout)
+
                 if not stream_message:
                     continue
                 ((stream_name, ((message_id, raw_message),)),) = stream_message
             else:
-                stream_message = await self._client.xautoclaim(
-                    name=self.stream_sub.name,
-                    groupname=self.stream_sub.group,
-                    consumername=self.stream_sub.consumer,
-                    min_idle_time=self.min_idle_time,
-                    start_id=self.autoclaim_start_id,
-                    count=1,
+                def _autoclaim_call() -> Awaitable[Any]:
+                    return self._client.xautoclaim(
+                        name=self.stream_sub.name,
+                        groupname=self.stream_sub.group,
+                        consumername=self.stream_sub.consumer,
+                        min_idle_time=self.min_idle_time,
+                        start_id=self.autoclaim_start_id,
+                        count=1,
+                    )
+
+                protected_autoclaim = self._protect_read_from_group_removal(
+                    read_func=_autoclaim_call,
+                    stream=self.stream_sub,
                 )
+                stream_message = await protected_autoclaim()
+
                 (next_id, messages, _) = stream_message
                 # Update start_id for next call
                 self.autoclaim_start_id = next_id
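Note: stripped of the FastStream internals, the recovery wrapper introduced in this patch boils down to the following pattern. This is a standalone sketch, not the library code; recreate_group and reset_position are hypothetical stand-ins for the subscriber's _create_group call and its last_id reset.

    from collections.abc import Awaitable, Callable
    from typing import TypeVar

    from redis.exceptions import ResponseError

    T = TypeVar("T")


    def protect_from_group_removal(
        read_func: Callable[[], Awaitable[T]],
        recreate_group: Callable[[], Awaitable[None]],
        reset_position: Callable[[], None],
    ) -> Callable[[], Awaitable[T]]:
        # Wrap a stream read so that a vanished or desynchronized group is
        # repaired once and the read retried; any other error propagates.
        async def guarded_read() -> T:
            try:
                return await read_func()
            except ResponseError as e:
                msg = str(e)
                if "NOGROUP" in msg:
                    # the whole group disappeared (e.g. FLUSHALL): recreate it
                    await recreate_group()
                elif (
                    "smaller than the first available entry" in msg
                    or "greater than the maximum id" in msg
                ):
                    # someone moved the group's IDs under us: fall back to a valid one
                    reset_position()
                else:
                    raise
                return await read_func()  # single retry; a second failure propagates

        return guarded_read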
From 182cf0b86741c0a559318e37d0baa18ea34af37b Mon Sep 17 00:00:00 2001
From: Pierre Bonneau
Date: Mon, 10 Nov 2025 15:11:10 +0100
Subject: [PATCH 3/3] fix: Recreate group in case of Redis flush

- fix the return type of _get_one_message and guard against an empty
  autoclaim response in get_one.
---
 faststream/redis/subscriber/usecases/stream_subscriber.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/faststream/redis/subscriber/usecases/stream_subscriber.py b/faststream/redis/subscriber/usecases/stream_subscriber.py
index 002b63bc38..8aa44c8c55 100644
--- a/faststream/redis/subscriber/usecases/stream_subscriber.py
+++ b/faststream/redis/subscriber/usecases/stream_subscriber.py
@@ -219,7 +219,7 @@ def read(
 
         await super().start(read)
 
-    async def _get_one_message(self, timeout: float) -> None:
+    async def _get_one_message(self, timeout: float) -> Optional[ReadResponse]:
         if self.stream_sub.group and self.stream_sub.consumer:
             def _readgroup_call() -> Awaitable[ReadResponse]:
                 return self._client.xreadgroup(
@@ -274,6 +274,8 @@ def _autoclaim_call() -> Awaitable[Any]:
                 stream=self.stream_sub,
             )
             stream_message = await protected_autoclaim()
+            if not stream_message:
+                return None
 
             (next_id, messages, _) = stream_message
             # Update start_id for next call
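For context, this is the kind of consumer the series protects. A plausible usage sketch (the stream, group, and consumer names are invented): before the fix, a FLUSHALL on the server left such a subscriber failing with NOGROUP on every read until the application was restarted; with the fix, it recreates the group and carries on.

    from faststream import FastStream
    from faststream.redis import RedisBroker, StreamSub

    broker = RedisBroker("redis://localhost:6379")
    app = FastStream(broker)


    @broker.subscriber(
        stream=StreamSub("orders", group="order-workers", consumer="worker-1"),
    )
    async def handle_order(body: str) -> None:
        # With this series applied, the consumer keeps working even if Redis
        # is flushed (or the group's IDs are reset) while the app is running:
        # the subscriber recreates the group and retries the read.
        print(body)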