Skip to content

Commit a9206ce

Browse files
authored
add dynamic batch size to PersistedQueue based on pending takers (#5816)
1 parent 49d15de commit a9206ce

File tree

3 files changed

+148
-63
lines changed

3 files changed

+148
-63
lines changed

.changeset/vast-olives-doubt.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@effect/experimental": patch
3+
"@effect/sql": patch
4+
---
5+
6+
add dynamic batch size to PersistedQueue based on pending takers

packages/experimental/src/PersistedQueue/Redis.ts

Lines changed: 99 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ import * as Effect from "effect/Effect"
99
import * as Exit from "effect/Exit"
1010
import * as Layer from "effect/Layer"
1111
import * as Mailbox from "effect/Mailbox"
12+
import * as MutableRef from "effect/MutableRef"
13+
import * as Option from "effect/Option"
1214
import * as RcMap from "effect/RcMap"
15+
import * as Schedule from "effect/Schedule"
1316
import type { RedisOptions } from "ioredis"
1417
import { Redis } from "ioredis"
1518
import * as PersistedQueue from "../PersistedQueue.js"
@@ -27,8 +30,9 @@ interface RedisWithQueue extends Redis {
2730
keyPending: string,
2831
prefix: string,
2932
workerId: string,
33+
batchSize: number,
3034
pttl: number
31-
): Promise<string>
35+
): Promise<Array<string> | null>
3236
complete(
3337
keyPending: string,
3438
keyLock: string,
@@ -50,10 +54,13 @@ interface RedisWithQueue extends Redis {
5054
export const make = Effect.fnUntraced(function*(
5155
options: RedisOptions & {
5256
readonly prefix?: string | undefined
57+
readonly pollInterval?: Duration.DurationInput | undefined
5358
readonly lockRefreshInterval?: Duration.DurationInput | undefined
5459
readonly lockExpiration?: Duration.DurationInput | undefined
5560
}
5661
) {
62+
const pollInterval = options.pollInterval ? Duration.decode(options.pollInterval) : Duration.seconds(1)
63+
5764
const acquireClient = Effect.gen(function*() {
5865
const redis = yield* Effect.acquireRelease(
5966
Effect.sync(() => new Redis(options) as RedisWithQueue),
@@ -70,7 +77,7 @@ local payload = ARGV[2]
7077
7178
redis.call("DEL", key_lock)
7279
redis.call("HDEL", key_pending, id)
73-
redis.call("LPUSH", key_queue, payload)
80+
redis.call("RPUSH", key_queue, payload)
7481
`,
7582
numberOfKeys: 3,
7683
readOnly: false
@@ -111,20 +118,22 @@ local key_queue = KEYS[1]
111118
local key_pending = KEYS[2]
112119
local prefix = ARGV[1]
113120
local worker_id = ARGV[2]
114-
local pttl = ARGV[3]
121+
local batch_size = tonumber(ARGV[3])
122+
local pttl = ARGV[4]
115123
116-
local item = redis.call("BLPOP", key_queue, 0)
117-
if not item then
124+
local payloads = redis.call("LPOP", key_queue, batch_size)
125+
if not payloads then
118126
return nil
119127
end
120128
121-
local payload = item[2]
122-
local id = cjson.decode(payload).id
123-
local key_lock = prefix .. id .. ":lock"
124-
redis.call("SET", key_lock, worker_id, "PX", pttl)
125-
redis.call("HSET", key_pending, id, payload)
129+
for i, payload in ipairs(payloads) do
130+
local id = cjson.decode(payload).id
131+
local key_lock = prefix .. id .. ":lock"
132+
redis.call("SET", key_lock, worker_id, "PX", pttl)
133+
redis.call("HSET", key_pending, id, payload)
134+
end
126135
127-
return payload
136+
return payloads
128137
`,
129138
numberOfKeys: 2,
130139
readOnly: false
@@ -162,46 +171,78 @@ return payload
162171
if (locks[i] !== null) continue
163172
toReset.push(elements[i][1])
164173
}
165-
yield* Effect.promise(() => offerClient.lpush(keyQueue(name), ...toReset))
174+
if (toReset.length === 0) return
175+
yield* Effect.promise(() => offerClient.rpush(keyQueue(name), ...toReset))
166176
}),
167177
capacity: Number.MAX_SAFE_INTEGER,
168178
timeToLive: Duration.minutes(15)
169179
})
170180

171-
const clients = yield* RcMap.make({
181+
const mailboxes = yield* RcMap.make({
172182
lookup: Effect.fnUntraced(function*(name: string) {
173183
yield* resetPending.get(name)
174184
const redis = yield* acquireClient
175-
const clientId = yield* Effect.promise(() => redis.client("ID"))
176-
177-
return { clientId, redis } as const
178-
}),
179-
idleTimeToLive: Duration.seconds(15)
180-
})
185+
const queueKey = keyQueue(name)
186+
const pendingKey = keyPending(name)
187+
const mailbox = yield* Mailbox.make<RedisElement>()
188+
const takers = MutableRef.make(0)
189+
const pollLatch = Effect.unsafeMakeLatch()
190+
const takenLatch = Effect.unsafeMakeLatch()
181191

182-
const mailboxes = yield* RcMap.make({
183-
lookup: Effect.fnUntraced(function*(name: string) {
184-
const { clientId, redis } = yield* RcMap.get(clients, name)
185-
const mailbox = yield* Mailbox.make<RedisElement>({ capacity: 0 })
192+
yield* Effect.addFinalizer(() =>
193+
Effect.flatMap(
194+
mailbox.clear,
195+
(elements) =>
196+
elements.length === 0
197+
? Effect.void
198+
: Effect.promise(() =>
199+
Promise.all(Array.from(elements, (element) =>
200+
offerClient.requeue(
201+
queueKey,
202+
pendingKey,
203+
keyLock(element.id),
204+
element.id,
205+
JSON.stringify(element)
206+
)))
207+
)
208+
)
209+
)
186210

187-
yield* Effect.promise(() =>
211+
const poll = Effect.promise(() =>
188212
redis.take(
189-
keyQueue(name),
190-
keyPending(name),
213+
queueKey,
214+
pendingKey,
191215
prefix,
192216
workerId,
217+
takers.current,
193218
lockExpirationMillis
194219
)
195-
).pipe(
196-
Effect.onInterrupt(() => Effect.promise(() => offerClient.client("UNBLOCK", clientId))),
197-
Effect.flatMap((payload) => payload ? mailbox.offer(JSON.parse(payload)) : Effect.void),
198-
Effect.forever,
220+
)
221+
222+
yield* Effect.gen(function*() {
223+
while (true) {
224+
yield* pollLatch.await
225+
yield* Effect.yieldNow()
226+
const results = takers.current === 0 ? null : yield* poll
227+
if (results === null) {
228+
yield* Effect.sleep(pollInterval)
229+
continue
230+
}
231+
takenLatch.unsafeClose()
232+
yield* mailbox.offerAll(results.map((json) => JSON.parse(json)))
233+
yield* takenLatch.await
234+
yield* Effect.yieldNow()
235+
}
236+
}).pipe(
237+
Effect.sandbox,
238+
Effect.retry(Schedule.spaced(500)),
199239
Effect.forkScoped,
200240
Effect.interruptible
201241
)
202242

203-
return mailbox
204-
})
243+
return { mailbox, takers, pollLatch, takenLatch } as const
244+
}),
245+
idleTimeToLive: Duration.seconds(15)
205246
})
206247

207248
const activeLockKeys = new Set<string>()
@@ -236,9 +277,23 @@ return payload
236277
})
237278
}),
238279
take: (options) =>
239-
Effect.uninterruptibleMask((restore) =>
240-
RcMap.get(mailboxes, options.name).pipe(
241-
Effect.flatMap((m) => Effect.orDie(restore(m.take))),
280+
Effect.uninterruptibleMask((restore) => {
281+
return RcMap.get(mailboxes, options.name).pipe(
282+
Effect.flatMap(({ mailbox, pollLatch, takenLatch, takers }) => {
283+
takers.current++
284+
if (takers.current === 1) {
285+
pollLatch.unsafeOpen()
286+
}
287+
return Effect.tap(restore(mailbox.take as Effect.Effect<RedisElement>), () => {
288+
takers.current--
289+
if (takers.current === 0) {
290+
pollLatch.unsafeClose()
291+
takenLatch.unsafeOpen()
292+
} else if (Option.getOrUndefined(mailbox.unsafeSize()) === 0) {
293+
takenLatch.unsafeOpen()
294+
}
295+
})
296+
}),
242297
Effect.scoped,
243298
Effect.tap((element) => {
244299
const lock = keyLock(element.id)
@@ -293,16 +348,22 @@ return payload
293348
}))
294349
})
295350
)
296-
)
351+
})
297352
})
298353
})
299354

300355
/**
301356
* @since 1.0.0
302357
* @category Layers
303358
*/
304-
export const layerStore = (options: RedisOptions & { readonly prefix?: string | undefined }) =>
305-
Layer.scoped(PersistedQueue.PersistedQueueStore, make(options))
359+
export const layerStore = (
360+
options: RedisOptions & {
361+
readonly prefix?: string | undefined
362+
readonly pollInterval?: Duration.DurationInput | undefined
363+
readonly lockRefreshInterval?: Duration.DurationInput | undefined
364+
readonly lockExpiration?: Duration.DurationInput | undefined
365+
}
366+
) => Layer.scoped(PersistedQueue.PersistedQueueStore, make(options))
306367

307368
/**
308369
* @since 1.0.0

0 commit comments

Comments
 (0)