@@ -3,6 +3,8 @@ package rpc
3
3
import (
4
4
"context"
5
5
"encoding/json"
6
+ "fmt"
7
+ "reflect"
6
8
"time"
7
9
8
10
"github.com/NethermindEth/juno/blockchain"
@@ -77,8 +79,8 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
77
79
// old blocks.
78
80
var wg conc.WaitGroup
79
81
wg .Go (func () {
80
- // Stores the transaction hash of the event
81
- eventsPreviouslySent := make (map [felt. Felt ] struct {} )
82
+ // Stores the transaction hash -> number of events
83
+ eventsPreviouslySent := make ([] * blockchain. FilteredEvent , 0 )
82
84
83
85
for {
84
86
select {
@@ -98,10 +100,34 @@ func (h *Handler) SubscribeEvents(ctx context.Context, fromAddr *felt.Felt, keys
98
100
return
99
101
}
100
102
101
- for _ , t := range b .Transactions {
102
- delete (eventsPreviouslySent , * t .Hash ())
103
+ fmt .Println ("size of slice before" , len (eventsPreviouslySent ))
104
+ for i , r := range b .Receipts {
105
+ for _ , e := range r .Events {
106
+ fe := & blockchain.FilteredEvent {
107
+ Event : e ,
108
+ BlockNumber : header .Number ,
109
+ BlockHash : header .Hash ,
110
+ TransactionHash : b .Transactions [i ].Hash (),
111
+ }
112
+
113
+ var deleteI int
114
+ var duplicateFound bool
115
+ for j , dupE := range eventsPreviouslySent {
116
+ if reflect .DeepEqual (fe , dupE ) {
117
+ duplicateFound = true
118
+ deleteI = j
119
+ break
120
+ }
121
+ }
122
+
123
+ if duplicateFound {
124
+ eventsPreviouslySent = append (eventsPreviouslySent [:deleteI ], eventsPreviouslySent [deleteI + 1 :]... )
125
+ }
126
+ }
103
127
}
128
+ fmt .Println ("size of slice after" , len (eventsPreviouslySent ))
104
129
case pending := <- pendingSub .Recv ():
130
+ fmt .Println ("Found pending block" , len (pending .Transactions ))
105
131
h .processEvents (subscriptionCtx , w , id , pending .Number , pending .Number , fromAddr , keys , eventsPreviouslySent )
106
132
}
107
133
}
@@ -271,7 +297,7 @@ func (h *Handler) SubscribeTransactionStatus(ctx context.Context, txHash felt.Fe
271
297
}
272
298
273
299
func (h * Handler ) processEvents (ctx context.Context , w jsonrpc.Conn , id , from , to uint64 , fromAddr * felt.Felt ,
274
- keys [][]felt.Felt , eventsPreviouslySent map [felt. Felt ] struct {} ,
300
+ keys [][]felt.Felt , eventsPreviouslySent [] * blockchain. FilteredEvent ,
275
301
) {
276
302
filter , err := h .bcReader .EventFilter (fromAddr , keys )
277
303
if err != nil {
@@ -314,18 +340,19 @@ func (h *Handler) processEvents(ctx context.Context, w jsonrpc.Conn, id, from, t
314
340
}
315
341
316
342
func sendEvents (ctx context.Context , w jsonrpc.Conn , events []* blockchain.FilteredEvent ,
317
- eventsPreviouslySent map [felt. Felt ] struct {} , id uint64 ,
343
+ eventsPreviouslySent [] * blockchain. FilteredEvent , id uint64 ,
318
344
) error {
345
+ eventsLoop:
319
346
for _ , event := range events {
320
347
select {
321
348
case <- ctx .Done ():
322
349
return ctx .Err ()
323
350
default :
324
- if eventsPreviouslySent != nil {
325
- if _ , exists := eventsPreviouslySent [ * event . TransactionHash ]; exists {
326
- continue
351
+ for _ , prevEvent := range eventsPreviouslySent {
352
+ if reflect . DeepEqual ( event , prevEvent ) {
353
+ continue eventsLoop
327
354
}
328
- eventsPreviouslySent [ * event . TransactionHash ] = struct {}{}
355
+ eventsPreviouslySent = append ( eventsPreviouslySent , event )
329
356
}
330
357
331
358
emittedEvent := & EmittedEvent {
0 commit comments