@@ -3,8 +3,10 @@ import { AbiEvent } from "abitype";
3
3
import { Job , Processor , Worker } from "bullmq" ;
4
4
import superjson from "superjson" ;
5
5
import {
6
+ Address ,
6
7
Chain ,
7
8
PreparedEvent ,
9
+ ThirdwebContract ,
8
10
defineChain ,
9
11
eth_getBlockByHash ,
10
12
getContract ,
@@ -48,7 +50,6 @@ const handler: Processor<any, void, string> = async (job: Job<string>) => {
48
50
49
51
// Store logs to DB.
50
52
const insertedLogs = await bulkInsertContractEventLogs ( { logs } ) ;
51
-
52
53
if ( insertedLogs . length === 0 ) {
53
54
return ;
54
55
}
@@ -103,6 +104,10 @@ export const getWebhooksByContractAddresses = async (
103
104
104
105
type GetLogsParams = EnqueueProcessEventLogsData ;
105
106
107
+ /**
108
+ * Gets all event logs for the subscribed addresses and filters.
109
+ * @returns A list of logs to insert to the ContractEventLogs table.
110
+ */
106
111
const getLogs = async ( {
107
112
chainId,
108
113
fromBlock,
@@ -114,22 +119,32 @@ const getLogs = async ({
114
119
}
115
120
116
121
const chain = defineChain ( chainId ) ;
122
+ const addressConfig : Record <
123
+ Address ,
124
+ {
125
+ contract : ThirdwebContract ;
126
+ }
127
+ > = { } ;
128
+ for ( const filter of filters ) {
129
+ addressConfig [ filter . address ] = {
130
+ contract : getContract ( {
131
+ client : thirdwebClient ,
132
+ chain,
133
+ address : filter . address ,
134
+ } ) ,
135
+ } ;
136
+ }
117
137
118
138
// Get events for each contract address. Apply any filters.
119
139
const promises = filters . map ( async ( f ) => {
120
- const contract = getContract ( {
121
- client : thirdwebClient ,
122
- chain,
123
- address : f . address ,
124
- } ) ;
140
+ const { contract } = addressConfig [ f . address ] ;
125
141
126
142
// Get events to filter by, if any.
127
143
const events : PreparedEvent < AbiEvent > [ ] = [ ] ;
128
- if ( f . events . length ) {
144
+ if ( f . events . length > 0 ) {
129
145
const abi = await resolveContractAbi < AbiEvent [ ] > ( contract ) ;
130
- for ( const event of f . events ) {
131
- const signature = abi . find ( ( a ) => a . name === event ) ;
132
- if ( signature ) {
146
+ for ( const signature of abi ) {
147
+ if ( f . events . includes ( signature . name ) ) {
133
148
events . push ( prepareEvent ( { signature } ) ) ;
134
149
}
135
150
}
@@ -142,33 +157,88 @@ const getLogs = async ({
142
157
events,
143
158
} ) ;
144
159
} ) ;
160
+
145
161
// Query and flatten all events.
146
- const allEvents = ( await Promise . all ( promises ) ) . flat ( ) ;
162
+ const allLogs = ( await Promise . all ( promises ) ) . flat ( ) ;
147
163
148
- const blockHashes = allEvents . map ( ( e ) => e . blockHash ) ;
164
+ // Get timestamps for blocks.
165
+ const blockHashes = allLogs . map ( ( e ) => e . blockHash ) ;
149
166
const blockTimestamps = await getBlockTimestamps ( chain , blockHashes ) ;
150
167
151
168
// Transform logs into the DB schema.
152
- return allEvents . map (
153
- ( event ) : Prisma . ContractEventLogsCreateInput => ( {
154
- chainId,
155
- blockNumber : Number ( event . blockNumber ) ,
156
- contractAddress : event . address . toLowerCase ( ) ,
157
- transactionHash : event . transactionHash ,
158
- topic0 : event . topics [ 0 ] ,
159
- topic1 : event . topics [ 1 ] ,
160
- topic2 : event . topics [ 2 ] ,
161
- topic3 : event . topics [ 3 ] ,
162
- data : event . data ,
163
- eventName : event . eventName ,
164
- decodedLog : event . args ?? { } ,
165
- timestamp : blockTimestamps [ event . blockHash ] ,
166
- transactionIndex : event . transactionIndex ,
167
- logIndex : event . logIndex ,
168
- } ) ,
169
+ return await Promise . all (
170
+ allLogs . map (
171
+ async ( log ) : Promise < Prisma . ContractEventLogsCreateInput > => ( {
172
+ chainId,
173
+ blockNumber : Number ( log . blockNumber ) ,
174
+ contractAddress : log . address . toLowerCase ( ) ,
175
+ transactionHash : log . transactionHash ,
176
+ topic0 : log . topics [ 0 ] ,
177
+ topic1 : log . topics [ 1 ] ,
178
+ topic2 : log . topics [ 2 ] ,
179
+ topic3 : log . topics [ 3 ] ,
180
+ data : log . data ,
181
+ eventName : log . eventName ,
182
+ decodedLog : await formatDecodedLog ( {
183
+ contract :
184
+ addressConfig [ log . address . toLowerCase ( ) as Address ] . contract ,
185
+ eventName : log . eventName ,
186
+ logArgs : log . args as Record < string , unknown > ,
187
+ } ) ,
188
+ timestamp : blockTimestamps [ log . blockHash ] ,
189
+ transactionIndex : log . transactionIndex ,
190
+ logIndex : log . logIndex ,
191
+ } ) ,
192
+ ) ,
169
193
) ;
170
194
} ;
171
195
196
+ /**
197
+ * Transform v5 SDK to v4 log format.
198
+ *
199
+ * Example input:
200
+ * {
201
+ * "to": "0x123...",
202
+ * "quantity": 2n
203
+ * }
204
+ *
205
+ * Example output:
206
+ * {
207
+ * "to": {
208
+ * "type:" "address",
209
+ * "value": "0x123..."
210
+ * },
211
+ * "quantity": {
212
+ * "type:" "uint256",
213
+ * "value": "2"
214
+ * },
215
+ * }
216
+ */
217
+ const formatDecodedLog = async ( args : {
218
+ contract : ThirdwebContract ;
219
+ eventName : string ;
220
+ logArgs : Record < string , unknown > ;
221
+ } ) : Promise < Record < string , Prisma . InputJsonObject > | undefined > => {
222
+ const { contract, eventName, logArgs } = args ;
223
+
224
+ const abi = await resolveContractAbi < AbiEvent [ ] > ( contract ) ;
225
+ const eventSignature = abi . find ( ( a ) => a . name === eventName ) ;
226
+ if ( ! eventSignature ) {
227
+ return ;
228
+ }
229
+
230
+ const res : Record < string , Prisma . InputJsonObject > = { } ;
231
+ for ( const { name, type } of eventSignature . inputs ) {
232
+ if ( name && name in logArgs ) {
233
+ res [ name ] = {
234
+ type,
235
+ value : ( logArgs [ name ] as any ) . toString ( ) ,
236
+ } ;
237
+ }
238
+ }
239
+ return res ;
240
+ } ;
241
+
172
242
/**
173
243
* Gets the timestamps for a list of block hashes. Falls back to the current time.
174
244
* @param chain
0 commit comments