Skip to content

Commit bd4386a

Browse files
committed
feat: replace old transfer detection algorithm with multistandardbalance and transferdetector
1 parent 923a38e commit bd4386a

33 files changed

+672
-8791
lines changed

services/wallet/activity/service_test.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"github.com/status-im/status-go/services/wallet/thirdparty"
1919
mock_token "github.com/status-im/status-go/services/wallet/token/mock/token"
2020
tokenTypes "github.com/status-im/status-go/services/wallet/token/types"
21-
"github.com/status-im/status-go/services/wallet/transfer"
2221
"github.com/status-im/status-go/services/wallet/walletevent"
2322
"github.com/status-im/status-go/t/helpers"
2423
"github.com/status-im/status-go/transactions"
@@ -108,13 +107,6 @@ func setupTransactions(t *testing.T, state testState, txCount int, testTxs []tra
108107
allAddresses = append(allAddresses, p.From, p.To)
109108
}
110109

111-
txs, fromTrs, toTrs := transfer.GenerateTestTransfers(t, state.service.db, len(pendings), txCount)
112-
for i := range txs {
113-
transfer.InsertTestTransfer(t, state.service.db, txs[i].To, &txs[i])
114-
}
115-
116-
allAddresses = append(append(allAddresses, fromTrs...), toTrs...)
117-
118110
state.tokenMock.EXPECT().LookupTokenIdentity(gomock.Any(), gomock.Any(), gomock.Any()).Return(
119111
&tokenTypes.Token{
120112
ChainID: 5,

services/wallet/activity/session_service.go

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"github.com/status-im/status-go/services/wallet/common"
2020
"github.com/status-im/status-go/services/wallet/responses"
2121
"github.com/status-im/status-go/services/wallet/routeexecution"
22-
"github.com/status-im/status-go/services/wallet/transfer"
2322
"github.com/status-im/status-go/services/wallet/walletevent"
2423
"github.com/status-im/status-go/transactions"
2524

@@ -317,12 +316,6 @@ func (s *Service) processEvents(ctx context.Context) {
317316
Hash: payload.Hash,
318317
})
319318
debounceProcessChangesFn()
320-
case transfer.EventNewTransfers:
321-
eventCount++
322-
// No updates here, these are detected with their final state, just trigger
323-
// the detection of new entries
324-
newTxs = true
325-
debounceProcessChangesFn()
326319
case routeexecution.EventRouteExecutionTransactionSent:
327320
sentTxs, ok := event.EventParams.(*responses.RouterSentTransactions)
328321
if ok && sentTxs != nil {

services/wallet/collectibles/contract_type_db.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,28 @@ import (
1010
"github.com/status-im/status-go/sqlite"
1111
)
1212

13+
type ContractTypeDB struct {
14+
db *sql.DB
15+
}
16+
17+
func NewContractTypeDB(sqlDb *sql.DB) *ContractTypeDB {
18+
return &ContractTypeDB{
19+
db: sqlDb,
20+
}
21+
}
22+
23+
func (o *ContractTypeDB) GetContractTypes(ids []thirdparty.ContractID) (map[thirdparty.ContractID]w_common.ContractType, error) {
24+
ret := make(map[thirdparty.ContractID]w_common.ContractType)
25+
var err error
26+
for _, id := range ids {
27+
ret[id], err = readContractType(o.db, id)
28+
if err != nil {
29+
return nil, err
30+
}
31+
}
32+
return ret, nil
33+
}
34+
1335
func upsertContractType(creator sqlite.StatementCreator, id thirdparty.ContractID, contractType w_common.ContractType) error {
1436
if contractType == w_common.ContractTypeUnknown {
1537
return nil

services/wallet/collectibles/ownership/controller.go

Lines changed: 124 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,22 +4,28 @@ package ownership
44

55
import (
66
"context"
7+
"slices"
78
"sync"
9+
"time"
810

911
"go.uber.org/zap"
1012

1113
"github.com/ethereum/go-ethereum/common"
12-
"github.com/ethereum/go-ethereum/event"
1314

15+
"github.com/status-im/go-wallet-sdk/pkg/balance/multistandardfetcher"
16+
"github.com/status-im/go-wallet-sdk/pkg/contracts/erc1155"
17+
"github.com/status-im/go-wallet-sdk/pkg/contracts/erc721"
18+
"github.com/status-im/go-wallet-sdk/pkg/eventlog"
1419
gocommon "github.com/status-im/status-go/common"
1520
"github.com/status-im/status-go/crypto/types"
1621
"github.com/status-im/status-go/params"
1722
"github.com/status-im/status-go/pkg/pubsub"
1823
"github.com/status-im/status-go/rpc/network"
1924
"github.com/status-im/status-go/services/accounts/accountsevent"
2025
walletCommon "github.com/status-im/status-go/services/wallet/common"
26+
"github.com/status-im/status-go/services/wallet/multistandardbalance"
2127
"github.com/status-im/status-go/services/wallet/thirdparty"
22-
"github.com/status-im/status-go/services/wallet/transfer"
28+
"github.com/status-im/status-go/services/wallet/transferdetector"
2329
"github.com/status-im/status-go/services/wallet/walletevent"
2430
)
2531

@@ -30,8 +36,6 @@ const (
3036
type loaderPerChainID = map[walletCommon.ChainID]*PeriodicalLoader
3137
type loaderPerAddressAndChainID = map[common.Address]loaderPerChainID
3238

33-
type TransferCb func(common.Address, walletCommon.ChainID, []transfer.Transfer)
34-
3539
type AccountsProvider interface {
3640
GetWalletAddresses() ([]types.Address, error)
3741
}
@@ -41,12 +45,18 @@ type NetworksProvider interface {
4145
GetPublisher() *pubsub.Publisher
4246
}
4347

48+
type BlockChainStateProvider interface {
49+
GetEstimatedBlockTime(ctx context.Context, chainID uint64, blockNumber uint64) (time.Time, error)
50+
}
51+
4452
type Controller struct {
45-
fetcher CollectibleOwnershipFetcher
46-
storage CollectibleOwnershipStorage
47-
walletFeed *event.Feed
48-
accountsProvider AccountsProvider
49-
accountsPublisher *pubsub.Publisher
53+
fetcher CollectibleOwnershipFetcher
54+
storage CollectibleOwnershipStorage
55+
accountsProvider AccountsProvider
56+
accountsPublisher *pubsub.Publisher
57+
multistandardBalancePublisher *pubsub.Publisher
58+
transferDetectorPublisher *pubsub.Publisher
59+
blockChainStateProvider BlockChainStateProvider
5060

5161
networksProvider NetworksProvider
5262

@@ -66,24 +76,28 @@ type Controller struct {
6676

6777
func NewController(
6878
storage CollectibleOwnershipStorage,
69-
walletFeed *event.Feed,
7079
accountsProvider AccountsProvider,
7180
accountsPublisher *pubsub.Publisher,
7281
networksProvider NetworksProvider,
82+
multistandardBalancePublisher *pubsub.Publisher,
83+
transferDetectorPublisher *pubsub.Publisher,
84+
blockChainStateProvider BlockChainStateProvider,
7385
fetcher CollectibleOwnershipFetcher,
7486
collectiblesPublisher *pubsub.Publisher,
7587
logger *zap.Logger,
7688
) *Controller {
7789
return &Controller{
78-
fetcher: fetcher,
79-
storage: storage,
80-
walletFeed: walletFeed,
81-
accountsProvider: accountsProvider,
82-
accountsPublisher: accountsPublisher,
83-
networksProvider: networksProvider,
84-
periodicalLoaders: make(loaderPerAddressAndChainID),
85-
collectiblesPublisher: collectiblesPublisher,
86-
logger: logger.Named("OwnershipController"),
90+
fetcher: fetcher,
91+
storage: storage,
92+
accountsProvider: accountsProvider,
93+
accountsPublisher: accountsPublisher,
94+
networksProvider: networksProvider,
95+
multistandardBalancePublisher: multistandardBalancePublisher,
96+
transferDetectorPublisher: transferDetectorPublisher,
97+
blockChainStateProvider: blockChainStateProvider,
98+
periodicalLoaders: make(loaderPerAddressAndChainID),
99+
collectiblesPublisher: collectiblesPublisher,
100+
logger: logger.Named("OwnershipController"),
87101
}
88102
}
89103

@@ -105,11 +119,14 @@ func (c *Controller) Start() {
105119
// Setup collectibles fetch when a new account gets added
106120
c.startAccountsWatcher()
107121

108-
// Setup collectibles fetch when relevant activity is detected
109-
c.startWalletEventsWatcher()
110-
111122
// Setup collectibles fetch when active networks change
112123
c.startNetworkEventsWatcher()
124+
125+
// Start balance change watcher
126+
c.startBalanceChangeWatcher()
127+
128+
// Start transfer detection watcher
129+
c.startTransferDetectionWatcher()
113130
}
114131

115132
func (c *Controller) Stop() {
@@ -120,8 +137,6 @@ func (c *Controller) Stop() {
120137
close(c.stopCh)
121138
c.stopCh = nil
122139

123-
c.stopWalletEventsWatcher()
124-
125140
c.stopPeriodicalLoaders()
126141
}
127142

@@ -237,63 +252,118 @@ func (c *Controller) startAccountsWatcher() {
237252
}()
238253
}
239254

240-
func (c *Controller) startWalletEventsWatcher() {
241-
if c.walletEventsWatcher != nil {
242-
return
243-
}
244-
245-
if c.walletFeed == nil {
255+
func (c *Controller) startNetworkEventsWatcher() {
256+
if c.networksProvider == nil {
246257
return
247258
}
248259

249-
walletEventCb := func(event walletevent.Event) {
250-
if event.Type != transfer.EventInternalERC721TransferDetected &&
251-
event.Type != transfer.EventInternalERC1155TransferDetected {
252-
return
253-
}
254-
255-
chainID := walletCommon.ChainID(event.ChainID)
256-
for _, account := range event.Accounts {
257-
c.refetchOwnershipIfRecentTransfer(account, chainID, event.At)
260+
ch, unsub := pubsub.Subscribe[network.EventActiveNetworksChanged](c.networksProvider.GetPublisher(), 10)
261+
go func() {
262+
defer gocommon.LogOnPanic()
263+
defer unsub()
264+
for {
265+
select {
266+
case <-c.stopCh:
267+
return
268+
case _, ok := <-ch:
269+
if !ok {
270+
return
271+
}
272+
c.checkPeriodicalLoaders()
273+
}
258274
}
259-
}
260-
261-
c.walletEventsWatcher = walletevent.NewWatcher(c.walletFeed, walletEventCb)
262-
263-
c.walletEventsWatcher.Start()
275+
}()
264276
}
265277

266-
func (c *Controller) stopWalletEventsWatcher() {
267-
if c.walletEventsWatcher != nil {
268-
c.walletEventsWatcher.Stop()
269-
c.walletEventsWatcher = nil
278+
func (c *Controller) startBalanceChangeWatcher() {
279+
if c.multistandardBalancePublisher == nil {
280+
return
270281
}
282+
283+
ch, unsub := pubsub.Subscribe[multistandardbalance.EventBalanceFetchFinished](c.multistandardBalancePublisher, 10)
284+
go func() {
285+
defer gocommon.LogOnPanic()
286+
defer unsub()
287+
for {
288+
select {
289+
case <-c.stopCh:
290+
return
291+
case event, ok := <-ch:
292+
if !ok {
293+
return
294+
}
295+
switch event.ResultType {
296+
case multistandardfetcher.ResultTypeERC721, multistandardfetcher.ResultTypeERC1155:
297+
if event.BalanceChanged {
298+
c.refetchOwnershipIfRecentTx(event.Key.Account, walletCommon.ChainID(event.Key.ChainID), event.NewState.FetchedAt)
299+
}
300+
}
301+
}
302+
}
303+
}()
271304
}
272305

273-
func (c *Controller) startNetworkEventsWatcher() {
274-
if c.networksProvider == nil {
306+
func (c *Controller) startTransferDetectionWatcher() {
307+
if c.transferDetectorPublisher == nil {
275308
return
276309
}
277310

278-
ch, unsub := pubsub.Subscribe[network.EventActiveNetworksChanged](c.networksProvider.GetPublisher(), 10)
311+
ch, unsub := pubsub.Subscribe[transferdetector.EventTransferDetectionFinished](c.transferDetectorPublisher, 10)
279312
go func() {
280313
defer gocommon.LogOnPanic()
281314
defer unsub()
282315
for {
283316
select {
284317
case <-c.stopCh:
285318
return
286-
case _, ok := <-ch:
319+
case msg, ok := <-ch:
287320
if !ok {
288321
return
289322
}
290-
c.checkPeriodicalLoaders()
323+
for _, event := range msg.Events {
324+
switch event.EventKey {
325+
case eventlog.ERC721Transfer:
326+
unpackedEvent, ok := event.Unpacked.(erc721.Erc721Transfer)
327+
if !ok {
328+
c.logger.Error("failed to unpack ERC721Transfer event")
329+
continue
330+
}
331+
c.refetchOwnershipIfRelevantEvent(msg.Accounts, unpackedEvent.From, unpackedEvent.To, msg.ChainID, unpackedEvent.Raw.BlockNumber)
332+
case eventlog.ERC1155TransferSingle:
333+
unpackedEvent, ok := event.Unpacked.(erc1155.Erc1155TransferSingle)
334+
if !ok {
335+
c.logger.Error("failed to unpack ERC1155TransferSingle event")
336+
continue
337+
}
338+
c.refetchOwnershipIfRelevantEvent(msg.Accounts, unpackedEvent.From, unpackedEvent.To, msg.ChainID, unpackedEvent.Raw.BlockNumber)
339+
case eventlog.ERC1155TransferBatch:
340+
unpackedEvent, ok := event.Unpacked.(erc1155.Erc1155TransferBatch)
341+
if !ok {
342+
c.logger.Error("failed to unpack ERC1155TransferBatch event")
343+
continue
344+
}
345+
c.refetchOwnershipIfRelevantEvent(msg.Accounts, unpackedEvent.From, unpackedEvent.To, msg.ChainID, unpackedEvent.Raw.BlockNumber)
346+
}
347+
}
291348
}
292349
}
293350
}()
294351
}
295352

296-
func (c *Controller) refetchOwnershipIfRecentTransfer(account common.Address, chainID walletCommon.ChainID, latestTxTimestamp int64) {
353+
func (c *Controller) refetchOwnershipIfRelevantEvent(checkedAccounts []common.Address, eventFrom common.Address, eventTo common.Address, chainID uint64, blockNumber uint64) {
354+
for _, address := range []common.Address{eventFrom, eventTo} {
355+
if slices.Contains(checkedAccounts, address) {
356+
blockTime, err := c.blockChainStateProvider.GetEstimatedBlockTime(context.TODO(), chainID, blockNumber)
357+
if err != nil {
358+
c.logger.Error("failed to get estimated block time", zap.Error(err))
359+
continue
360+
}
361+
c.refetchOwnershipIfRecentTx(address, walletCommon.ChainID(chainID), blockTime.Unix())
362+
}
363+
}
364+
}
365+
366+
func (c *Controller) refetchOwnershipIfRecentTx(account common.Address, chainID walletCommon.ChainID, latestTxTimestamp int64) {
297367
// Check last ownership update timestamp
298368
timestamp, err := c.storage.GetOwnershipUpdateTimestamp(account, chainID)
299369

0 commit comments

Comments
 (0)