@@ -26,6 +26,185 @@ def subscriber(
2626 release_stuck_timeout: int,
2727 ack_policy: AckPolicy = AckPolicy.NACK_ON_ERROR,
2828 ) -> Any:
29+ """
30+ Args:
31+ max_workers:
32+ Number of workers to process messages concurrently.
33+ min_fetch_interval:
34+ The minimum allowed interval between consecutive fetches. The minimum
35+ interval is used if the last fetch returned the same number of messages
36+ as the fetch's limit.
37+ max_fetch_interval:
38+ The maximum allowed interval between consecutive fetches. The maximum
39+ interval is used if the last fetch returned fewer messages than the
40+ fetch's limit.
41+ fetch_batch_size:
42+ The maximum allowed number of messages to fetch in a single batch. A
43+ fetch's actual limit might be lower if the free capacity of the
44+ acquired-but-not-yet-in-processing buffer is smaller.
45+ overfetch_factor:
46+ The factor by which the fetch_batch_size is multiplied to determine the
47+ capacity of the acquired-but-not-yet-in-processing buffer.
48+ flush_interval:
49+ The interval at which the state of messages for which the processing
50+ attempt has been completed or aborted is flushed to the database.
51+ release_stuck_interval:
52+ The interval at which messages in the PROCESSING state are marked back as
53+ PENDING if more than release_stuck_timeout has elapsed since acquired_at.
54+
55+ Flow:
56+ On start, the subscriber spawns four types of concurrent loops:
57+
58+ 1. Fetch loop:
59+ Periodically fetches PENDING or RETRYABLE messages from the database,
60+ simultaneously updating them in the database: marking as PROCESSING,
61+ setting acquired_at to now, and incrementing attempts_count. Only messages
62+ with next_attempt_at <= now are fetched, ordered by next_attempt_at. The
63+ fetched messages are placed into an internal queue. The fetch limit is
64+ the minimum of fetch_batch_size and the free buffer capacity (fetch_batch_size
65+ * overfetch_factor minus currently queued messages). If the last fetch was
66+ "full" (returned as many messages as the limit), the next fetch happens
67+ after min_fetch_interval; otherwise after max_fetch_interval.
68+
69+ 2. Worker loops (max_workers instances):
70+ Each worker takes a message from the internal queue and checks if the
71+ attempt is allowed by the retry_strategy. If allowed, the message is
72+ processed; if not, it is Reject'ed. Depending on the processing result, AckPolicy,
73+ and manual Ack/Nack/Reject, the message is Ack'ed, Nack'ed, or Reject'ed.
74+ For Nack'ed messages the retry_strategy is consulted to determine if and
75+ when the message might be retried. If allowed to be retried, the message is
76+ marked as RETRYABLE, otherwise as FAILED. Ack'ed messages are marked as
77+ COMPLETED and Reject'ed messages are marked as FAILED. The message is then
78+ buffered for flushing.
79+
80+ 3. Flush loop:
81+ Periodically flushes the buffered message state changes to the database.
82+ COMPLETED and FAILED messages are moved from the primary table to the archive
83+ table. The state of RETRYABLE messages is updated in the primary table.
84+
85+ 4. Release stuck loop:
86+ Periodically releases messages that have been stuck in PROCESSING state
87+ for longer than release_stuck_timeout since acquired_at. These messages
88+ are marked back as PENDING.
89+
90+ On stop, all loops are gracefully stopped. Messages that have been acquired
91+ but are not yet being processed are drained from the internal queue and marked
92+ back as PENDING. The subscriber waits for all tasks to complete within
93+ graceful_shutdown_timeout, then performs a final flush.
94+
95+ Notes:
96+ This design provides an "at least once" processing guarantee because state changes
97+ are flushed to the database only after a processing attempt. Messages might
98+ be processed more times than allowed by the retry_strategy if, among other
99+ things, the flush doesn't happen due to a crash or failure after a message is
100+ processed.
101+
102+ This design handles the poison message problem (messages that crash the worker
103+ without a catchable exception, e.g. due to OOM termination) because attempts_count
104+ is incremented at fetch time and retry_strategy is consulted before processing, so
105+ such a message is Reject'ed once the retry_strategy disallows further attempts.
106+
107+ SQL queries:
108+ Fetch:
109+ WITH ready AS
110+ (SELECT message.id AS id,
111+ message.queue AS queue,
112+ message.payload AS payload,
113+ message.state AS state,
114+ message.attempts_count AS attempts_count,
115+ message.created_at AS created_at,
116+ message.first_attempt_at AS first_attempt_at,
117+ message.next_attempt_at AS next_attempt_at,
118+ message.last_attempt_at AS last_attempt_at,
119+ message.acquired_at AS acquired_at
120+ FROM message
121+ WHERE (message.state = $3::sqlamessagestate
122+ OR message.state = $4::sqlamessagestate)
123+ AND message.next_attempt_at <= now()
124+ AND message.queue = $5::VARCHAR
125+ ORDER BY message.next_attempt_at
126+ LIMIT $6::INTEGER
127+ FOR UPDATE SKIP LOCKED),
128+ updated AS
129+ (UPDATE message
130+ SET state=$1::sqlamessagestate,
131+ attempts_count=(message.attempts_count + $2::SMALLINT),
132+ acquired_at=now()
133+ WHERE message.id IN
134+ (SELECT ready.id
135+ FROM ready) RETURNING message.id,
136+ message.queue,
137+ message.payload,
138+ message.state,
139+ message.attempts_count,
140+ message.created_at,
141+ message.first_attempt_at,
142+ message.next_attempt_at,
143+ message.last_attempt_at,
144+ message.acquired_at)
145+ SELECT updated.id,
146+ updated.queue,
147+ updated.payload,
148+ updated.state,
149+ updated.attempts_count,
150+ updated.created_at,
151+ updated.first_attempt_at,
152+ updated.next_attempt_at,
153+ updated.last_attempt_at,
154+ updated.acquired_at
155+ FROM updated
156+ ORDER BY updated.next_attempt_at;
157+
158+ Flush:
159+ For RETRYABLE messages:
160+ UPDATE message
161+ SET state=$1::sqlamessagestate,
162+ first_attempt_at=$2::datetime,
163+ next_attempt_at=$3::datetime,
164+ last_attempt_at=$4::datetime,
165+ acquired_at=$5::datetime
166+ WHERE message.id = $6::BIGINT;
167+
168+ For COMPLETED and FAILED messages:
169+ BEGIN;
170+ INSERT INTO message_archive (
171+ id,
172+ queue,
173+ payload,
174+ state,
175+ attempts_count,
176+ created_at,
177+ first_attempt_at,
178+ last_attempt_at,
179+ archived_at
180+ )
181+ VALUES (
182+ $1::BIGINT,
183+ $2::VARCHAR,
184+ $3::BYTEA,
185+ $4::sqlamessagestate,
186+ $5::SMALLINT,
187+ $6::TIMESTAMP WITH TIME ZONE,
188+ $7::TIMESTAMP WITH TIME ZONE,
189+ $8::TIMESTAMP WITH TIME ZONE,
190+ $9::TIMESTAMP WITH TIME ZONE
191+ );
192+ DELETE
193+ FROM message
194+ WHERE message.id IN ($1::BIGINT);
195+ COMMIT;
196+
197+ Release stuck:
198+ UPDATE message
199+ SET state=$1::sqlamessagestate,
200+ next_attempt_at=now(),
201+ acquired_at=$2::TIMESTAMP WITH TIME ZONE
202+ WHERE message.id IN
203+ (SELECT message.id
204+ FROM message
205+ WHERE message.state = $3::sqlamessagestate
206+ AND message.acquired_at < $4::TIMESTAMP WITH TIME ZONE)
207+ """
29208 workers = max_workers or 1
30209
31210 subscriber = create_subscriber(
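Review note on the fetch loop described in the docstring: the limit and interval rules can be sketched as two small pure functions. This is only an illustration under assumptions, not the code from this PR. The function names `fetch_limit` and `next_fetch_delay`, the `queued` argument, and the float type for `overfetch_factor` and the intervals are all hypothetical.

```python
def fetch_limit(fetch_batch_size: int, overfetch_factor: float, queued: int) -> int:
    """Limit for the next fetch: the batch size capped by free buffer capacity.

    Hypothetical helper; `queued` is the number of acquired messages that are
    still waiting in the internal queue.
    """
    # Buffer capacity as described in the docstring.
    capacity = int(fetch_batch_size * overfetch_factor)
    # Free capacity never goes below zero, even if the queue is over capacity.
    free = max(capacity - queued, 0)
    return min(fetch_batch_size, free)


def next_fetch_delay(
    fetched: int,
    limit: int,
    min_fetch_interval: float,
    max_fetch_interval: float,
) -> float:
    """Delay before the next fetch.

    A "full" fetch (as many messages as the limit) suggests more work is
    waiting, so the shorter interval is used; otherwise the longer one.
    The docstring does not say what happens when the limit is zero; this
    sketch simply applies the same comparison.
    """
    return min_fetch_interval if fetched == limit else max_fetch_interval


if __name__ == "__main__":
    limit = fetch_limit(fetch_batch_size=10, overfetch_factor=1.5, queued=12)
    print(limit)
    print(next_fetch_delay(fetched=limit, limit=limit,
                           min_fetch_interval=0.1, max_fetch_interval=5.0))
```

With a batch size of 10 and a factor of 1.5 the buffer holds 15 messages, so 12 queued messages leave room for a fetch of at most 3.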
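Review note on the worker loop described in the docstring: the mapping from a processing outcome to the state that gets buffered for flushing can be sketched as below. This is a simplified illustration, not the implementation in this PR. The `State` and `Outcome` enums, the `resolve_state` function, and the convention that a retry delay of `None` means "no further retry allowed" are assumptions made for the example.

```python
from enum import Enum
from typing import Optional


class State(Enum):
    # State names taken from the docstring.
    PENDING = "pending"
    PROCESSING = "processing"
    RETRYABLE = "retryable"
    COMPLETED = "completed"
    FAILED = "failed"


class Outcome(Enum):
    # Hypothetical stand-in for the Ack/Nack/Reject decision.
    ACK = "ack"
    NACK = "nack"
    REJECT = "reject"


def resolve_state(outcome: Outcome, retry_delay: Optional[float]) -> State:
    """Return the state to buffer for flushing after a processing attempt.

    `retry_delay` models the retry strategy's answer for a Nack'ed message:
    a number of seconds if a retry is allowed, None if it is not (assumed
    convention for this sketch).
    """
    if outcome is Outcome.ACK:
        return State.COMPLETED
    if outcome is Outcome.REJECT:
        return State.FAILED
    # Nack: retry only if the retry strategy allows another attempt.
    return State.RETRYABLE if retry_delay is not None else State.FAILED


if __name__ == "__main__":
    print(resolve_state(Outcome.NACK, retry_delay=30.0))
    print(resolve_state(Outcome.NACK, retry_delay=None))
```

COMPLETED and FAILED are the terminal states that the flush loop moves to the archive table, while RETRYABLE stays in the primary table for a later fetch.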