Description
Work-in-progress for a relational-database-based broker.
Fixes #799
Motivation
The primary benefit of a message queue built on top of a relational database is the ability to insert messages transactionally, atomically with other database operations, which enables the transactional outbox pattern. The relational database is also usually the most readily available, already-provisioned piece of infrastructure for a given service. Implementing every pattern and semantic of a full-blown message queue or streaming platform (e.g. Kafka-like partition-based horizontal scaling with per-partition ordering) would be problematic, but given a proper understanding of the trade-offs involved, a relational-database-based queue is an appropriate tool for many low-to-medium-throughput, latency-tolerant use cases, including as part of a larger messaging flow that involves a "proper" queue (e.g. as a transactional layer between a service and that queue).
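A minimal sketch of the transactional outbox usage this enables, assuming a SQLAlchemy async session and a hypothetical publish API that accepts the caller's session (the actual broker interface in this PR may differ):

```python
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession


async def create_order_and_notify(session: AsyncSession, broker, order_id: int) -> None:
    # The business write and the message insert share one transaction:
    # either both are committed or neither is.
    async with session.begin():
        await session.execute(
            text("INSERT INTO orders (id) VALUES (:id)"),
            {"id": order_id},
        )
        # Hypothetical API: publish reuses the caller's session/transaction
        # instead of opening its own connection.
        await broker.publish({"order_id": order_id}, queue="order-created", session=session)
```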
Design
The key components are in message.py, usecase.py, and client.py.
Flow:
On start, the subscriber spawns four types of concurrent loops:
Fetch loop: Periodically fetches PENDING or RETRYABLE messages from the database while simultaneously updating them: marking them as PROCESSING, setting acquired_at to now, and incrementing attempts_count. Only messages with next_attempt_at <= now are fetched, ordered by next_attempt_at. The fetched messages are placed into an internal queue. The fetch limit is the minimum of fetch_batch_size and the free buffer capacity (fetch_batch_size * overfetch_factor minus the number of currently queued messages). If the last fetch was "full" (returned as many messages as the limit), the next fetch happens after min_fetch_interval; otherwise after max_fetch_interval. A sketch of the acquisition query is shown after this flow description.
Worker loops (max_workers instances): Each worker takes a message from the internal queue and checks whether the attempt is allowed by the retry_strategy. If allowed, the message is processed; if not, it is Reject'ed. Depending on the processing result, the AckPolicy, and manual Ack/Nack/Reject, the message is Ack'ed, Nack'ed, or Reject'ed. For Nack'ed messages the retry_strategy is consulted to determine whether and when the message may be retried: if a retry is allowed, the message is marked as RETRYABLE, otherwise as FAILED. Ack'ed messages are marked as COMPLETED and Reject'ed messages are marked as FAILED. The message is then buffered for flushing. (A sketch of these state transitions follows the Notes below.)
Flush loop: Periodically flushes the buffered message state changes to the database. COMPLETED and FAILED messages are moved from the primary table to the archive table. The state of RETRYABLE messages is updated in the primary table.
Release stuck loop: Periodically releases messages that have been stuck in PROCESSING state for longer than release_stuck_timeout since acquired_at. These messages are marked back as PENDING.
On stop, all loops are gracefully stopped. Messages that have been acquired but are not yet being processed are drained from the internal queue and marked back as PENDING. The subscriber waits for all tasks to complete within graceful_shutdown_timeout, then performs a final flush.
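A minimal sketch of the acquisition query used by the fetch loop, assuming an asyncpg-style driver and illustrative table/column names (the actual schema and data-access layer of this PR may differ); the release-stuck query follows the same single-statement shape:

```python
# Illustrative only: table name, column names, and the asyncpg API are assumptions.
ACQUIRE_SQL = """
UPDATE messages
SET status = 'PROCESSING',
    acquired_at = now(),
    attempts_count = attempts_count + 1
WHERE id IN (
    SELECT id
    FROM messages
    WHERE status IN ('PENDING', 'RETRYABLE')
      AND next_attempt_at <= now()
    ORDER BY next_attempt_at
    LIMIT $1
    FOR UPDATE SKIP LOCKED
)
RETURNING id, payload, attempts_count;
"""


async def fetch_batch(conn, fetch_batch_size: int, overfetch_factor: int, queued: int):
    # Fetch no more than the internal buffer can still hold.
    limit = min(fetch_batch_size, fetch_batch_size * overfetch_factor - queued)
    if limit <= 0:
        return []
    return await conn.fetch(ACQUIRE_SQL, limit)
```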
Notes:
This design allows work to be shared between processes/nodes because "SELECT ... FOR UPDATE SKIP LOCKED" is used.
This design adheres to the "at least once" processing guarantee because changes are flushed to the database only after a processing attempt. Messages might be processed more times than allowed by the retry_strategy if, among other things, the flush does not happen (e.g. due to a crash or failure) after a message has been processed.
This design handles the poison message problem (messages that crash the worker without the ability to catch the exception, e.g. due to OOM terminations) because attempts_count is incremented and the retry_strategy is consulted before each processing attempt.
Why not use LISTEN/NOTIFY? It is specific to Postgres, and it is preferable to start with functionality that is universal to any database. When using multiple nodes/processes, distributing messages among them would still require "SELECT ... FOR UPDATE SKIP LOCKED", because a notification is delivered to all nodes/processes. A notification may also fail to arrive, especially if a node restarts. That is, polling is needed in any case; once polling is in place, LISTEN/NOTIFY can be integrated to "wake up" the polling loop earlier than the interval-based schedule would.
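A minimal sketch of the per-message decision made by a worker, illustrating the state transitions and the poison-message guard described above; the helper names (retry_strategy.allows, retry_strategy.next_attempt_at, buffer.mark_*) are hypothetical, and AckPolicy/manual ack handling is omitted:

```python
from datetime import datetime, timezone


async def handle(message, handler, retry_strategy, buffer) -> None:
    now = datetime.now(timezone.utc)
    # Poison-message guard: attempts_count was already incremented at fetch time,
    # so a message that repeatedly kills the worker eventually exhausts its retry
    # budget and is rejected here, before being processed again.
    if not retry_strategy.allows(message.attempts_count, now):
        buffer.mark_failed(message)                   # Reject -> FAILED (archived on flush)
        return
    try:
        await handler(message)
        buffer.mark_completed(message)                # Ack -> COMPLETED (archived on flush)
    except Exception:
        next_at = retry_strategy.next_attempt_at(message.attempts_count, now)
        if next_at is None:
            buffer.mark_failed(message)               # retries exhausted -> FAILED
        else:
            buffer.mark_retryable(message, next_at)   # Nack -> RETRYABLE, picked up later
```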
Checklist
just lint (shows no errors)
just test-coverage
just static-analysis