Skip to content

Commit e8d2cd9

Browse files
authored
Improvements to reduce deadlocks (#589)
1 parent a5a82c8 commit e8d2cd9

File tree

3 files changed

+27
-8
lines changed

3 files changed

+27
-8
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
exports.up = async function(knex) {
2+
await knex.schema.alterTable('journey_user_step', function(table) {
3+
table.index(['journey_id', 'type', 'delay_until'])
4+
table.dropIndex(['type', 'delay_until'])
5+
})
6+
}
7+
8+
exports.down = async function(knex) {
9+
await knex.schema.alterTable('journey_user_step', function(table) {
10+
table.index(['type', 'delay_until'])
11+
table.dropIndex(['journey_id', 'type', 'delay_until'])
12+
})
13+
}

apps/platform/src/lists/ListEvaluateUserJob.ts

-7
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
import App from '../app'
22
import { cacheIncr } from '../config/redis'
33
import { Job } from '../queue'
4-
import { JobPriority } from '../queue/Job'
54
import { getUser } from '../users/UserRepository'
65
import { DynamicList } from './List'
76
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'
@@ -17,12 +16,6 @@ interface ListEvaluateUserParams {
1716
export default class ListEvaluateUserJob extends Job {
1817
static $name = 'list_evaluate_user_job'
1918

20-
options = {
21-
delay: 0,
22-
attempts: 3,
23-
priority: JobPriority.low,
24-
}
25-
2619
static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
2720
return new this(params)
2821
}

apps/platform/src/utilities/index.ts

+14-1
Original file line numberDiff line numberDiff line change
@@ -224,9 +224,22 @@ export const chunk = async <T>(
224224
modifier: (result: any) => T = (result) => result,
225225
) => {
226226
const chunker = new Chunker(callback, size)
227+
const handler = async (result: any, retries = 3) => {
228+
try {
229+
await chunker.add(modifier(result))
230+
} catch (error: any) {
231+
232+
// In the case of deadlocks, retry the operation
233+
if (['ER_LOCK_WAIT_TIMEOUT', 'ER_LOCK_DEADLOCK'].includes(error.code) && retries > 0) {
234+
setTimeout(() => handler(result, retries - 1), 250)
235+
} else {
236+
throw error
237+
}
238+
}
239+
}
227240
await query.stream(async function(stream) {
228241
for await (const result of stream) {
229-
await chunker.add(modifier(result))
242+
await handler(result)
230243
}
231244
})
232245
await chunker.flush()

0 commit comments

Comments
 (0)