@@ -5,17 +5,6 @@ import { getConfig } from "../../utils/cache/getConfig";
5
5
import { redis } from "../../utils/redis/redis" ;
6
6
import { defaultJobOptions } from "./queues" ;
7
7
8
- export const PROCESS_TRANSACTION_RECEIPTS_QUEUE_NAME =
9
- "process-transaction-receipts" ;
10
-
11
- // Queue
12
- const _queue = redis
13
- ? new Queue < string > ( PROCESS_TRANSACTION_RECEIPTS_QUEUE_NAME , {
14
- connection : redis ,
15
- defaultJobOptions,
16
- } )
17
- : null ;
18
-
19
8
// Each job handles a block range for a given chain, filtered by addresses + events.
20
9
export type EnqueueProcessTransactionReceiptsData = {
21
10
chainId : number ;
@@ -27,30 +16,35 @@ export type EnqueueProcessTransactionReceiptsData = {
27
16
toBlock : number ; // inclusive
28
17
} ;
29
18
30
- export const enqueueProcessTransactionReceipts = async (
31
- data : EnqueueProcessTransactionReceiptsData ,
32
- ) => {
33
- if ( ! _queue ) return ;
19
+ export class ProcessTransactionReceiptsQueue {
20
+ private static name = "process-transaction-receipts" ;
34
21
35
- const serialized = superjson . stringify ( data ) ;
36
- // e.g. 8453:14423125-14423685
37
- const jobName = `${ data . chainId } :${ data . fromBlock } -${ data . toBlock } ` ;
38
- const { contractSubscriptionsRequeryDelaySeconds } = await getConfig ( ) ;
39
- const requeryDelays = contractSubscriptionsRequeryDelaySeconds . split ( "," ) ;
22
+ static q = new Queue < string > ( this . name , {
23
+ connection : redis ,
24
+ defaultJobOptions,
25
+ } ) ;
40
26
41
- // Enqueue one job immediately and any delayed jobs.
42
- await _queue . add ( jobName , serialized ) ;
27
+ static add = async ( data : EnqueueProcessTransactionReceiptsData ) => {
28
+ const serialized = superjson . stringify ( data ) ;
29
+ // e.g. 8453:14423125-14423685
30
+ const jobName = `${ data . chainId } :${ data . fromBlock } -${ data . toBlock } ` ;
31
+ const { contractSubscriptionsRequeryDelaySeconds } = await getConfig ( ) ;
32
+ const requeryDelays = contractSubscriptionsRequeryDelaySeconds . split ( "," ) ;
43
33
44
- // The last attempt should attempt repeatedly to handle extended RPC issues.
45
- // This backoff attempts at intervals:
46
- // 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
47
- for ( let i = 0 ; i < requeryDelays . length ; i ++ ) {
48
- const delay = parseInt ( requeryDelays [ i ] ) * 1000 ;
49
- const attempts = i === requeryDelays . length - 1 ? 10 : 0 ;
50
- await _queue . add ( jobName , serialized , {
51
- delay,
52
- attempts,
53
- backoff : { type : "exponential" , delay : 30_000 } ,
54
- } ) ;
55
- }
56
- } ;
34
+ // Enqueue one job immediately and any delayed jobs.
35
+ await this . q . add ( jobName , serialized ) ;
36
+
37
+ // The last attempt should attempt repeatedly to handle extended RPC issues.
38
+ // This backoff attempts at intervals:
39
+ // 30s, 1m, 2m, 4m, 8m, 16m, 32m, ~1h, ~2h, ~4h
40
+ for ( let i = 0 ; i < requeryDelays . length ; i ++ ) {
41
+ const delay = parseInt ( requeryDelays [ i ] ) * 1000 ;
42
+ const attempts = i === requeryDelays . length - 1 ? 10 : 0 ;
43
+ await this . q . add ( jobName , serialized , {
44
+ delay,
45
+ attempts,
46
+ backoff : { type : "exponential" , delay : 30_000 } ,
47
+ } ) ;
48
+ }
49
+ } ;
50
+ }
0 commit comments