11package delegators
22
33import (
4+ "context"
45 "errors"
56 "fmt"
67 "io"
@@ -12,10 +13,13 @@ import (
1213 "github.com/teambenny/goetl/etldata"
1314 "github.com/teambenny/goetl/etlutil"
1415
16+ "cosmossdk.io/collections"
1517 "cosmossdk.io/log"
18+ "cosmossdk.io/math"
1619
1720 sdk "github.com/cosmos/cosmos-sdk/types"
1821 "github.com/cosmos/cosmos-sdk/types/bech32"
22+ bankkeeper "github.com/cosmos/cosmos-sdk/x/bank/keeper"
1923 stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types"
2024)
2125
@@ -62,13 +66,7 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat
6266
6367 configureSdk (prefix )
6468
65- lastSeenAddr := ""
66- keepers .Bank .IterateAllBalances (ctx , func (addr sdk.AccAddress , _ sdk.Coin ) (stop bool ) {
67- if addr .String () == lastSeenAddr {
68- return false
69- }
70- lastSeenAddr = addr .String ()
71-
69+ err = IterateAllAddresses (ctx , keepers .Bank , func (addr sdk.AccAddress ) (stop bool ) {
7270 for _ , val := range validators {
7371 valAddr , err := sdk .ValAddressFromBech32 (val .OperatorAddress )
7472 etlutil .KillPipelineIfErr (err , killChan )
@@ -105,6 +103,10 @@ func (r *delegatorsReader) ProcessData(_ etldata.Payload, outputChan chan etldat
105103
106104 return false
107105 })
106+ if err != nil {
107+ r .logger .Error (err .Error ())
108+ killChan <- err
109+ }
108110}
109111
110112func (r * delegatorsReader ) Finish (_ chan etldata.Payload , killChan chan error ) {
@@ -121,6 +123,23 @@ func (r *delegatorsReader) String() string {
121123 return "DelegatorsReader"
122124}
123125
126+ // IterateAllAddresses iterates over all the accounts that are provided to a callback.
127+ // If true is returned from the callback, iteration is halted.
128+ func IterateAllAddresses (ctx context.Context , bankKeeper bankkeeper.BaseKeeper , cb func (sdk.AccAddress ) bool ) error {
129+ lastSeenAddr := ""
130+ err := bankKeeper .Balances .Walk (ctx , nil , func (key collections.Pair [sdk.AccAddress , string ], _ math.Int ) (stop bool , err error ) {
131+ addr := key .K1 ()
132+ if addr .String () == lastSeenAddr {
133+ return false , nil
134+ }
135+ lastSeenAddr = addr .String ()
136+
137+ return cb (addr ), nil
138+ })
139+
140+ return err
141+ }
142+
124143func convertAndEncodeMust (hrp string , bech string ) string {
125144 _ , bytes , err := bech32 .DecodeAndConvert (bech )
126145 if err != nil {
0 commit comments