@@ -9,33 +9,74 @@ import (
99
1010 "github.com/axone-protocol/cosmos-extractor/pkg/keeper"
1111 cmtproto "github.com/cometbft/cometbft/proto/tendermint/types"
12+ "github.com/samber/lo"
1213 "github.com/teambenny/goetl"
1314 "github.com/teambenny/goetl/etldata"
14- "github.com/teambenny/goetl/etlutil"
1515
1616 "cosmossdk.io/collections"
1717 "cosmossdk.io/log"
1818 "cosmossdk.io/math"
1919
2020 sdk "github.com/cosmos/cosmos-sdk/types"
2121 bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
22+ stakingkeeper "github.com/cosmos/cosmos-sdk/x/staking/keeper"
2223 stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
2324)
2425
26+ type ReaderOption func (* delegatorsReader ) error
27+
28+ func WithChainName (chainName string ) ReaderOption {
29+ return func (r * delegatorsReader ) error {
30+ r .chainName = chainName
31+ return nil
32+ }
33+ }
34+
35+ func WithLogger (logger log.Logger ) ReaderOption {
36+ return func (r * delegatorsReader ) error {
37+ r .logger = logger
38+ return nil
39+ }
40+ }
41+
42+ func WithMinSharesFilter (minShares math.LegacyDec ) ReaderOption {
43+ return func (r * delegatorsReader ) error {
44+ r .minSharesFilter = minShares
45+ return nil
46+ }
47+ }
48+
49+ func WithMaxSharesFilter (maxShares math.LegacyDec ) ReaderOption {
50+ return func (r * delegatorsReader ) error {
51+ r .maxSharesFilter = maxShares
52+ return nil
53+ }
54+ }
55+
2556type delegatorsReader struct {
26- chainName string
27- src string
28- logger log.Logger
29- closer io.Closer
57+ chainName string
58+ src string
59+ logger log.Logger
60+ closer io.Closer
61+ minSharesFilter math.LegacyDec
62+ maxSharesFilter math.LegacyDec
3063}
3164
3265// NewDelegatorsReader returns a new Reader that reads delegators from a blockchain data stores.
33- func NewDelegatorsReader (chainName , src string , logger log. Logger ) (goetl.Processor , error ) {
34- return & delegatorsReader {
35- chainName : chainName ,
66+ func NewDelegatorsReader (src string , options ... ReaderOption ) (goetl.Processor , error ) {
67+ r := & delegatorsReader {
68+ chainName : "mystery" ,
3669 src : src ,
37- logger : logger ,
38- }, nil
70+ logger : log .NewNopLogger (),
71+ }
72+
73+ for _ , option := range options {
74+ if err := option (r ); err != nil {
75+ return nil , err
76+ }
77+ }
78+
79+ return r , nil
3980}
4081
4182func (r * delegatorsReader ) ProcessData (_ etldata.Payload , outputChan chan etldata.Payload , killChan chan error ) {
@@ -65,22 +106,17 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat
65106
66107 configureSdk (prefix )
67108
68- err = IterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
69- for _ , val := range validators {
70- valAddr , err := sdk .ValAddressFromBech32 (val .OperatorAddress )
71- etlutil .KillPipelineIfErr (err , killChan )
72-
73- delegation , err := keepers .Staking .GetDelegation (ctx , addr , valAddr )
74- if err != nil {
75- if errors .Is (err , stakingtypes .ErrNoDelegation ) {
76- continue
77- }
109+ err = iterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
110+ delegations := lo .RejectMap (validators ,
111+ extractDelegations (ctx , addr , r .logger , keepers .Staking , killChan ))
112+ shares := lo .Reduce (delegations , computeShares (), math .LegacyZeroDec ())
78113
79- r . logger . Error ( err . Error ())
80- killChan <- err
81- return true
82- }
114+ if ( ! r . maxSharesFilter . IsNil () && shares . GT ( r . maxSharesFilter )) ||
115+ ( ! r . minSharesFilter . IsNil () && shares . LT ( r . minSharesFilter )) {
116+ return false
117+ }
83118
119+ for _ , delegation := range delegations {
84120 payload := Delegation {
85121 ChainName : r .chainName ,
86122 DelegatorNativeAddr : delegation .DelegatorAddress ,
@@ -122,7 +158,7 @@ func (r *delegatorsReader) String() string {
122158
123159// IterateAllAddresses iterates over all the accounts that are provided to a callback.
124160// If true is returned from the callback, iteration is halted.
125- func IterateAllAddresses (ctx context.Context , bankKeeper bankkeeper.BaseKeeper , cb func (sdk.AccAddress ) bool ) error {
161+ func iterateAllAddresses (ctx context.Context , bankKeeper bankkeeper.BaseKeeper , cb func (sdk.AccAddress ) bool ) error {
126162 lastSeenAddr := ""
127163 err := bankKeeper .Balances .Walk (ctx , nil , func (key collections.Pair [sdk.AccAddress , string ], _ math.Int ) (stop bool , err error ) {
128164 addr := key .K1 ()
@@ -137,6 +173,37 @@ func IterateAllAddresses(ctx context.Context, bankKeeper bankkeeper.BaseKeeper,
137173 return err
138174}
139175
176+ func extractDelegations (
177+ ctx context.Context , address sdk.AccAddress , logger log.Logger , stakingKeeper * stakingkeeper.Keeper , killChan chan error ,
178+ ) func (item stakingtypes.Validator , index int ) (stakingtypes.Delegation , bool ) {
179+ return func (item stakingtypes.Validator , _ int ) (stakingtypes.Delegation , bool ) {
180+ valAddr , err := sdk .ValAddressFromBech32 (item .OperatorAddress )
181+ if err != nil {
182+ logger .Error (err .Error ())
183+ killChan <- err
184+ return stakingtypes.Delegation {}, true
185+ }
186+
187+ delegation , err := stakingKeeper .GetDelegation (ctx , address , valAddr )
188+ if err != nil {
189+ if errors .Is (err , stakingtypes .ErrNoDelegation ) {
190+ return stakingtypes.Delegation {}, true
191+ }
192+
193+ logger .Error (err .Error ())
194+ killChan <- err
195+ return stakingtypes.Delegation {}, true
196+ }
197+ return delegation , false
198+ }
199+ }
200+
201+ func computeShares () func (acc math.LegacyDec , delegation stakingtypes.Delegation , _ int ) math.LegacyDec {
202+ return func (acc math.LegacyDec , delegation stakingtypes.Delegation , _ int ) math.LegacyDec {
203+ return acc .Add (delegation .Shares )
204+ }
205+ }
206+
140207func guessPrefixFromValoper (valoper string ) (string , error ) {
141208 if idx := strings .Index (valoper , "valoper" ); idx != - 1 {
142209 return valoper [:idx ], nil
0 commit comments