@@ -4,22 +4,28 @@ package ownership
4
4
5
5
import (
6
6
"context"
7
+ "slices"
7
8
"sync"
9
+ "time"
8
10
9
11
"go.uber.org/zap"
10
12
11
13
"github.com/ethereum/go-ethereum/common"
12
- "github.com/ethereum/go-ethereum/event"
13
14
15
+ "github.com/status-im/go-wallet-sdk/pkg/balance/multistandardfetcher"
16
+ "github.com/status-im/go-wallet-sdk/pkg/contracts/erc1155"
17
+ "github.com/status-im/go-wallet-sdk/pkg/contracts/erc721"
18
+ "github.com/status-im/go-wallet-sdk/pkg/eventlog"
14
19
gocommon "github.com/status-im/status-go/common"
15
20
"github.com/status-im/status-go/crypto/types"
16
21
"github.com/status-im/status-go/params"
17
22
"github.com/status-im/status-go/pkg/pubsub"
18
23
"github.com/status-im/status-go/rpc/network"
19
24
"github.com/status-im/status-go/services/accounts/accountsevent"
20
25
walletCommon "github.com/status-im/status-go/services/wallet/common"
26
+ "github.com/status-im/status-go/services/wallet/multistandardbalance"
21
27
"github.com/status-im/status-go/services/wallet/thirdparty"
22
- "github.com/status-im/status-go/services/wallet/transfer "
28
+ "github.com/status-im/status-go/services/wallet/transferdetector "
23
29
"github.com/status-im/status-go/services/wallet/walletevent"
24
30
)
25
31
@@ -30,8 +36,6 @@ const (
30
36
type loaderPerChainID = map [walletCommon.ChainID ]* PeriodicalLoader
31
37
type loaderPerAddressAndChainID = map [common.Address ]loaderPerChainID
32
38
33
- type TransferCb func (common.Address , walletCommon.ChainID , []transfer.Transfer )
34
-
35
39
type AccountsProvider interface {
36
40
GetWalletAddresses () ([]types.Address , error )
37
41
}
@@ -41,12 +45,18 @@ type NetworksProvider interface {
41
45
GetPublisher () * pubsub.Publisher
42
46
}
43
47
48
+ type BlockChainStateProvider interface {
49
+ GetEstimatedBlockTime (ctx context.Context , chainID uint64 , blockNumber uint64 ) (time.Time , error )
50
+ }
51
+
44
52
type Controller struct {
45
- fetcher CollectibleOwnershipFetcher
46
- storage CollectibleOwnershipStorage
47
- walletFeed * event.Feed
48
- accountsProvider AccountsProvider
49
- accountsPublisher * pubsub.Publisher
53
+ fetcher CollectibleOwnershipFetcher
54
+ storage CollectibleOwnershipStorage
55
+ accountsProvider AccountsProvider
56
+ accountsPublisher * pubsub.Publisher
57
+ multistandardBalancePublisher * pubsub.Publisher
58
+ transferDetectorPublisher * pubsub.Publisher
59
+ blockChainStateProvider BlockChainStateProvider
50
60
51
61
networksProvider NetworksProvider
52
62
@@ -66,24 +76,28 @@ type Controller struct {
66
76
67
77
func NewController (
68
78
storage CollectibleOwnershipStorage ,
69
- walletFeed * event.Feed ,
70
79
accountsProvider AccountsProvider ,
71
80
accountsPublisher * pubsub.Publisher ,
72
81
networksProvider NetworksProvider ,
82
+ multistandardBalancePublisher * pubsub.Publisher ,
83
+ transferDetectorPublisher * pubsub.Publisher ,
84
+ blockChainStateProvider BlockChainStateProvider ,
73
85
fetcher CollectibleOwnershipFetcher ,
74
86
collectiblesPublisher * pubsub.Publisher ,
75
87
logger * zap.Logger ,
76
88
) * Controller {
77
89
return & Controller {
78
- fetcher : fetcher ,
79
- storage : storage ,
80
- walletFeed : walletFeed ,
81
- accountsProvider : accountsProvider ,
82
- accountsPublisher : accountsPublisher ,
83
- networksProvider : networksProvider ,
84
- periodicalLoaders : make (loaderPerAddressAndChainID ),
85
- collectiblesPublisher : collectiblesPublisher ,
86
- logger : logger .Named ("OwnershipController" ),
90
+ fetcher : fetcher ,
91
+ storage : storage ,
92
+ accountsProvider : accountsProvider ,
93
+ accountsPublisher : accountsPublisher ,
94
+ networksProvider : networksProvider ,
95
+ multistandardBalancePublisher : multistandardBalancePublisher ,
96
+ transferDetectorPublisher : transferDetectorPublisher ,
97
+ blockChainStateProvider : blockChainStateProvider ,
98
+ periodicalLoaders : make (loaderPerAddressAndChainID ),
99
+ collectiblesPublisher : collectiblesPublisher ,
100
+ logger : logger .Named ("OwnershipController" ),
87
101
}
88
102
}
89
103
@@ -105,11 +119,14 @@ func (c *Controller) Start() {
105
119
// Setup collectibles fetch when a new account gets added
106
120
c .startAccountsWatcher ()
107
121
108
- // Setup collectibles fetch when relevant activity is detected
109
- c .startWalletEventsWatcher ()
110
-
111
122
// Setup collectibles fetch when active networks change
112
123
c .startNetworkEventsWatcher ()
124
+
125
+ // Start balance change watcher
126
+ c .startBalanceChangeWatcher ()
127
+
128
+ // Start transfer detection watcher
129
+ c .startTransferDetectionWatcher ()
113
130
}
114
131
115
132
func (c * Controller ) Stop () {
@@ -120,8 +137,6 @@ func (c *Controller) Stop() {
120
137
close (c .stopCh )
121
138
c .stopCh = nil
122
139
123
- c .stopWalletEventsWatcher ()
124
-
125
140
c .stopPeriodicalLoaders ()
126
141
}
127
142
@@ -237,63 +252,118 @@ func (c *Controller) startAccountsWatcher() {
237
252
}()
238
253
}
239
254
240
- func (c * Controller ) startWalletEventsWatcher () {
241
- if c .walletEventsWatcher != nil {
242
- return
243
- }
244
-
245
- if c .walletFeed == nil {
255
+ func (c * Controller ) startNetworkEventsWatcher () {
256
+ if c .networksProvider == nil {
246
257
return
247
258
}
248
259
249
- walletEventCb := func (event walletevent.Event ) {
250
- if event .Type != transfer .EventInternalERC721TransferDetected &&
251
- event .Type != transfer .EventInternalERC1155TransferDetected {
252
- return
253
- }
254
-
255
- chainID := walletCommon .ChainID (event .ChainID )
256
- for _ , account := range event .Accounts {
257
- c .refetchOwnershipIfRecentTransfer (account , chainID , event .At )
260
+ ch , unsub := pubsub .Subscribe [network.EventActiveNetworksChanged ](c .networksProvider .GetPublisher (), 10 )
261
+ go func () {
262
+ defer gocommon .LogOnPanic ()
263
+ defer unsub ()
264
+ for {
265
+ select {
266
+ case <- c .stopCh :
267
+ return
268
+ case _ , ok := <- ch :
269
+ if ! ok {
270
+ return
271
+ }
272
+ c .checkPeriodicalLoaders ()
273
+ }
258
274
}
259
- }
260
-
261
- c .walletEventsWatcher = walletevent .NewWatcher (c .walletFeed , walletEventCb )
262
-
263
- c .walletEventsWatcher .Start ()
275
+ }()
264
276
}
265
277
266
- func (c * Controller ) stopWalletEventsWatcher () {
267
- if c .walletEventsWatcher != nil {
268
- c .walletEventsWatcher .Stop ()
269
- c .walletEventsWatcher = nil
278
+ func (c * Controller ) startBalanceChangeWatcher () {
279
+ if c .multistandardBalancePublisher == nil {
280
+ return
270
281
}
282
+
283
+ ch , unsub := pubsub .Subscribe [multistandardbalance.EventBalanceFetchFinished ](c .multistandardBalancePublisher , 10 )
284
+ go func () {
285
+ defer gocommon .LogOnPanic ()
286
+ defer unsub ()
287
+ for {
288
+ select {
289
+ case <- c .stopCh :
290
+ return
291
+ case event , ok := <- ch :
292
+ if ! ok {
293
+ return
294
+ }
295
+ switch event .ResultType {
296
+ case multistandardfetcher .ResultTypeERC721 , multistandardfetcher .ResultTypeERC1155 :
297
+ if event .BalanceChanged {
298
+ c .refetchOwnershipIfRecentTx (event .Key .Account , walletCommon .ChainID (event .Key .ChainID ), event .NewState .FetchedAt )
299
+ }
300
+ }
301
+ }
302
+ }
303
+ }()
271
304
}
272
305
273
- func (c * Controller ) startNetworkEventsWatcher () {
274
- if c .networksProvider == nil {
306
+ func (c * Controller ) startTransferDetectionWatcher () {
307
+ if c .transferDetectorPublisher == nil {
275
308
return
276
309
}
277
310
278
- ch , unsub := pubsub .Subscribe [network. EventActiveNetworksChanged ](c .networksProvider . GetPublisher () , 10 )
311
+ ch , unsub := pubsub .Subscribe [transferdetector. EventTransferDetectionFinished ](c .transferDetectorPublisher , 10 )
279
312
go func () {
280
313
defer gocommon .LogOnPanic ()
281
314
defer unsub ()
282
315
for {
283
316
select {
284
317
case <- c .stopCh :
285
318
return
286
- case _ , ok := <- ch :
319
+ case msg , ok := <- ch :
287
320
if ! ok {
288
321
return
289
322
}
290
- c .checkPeriodicalLoaders ()
323
+ for _ , event := range msg .Events {
324
+ switch event .EventKey {
325
+ case eventlog .ERC721Transfer :
326
+ unpackedEvent , ok := event .Unpacked .(erc721.Erc721Transfer )
327
+ if ! ok {
328
+ c .logger .Error ("failed to unpack ERC721Transfer event" )
329
+ continue
330
+ }
331
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
332
+ case eventlog .ERC1155TransferSingle :
333
+ unpackedEvent , ok := event .Unpacked .(erc1155.Erc1155TransferSingle )
334
+ if ! ok {
335
+ c .logger .Error ("failed to unpack ERC1155TransferSingle event" )
336
+ continue
337
+ }
338
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
339
+ case eventlog .ERC1155TransferBatch :
340
+ unpackedEvent , ok := event .Unpacked .(erc1155.Erc1155TransferBatch )
341
+ if ! ok {
342
+ c .logger .Error ("failed to unpack ERC1155TransferBatch event" )
343
+ continue
344
+ }
345
+ c .refetchOwnershipIfRelevantEvent (msg .Accounts , unpackedEvent .From , unpackedEvent .To , msg .ChainID , unpackedEvent .Raw .BlockNumber )
346
+ }
347
+ }
291
348
}
292
349
}
293
350
}()
294
351
}
295
352
296
- func (c * Controller ) refetchOwnershipIfRecentTransfer (account common.Address , chainID walletCommon.ChainID , latestTxTimestamp int64 ) {
353
+ func (c * Controller ) refetchOwnershipIfRelevantEvent (checkedAccounts []common.Address , eventFrom common.Address , eventTo common.Address , chainID uint64 , blockNumber uint64 ) {
354
+ for _ , address := range []common.Address {eventFrom , eventTo } {
355
+ if slices .Contains (checkedAccounts , address ) {
356
+ blockTime , err := c .blockChainStateProvider .GetEstimatedBlockTime (context .TODO (), chainID , blockNumber )
357
+ if err != nil {
358
+ c .logger .Error ("failed to get estimated block time" , zap .Error (err ))
359
+ continue
360
+ }
361
+ c .refetchOwnershipIfRecentTx (address , walletCommon .ChainID (chainID ), blockTime .Unix ())
362
+ }
363
+ }
364
+ }
365
+
366
+ func (c * Controller ) refetchOwnershipIfRecentTx (account common.Address , chainID walletCommon.ChainID , latestTxTimestamp int64 ) {
297
367
// Check last ownership update timestamp
298
368
timestamp , err := c .storage .GetOwnershipUpdateTimestamp (account , chainID )
299
369
0 commit comments