Skip to content

Conversation

JonathanSerafini
Copy link
Contributor

@JonathanSerafini JonathanSerafini commented Oct 10, 2025

Description

This change adds an initial implementation for an optional reading mode for the redis stream
subscriber whereby it will attempt to read messages from the PEL first and then continue on
reading new messages on the stream.

At present this mode is an optional and disable so as to not alter the existing behaviour.

On it's own, this change doesn't "do" much. However, by utilizing the resume_from method
it now becomes possible to periodically have the consumer switch from reading new messages
to reading from the PEL.

In a subsequent PR, i could add the resumable option to the subscriber settings and potentially
also add a background task to monitor the pending list to set resume_from to consume pending.

Type of change

Please delete options that are not relevant.

  • Documentation (typos, code examples, or any documentation updates)
  • Bug fix (a non-breaking change that resolves an issue)
  • New feature (a non-breaking change that adds functionality)
  • Breaking change (a fix or feature that would disrupt existing functionality)
  • This change requires a documentation update

Checklist

  • My code adheres to the style guidelines of this project (just lint shows no errors)
  • I have conducted a self-review of my own code
  • I have made the necessary changes to the documentation
  • My changes do not generate any new warnings
  • I have added tests to validate the effectiveness of my fix or the functionality of my new feature
  • Both new and existing unit tests pass successfully on my local environment by running just test-coverage
  • I have ensured that static analysis tests are passing by running just static-analysis
  • I have included code examples to illustrate the modifications

@github-actions github-actions bot added the Redis Issues related to `faststream.redis` module and Redis features label Oct 10, 2025
@JonathanSerafini JonathanSerafini changed the title [redis] add optional redis PEL reading support feat: [redis] add optional redis PEL reading support Oct 10, 2025
@JonathanSerafini
Copy link
Contributor Author

@Lancetnik I was wondering if you might have an opinion on this implementation for reading of pending redis messages

@Lancetnik
Copy link
Member

@JonathanSerafini sorry, I am nor sure, that I got your second idea correctly. Can you explain more detail, please? Probably, with pseudocode. But, I like your approach

@JonathanSerafini
Copy link
Contributor Author

@Lancetnik Sure ...
primary goal right now is to add support for reading the pending entries without needing big changes or forcing a particular workflow. the way i'm currently thinking it could be used would be as follows ...

subscriber = broker.subscriber(stream=queue)

@subscriber
async def handler(msg): ...

async def pending_monitor(subscriber: StreamSubscriber, interval: float, idle: int) -> None:
  while True:
      # optionally get first xpending message
      # optionally filter xpending by idle
      # or optionally walk the list of xpending_range and delete those where delivery_attempts too high
      if has_pending:
          subscriber.read_pending = True
      else:
          subscriber.read_pending = False
      await asyncio.sleep(interval)
      
subscriber.add_task(pending_monitor, subscriber, 15, 30 * 1000)

above is pseudo-code ... it doesn't quite work with the declarative approach out of the box
though i have managed to get something like this working by wrapping the RedisBroker.start and adding the tasks after a super().start.

would be cleaner if it was either builtin support or subscribers had some sort of support for registering tasks as part of the decorator.

@Lancetnik
Copy link
Member

would be cleaner if it was either builtin support or subscribers had some sort of support for registering tasks as part of the decorator.

Not "tasks" exactly, but we made smth close here - https://faststream.ag2.ai/latest/howto/nats/in-progress/

@JonathanSerafini JonathanSerafini force-pushed the redis-optional-read-pel branch 2 times, most recently from e320405 to d47d4ff Compare October 14, 2025 09:09
@JonathanSerafini JonathanSerafini marked this pull request as ready for review October 14, 2025 10:21
@JonathanSerafini
Copy link
Contributor Author

Not "tasks" exactly, but we made smth close here - https://faststream.ag2.ai/latest/howto/nats/in-progress/

By the looks of it, it does seem like a background task would be warranted given that it's more "subscriber lifecycle" than it is "message lifecycle" as it is for NATs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Redis Issues related to `faststream.redis` module and Redis features

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants