diff --git a/cmd/chains.go b/cmd/chains.go index dee2ad811..d6f52631c 100644 --- a/cmd/chains.go +++ b/cmd/chains.go @@ -13,6 +13,7 @@ import ( "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" "github.com/spf13/cobra" registry "github.com/strangelove-ventures/lens/client/chain_registry" "go.uber.org/zap" @@ -476,6 +477,7 @@ func addChainsFromRegistry(ctx context.Context, a *appState, chains []string) er OutputFormat: chainConfig.OutputFormat, SignModeStr: chainConfig.SignModeStr, ExtraCodecs: chainConfig.ExtraCodecs, + Broadcast: provider.BroadcastModeBatch, Slip44: chainConfig.Slip44, } diff --git a/interchaintest/relayer.go b/interchaintest/relayer.go index 6c9be1189..cb93ca948 100644 --- a/interchaintest/relayer.go +++ b/interchaintest/relayer.go @@ -14,6 +14,7 @@ import ( "github.com/cosmos/relayer/v2/internal/relayertest" "github.com/cosmos/relayer/v2/relayer" "github.com/cosmos/relayer/v2/relayer/chains/cosmos" + "github.com/cosmos/relayer/v2/relayer/provider" interchaintestcosmos "github.com/strangelove-ventures/interchaintest/v7/chain/cosmos" "github.com/strangelove-ventures/interchaintest/v7/ibc" "github.com/stretchr/testify/require" @@ -77,6 +78,7 @@ func (r *Relayer) AddChainConfiguration(ctx context.Context, _ ibc.RelayerExecRe Timeout: "10s", OutputFormat: "json", SignModeStr: "direct", + Broadcast: provider.BroadcastModeBatch, }, }) diff --git a/relayer/chains/cosmos/log.go b/relayer/chains/cosmos/log.go index 2a85a1d94..75e43a982 100644 --- a/relayer/chains/cosmos/log.go +++ b/relayer/chains/cosmos/log.go @@ -64,7 +64,10 @@ func (cc *CosmosProvider) LogFailedTx(res *provider.RelayerTxResponse, err error } } - if res.Code != 0 && res.Data != "" { + if res.Code != 0 { + if sdkErr := cc.sdkError(res.Codespace, res.Code); err != nil { + fields = append(fields, zap.NamedError("sdk_error", sdkErr)) + } fields = append(fields, zap.Object("response", res)) cc.log.Warn( "Sent transaction but received failure response", diff --git a/relayer/chains/cosmos/provider.go b/relayer/chains/cosmos/provider.go index 188df9003..6dc37f460 100644 --- a/relayer/chains/cosmos/provider.go +++ b/relayer/chains/cosmos/provider.go @@ -31,21 +31,22 @@ var ( const tendermintEncodingThreshold = "v0.37.0-alpha" type CosmosProviderConfig struct { - Key string `json:"key" yaml:"key"` - ChainName string `json:"-" yaml:"-"` - ChainID string `json:"chain-id" yaml:"chain-id"` - RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` - AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` - KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` - GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` - GasPrices string `json:"gas-prices" yaml:"gas-prices"` - MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` - Debug bool `json:"debug" yaml:"debug"` - Timeout string `json:"timeout" yaml:"timeout"` - OutputFormat string `json:"output-format" yaml:"output-format"` - SignModeStr string `json:"sign-mode" yaml:"sign-mode"` - ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` - Slip44 int `json:"coin-type" yaml:"coin-type"` + Key string `json:"key" yaml:"key"` + ChainName string `json:"-" yaml:"-"` + ChainID string `json:"chain-id" yaml:"chain-id"` + RPCAddr string `json:"rpc-addr" yaml:"rpc-addr"` + AccountPrefix string `json:"account-prefix" yaml:"account-prefix"` + KeyringBackend string `json:"keyring-backend" yaml:"keyring-backend"` + GasAdjustment float64 `json:"gas-adjustment" yaml:"gas-adjustment"` + GasPrices string `json:"gas-prices" yaml:"gas-prices"` + MinGasAmount uint64 `json:"min-gas-amount" yaml:"min-gas-amount"` + Debug bool `json:"debug" yaml:"debug"` + Timeout string `json:"timeout" yaml:"timeout"` + OutputFormat string `json:"output-format" yaml:"output-format"` + SignModeStr string `json:"sign-mode" yaml:"sign-mode"` + ExtraCodecs []string `json:"extra-codecs" yaml:"extra-codecs"` + Slip44 int `json:"coin-type" yaml:"coin-type"` + Broadcast provider.BroadcastMode `json:"broadcast-mode" yaml:"broadcast-mode"` } func (pc CosmosProviderConfig) Validate() error { @@ -55,6 +56,10 @@ func (pc CosmosProviderConfig) Validate() error { return nil } +func (pc CosmosProviderConfig) BroadcastMode() provider.BroadcastMode { + return pc.Broadcast +} + // NewProvider validates the CosmosProviderConfig, instantiates a ChainClient and then instantiates a CosmosProvider func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (provider.ChainProvider, error) { if err := pc.Validate(); err != nil { @@ -72,6 +77,10 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb } pc.ChainName = chainName + if pc.Broadcast == "" { + pc.Broadcast = provider.BroadcastModeBatch + } + return &CosmosProvider{ log: log, ChainClient: *cc, @@ -136,10 +145,14 @@ func (h CosmosIBCHeader) ConsensusState() ibcexported.ConsensusState { return &tmclient.ConsensusState{ Timestamp: h.SignedHeader.Time, Root: commitmenttypes.NewMerkleRoot(h.SignedHeader.AppHash), - NextValidatorsHash: h.ValidatorSet.Hash(), + NextValidatorsHash: h.SignedHeader.NextValidatorsHash, } } +func (h CosmosIBCHeader) NextValidatorsHash() []byte { + return h.SignedHeader.NextValidatorsHash +} + func (cc *CosmosProvider) ProviderConfig() provider.ProviderConfig { return cc.PCfg } @@ -207,19 +220,9 @@ func (cc *CosmosProvider) Address() (string, error) { } func (cc *CosmosProvider) TrustingPeriod(ctx context.Context) (time.Duration, error) { - res, err := cc.QueryStakingParams(ctx) - - var unbondingTime time.Duration + unbondingTime, err := cc.QueryUnbondingPeriod(ctx) if err != nil { - // Attempt ICS query - consumerUnbondingPeriod, consumerErr := cc.queryConsumerUnbondingPeriod(ctx) - if consumerErr != nil { - return 0, - fmt.Errorf("failed to query unbonding period as both standard and consumer chain: %s: %w", err.Error(), consumerErr) - } - unbondingTime = consumerUnbondingPeriod - } else { - unbondingTime = res.UnbondingTime + return 0, err } // We want the trusting period to be 85% of the unbonding time. diff --git a/relayer/chains/cosmos/query.go b/relayer/chains/cosmos/query.go index 19769f638..da7ac1607 100644 --- a/relayer/chains/cosmos/query.go +++ b/relayer/chains/cosmos/query.go @@ -178,24 +178,24 @@ func (cc *CosmosProvider) QueryBalanceWithAddress(ctx context.Context, address s return coins, nil } -func (cc *CosmosProvider) queryConsumerUnbondingPeriod(ctx context.Context) (time.Duration, error) { +func (cc *CosmosProvider) querySubspaceUnbondingPeriod(subspace string, ctx context.Context) (time.Duration, error) { queryClient := proposal.NewQueryClient(cc) - params := proposal.QueryParamsRequest{Subspace: "ccvconsumer", Key: "UnbondingPeriod"} + params := proposal.QueryParamsRequest{Subspace: subspace, Key: "UnbondingPeriod"} resICS, err := queryClient.Params(ctx, ¶ms) if err != nil { - return 0, fmt.Errorf("failed to make ccvconsumer params request: %w", err) + return 0, fmt.Errorf("failed to make %s params request: %w", subspace, err) } if resICS.Param.Value == "" { - return 0, fmt.Errorf("ccvconsumer unbonding period is empty") + return 0, fmt.Errorf("%s unbonding period is empty", subspace) } unbondingPeriod, err := strconv.ParseUint(strings.ReplaceAll(resICS.Param.Value, `"`, ""), 10, 64) if err != nil { - return 0, fmt.Errorf("failed to parse unbonding period from ccvconsumer param: %w", err) + return 0, fmt.Errorf("failed to parse unbonding period from %s param: %w", subspace, err) } return time.Duration(unbondingPeriod), nil @@ -203,22 +203,26 @@ func (cc *CosmosProvider) queryConsumerUnbondingPeriod(ctx context.Context) (tim // QueryUnbondingPeriod returns the unbonding period of the chain func (cc *CosmosProvider) QueryUnbondingPeriod(ctx context.Context) (time.Duration, error) { - req := stakingtypes.QueryParamsRequest{} - queryClient := stakingtypes.NewQueryClient(cc) - - res, err := queryClient.Params(ctx, &req) - if err != nil { - // Attempt ICS query - consumerUnbondingPeriod, consumerErr := cc.queryConsumerUnbondingPeriod(ctx) - if consumerErr != nil { - return 0, - fmt.Errorf("failed to query unbonding period as both standard and consumer chain: %s: %w", err.Error(), consumerErr) - } + res, err := cc.QueryStakingParams(ctx) + if err == nil { + return res.UnbondingTime, nil + } + // Attempt ICS query + consumerUnbondingPeriod, consumerErr := cc.querySubspaceUnbondingPeriod("ccvconsumer", ctx) + if consumerErr == nil { return consumerUnbondingPeriod, nil } - return res.Params.UnbondingTime, nil + poaUnbondingPeriod, poaErr := cc.querySubspaceUnbondingPeriod("poa", ctx) + if poaErr == nil { + return poaUnbondingPeriod, nil + } + + return 0, fmt.Errorf( + "failed to query unbonding period as both standard, consumer, and poa chain: %s, %s, %s", + err.Error(), consumerErr.Error(), poaErr.Error(), + ) } // QueryTendermintProof performs an ABCI query with the given key and returns diff --git a/relayer/chains/cosmos/tx.go b/relayer/chains/cosmos/tx.go index aa5fa303f..a33c1b9eb 100644 --- a/relayer/chains/cosmos/tx.go +++ b/relayer/chains/cosmos/tx.go @@ -26,19 +26,23 @@ import ( tmclient "github.com/cosmos/ibc-go/v7/modules/light-clients/07-tendermint" strideicqtypes "github.com/cosmos/relayer/v2/relayer/chains/cosmos/stride" "github.com/cosmos/relayer/v2/relayer/provider" + lensclient "github.com/strangelove-ventures/lens/client" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/light" + coretypes "github.com/tendermint/tendermint/rpc/core/types" tmtypes "github.com/tendermint/tendermint/types" "go.uber.org/zap" ) // Variables used for retries var ( - rtyAttNum = uint(5) - rtyAtt = retry.Attempts(rtyAttNum) - rtyDel = retry.Delay(time.Millisecond * 400) - rtyErr = retry.LastErrorOnly(true) - numRegex = regexp.MustCompile("[0-9]+") + rtyAttNum = uint(5) + rtyAtt = retry.Attempts(rtyAttNum) + rtyDel = retry.Delay(time.Millisecond * 400) + rtyErr = retry.LastErrorOnly(true) + numRegex = regexp.MustCompile("[0-9]+") + defaultBroadcastWaitTimeout = 10 * time.Minute + errUnknown = "unknown" ) // Default IBC settings @@ -188,11 +192,12 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela } rlyResp := &provider.RelayerTxResponse{ - Height: resp.Height, - TxHash: resp.TxHash, - Code: resp.Code, - Data: resp.Data, - Events: parseEventsFromTxResponse(resp), + Height: resp.Height, + TxHash: resp.TxHash, + Codespace: resp.Codespace, + Code: resp.Code, + Data: resp.Data, + Events: parseEventsFromTxResponse(resp), } // transaction was executed, log the success or failure using the tx response code @@ -210,6 +215,192 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela return rlyResp, true, nil } +// SendMessagesToMempool simulates and broadcasts a transaction with the given msgs and memo. +// This method will return once the transaction has entered the mempool. +// In an async goroutine, will wait for the tx to be included in the block unless asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. +func (cc *CosmosProvider) SendMessagesToMempool( + ctx context.Context, + msgs []provider.RelayerMessage, + memo string, + + asyncCtx context.Context, + asyncCallback func(*provider.RelayerTxResponse, error), +) error { + // Guard against account sequence number mismatch errors by locking for the specific wallet for + // the account sequence query all the way through the transaction broadcast success/fail. + cc.txMu.Lock() + defer cc.txMu.Unlock() + + txBytes, sequence, fees, err := cc.buildMessages(ctx, msgs, memo) + if err != nil { + // Account sequence mismatch errors can happen on the simulated transaction also. + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + } + + return err + } + + if err := cc.broadcastTx(ctx, txBytes, msgs, fees, asyncCtx, defaultBroadcastWaitTimeout, asyncCallback); err != nil { + if strings.Contains(err.Error(), sdkerrors.ErrWrongSequence.Error()) { + cc.handleAccountSequenceMismatchError(err) + } + + return err + } + + // we had a successful tx broadcast with this sequence, so update it to the next + cc.updateNextAccountSequence(sequence + 1) + + return nil +} + +// sdkError will return the Cosmos SDK registered error for a given codespace/code combo if registered, otherwise nil. +func (cc *CosmosProvider) sdkError(codespace string, code uint32) error { + // ABCIError will return an error other than "unknown" if syncRes.Code is a registered error in syncRes.Codespace + // This catches all of the sdk errors https://github.com/cosmos/cosmos-sdk/blob/f10f5e5974d2ecbf9efc05bc0bfe1c99fdeed4b6/types/errors/errors.go + err := errors.Unwrap(sdkerrors.ABCIError(codespace, code, "error broadcasting transaction")) + if err.Error() != errUnknown { + return err + } + return nil +} + +// broadcastTx broadcasts a transaction with the given raw bytes and then, in an async goroutine, waits for the tx to be included in the block. +// The wait will end after either the asyncTimeout has run out or the asyncCtx exits. +// If there is no error broadcasting, the asyncCallback will be called with success/failure of the wait for block inclusion. +func (cc *CosmosProvider) broadcastTx( + ctx context.Context, // context for tx broadcast + tx []byte, // raw tx to be broadcasted + msgs []provider.RelayerMessage, // used for logging only + fees sdk.Coins, // used for metrics + + asyncCtx context.Context, // context for async wait for block inclusion after successful tx broadcast + asyncTimeout time.Duration, // timeout for waiting for block inclusion + asyncCallback func(*provider.RelayerTxResponse, error), // callback for success/fail of the wait for block inclusion +) error { + res, err := cc.ChainClient.RPCClient.BroadcastTxSync(ctx, tx) + if err != nil { + if res == nil { + // There are some cases where BroadcastTxSync will return an error but the associated + // ResultBroadcastTx will be nil. + return err + } + rlyResp := &provider.RelayerTxResponse{ + TxHash: res.Hash.String(), + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data.String(), + } + cc.LogFailedTx(rlyResp, err, msgs) + return err + } + + cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees) + + // TODO: maybe we need to check if the node has tx indexing enabled? + // if not, we need to find a new way to block until inclusion in a block + + go cc.waitForTx(asyncCtx, res.Hash, msgs, asyncTimeout, asyncCallback) + + return nil +} + +// waitForTx waits for a transaction to be included in a block, logs success/fail, then invokes callback. +// This is intended to be called as an async goroutine. +func (cc *CosmosProvider) waitForTx( + ctx context.Context, + txHash []byte, + msgs []provider.RelayerMessage, // used for logging only + waitTimeout time.Duration, + callback func(*provider.RelayerTxResponse, error), +) { + res, err := cc.waitForBlockInclusion(ctx, txHash, waitTimeout) + if err != nil { + cc.log.Error("Failed to wait for block inclusion", zap.Error(err)) + if callback != nil { + callback(nil, err) + } + return + } + + rlyResp := &provider.RelayerTxResponse{ + Height: res.Height, + TxHash: res.TxHash, + Codespace: res.Codespace, + Code: res.Code, + Data: res.Data, + Events: parseEventsFromTxResponse(res), + } + + // transaction was executed, log the success or failure using the tx response code + // NOTE: error is nil, logic should use the returned error to determine if the + // transaction was successfully executed. + + if res.Code != 0 { + // Check for any registered SDK errors + err := cc.sdkError(res.Codespace, res.Code) + if err == nil { + err = fmt.Errorf("transaction failed to execute") + } + if callback != nil { + callback(nil, err) + } + cc.LogFailedTx(rlyResp, nil, msgs) + return + } + + if callback != nil { + callback(rlyResp, nil) + } + cc.LogSuccessTx(res, msgs) +} + +// waitForBlockInclusion will wait for a transaction to be included in a block, up to waitTimeout or context cancellation. +func (cc *CosmosProvider) waitForBlockInclusion( + ctx context.Context, + txHash []byte, + waitTimeout time.Duration, +) (*sdk.TxResponse, error) { + exitAfter := time.After(waitTimeout) + for { + select { + case <-exitAfter: + return nil, fmt.Errorf("timed out after: %d; %w", waitTimeout, lensclient.ErrTimeoutAfterWaitingForTxBroadcast) + // This fixed poll is fine because it's only for logging and updating prometheus metrics currently. + case <-time.After(time.Millisecond * 100): + res, err := cc.ChainClient.RPCClient.Tx(ctx, txHash, false) + if err == nil { + return cc.mkTxResult(res) + } + if strings.Contains(err.Error(), "transaction indexing is disabled") { + return nil, fmt.Errorf("cannot determine success/failure of tx because transaction indexing is disabled on rpc url") + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } +} + +// mkTxResult decodes a tendermint transaction into an SDK TxResponse. +func (cc *CosmosProvider) mkTxResult(resTx *coretypes.ResultTx) (*sdk.TxResponse, error) { + txbz, err := cc.ChainClient.Codec.TxConfig.TxDecoder()(resTx.Tx) + if err != nil { + return nil, err + } + p, ok := txbz.(intoAny) + if !ok { + return nil, fmt.Errorf("expecting a type implementing intoAny, got: %T", txbz) + } + any := p.AsAny() + return sdk.NewResponseResultTx(resTx, any, ""), nil +} + +type intoAny interface { + AsAny() *codectypes.Any +} + func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent { var events []provider.RelayerEvent @@ -243,6 +434,13 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel txf = txf.WithMemo(memo) } + sequence := txf.Sequence() + cc.updateNextAccountSequence(sequence) + if sequence < cc.nextAccountSeq { + sequence = cc.nextAccountSeq + txf = txf.WithSequence(sequence) + } + // TODO: Make this work with new CalculateGas method // TODO: This is related to GRPC client stuff? // https://github.com/cosmos/cosmos-sdk/blob/5725659684fc93790a63981c653feee33ecf3225/client/tx/tx.go#L297 @@ -296,7 +494,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel return nil, 0, sdk.Coins{}, err } - return txBytes, txf.Sequence(), fees, nil + return txBytes, sequence, fees, nil } // handleAccountSequenceMismatchError will parse the error string, e.g.: diff --git a/relayer/processor/path_end_runtime.go b/relayer/processor/path_end_runtime.go index beb33c0e0..e48ad6944 100644 --- a/relayer/processor/path_end_runtime.go +++ b/relayer/processor/path_end_runtime.go @@ -2,6 +2,7 @@ package processor import ( "context" + "sync" "time" conntypes "github.com/cosmos/ibc-go/v7/modules/core/03-connection/types" @@ -46,6 +47,9 @@ type pathEndRuntime struct { // inSync indicates whether queries are in sync with latest height of the chain. inSync bool + lastClientUpdateHeight uint64 + lastClientUpdateHeightMu sync.Mutex + metrics *PrometheusMetrics } @@ -287,8 +291,11 @@ func (pathEnd *pathEndRuntime) shouldTerminate(ibcMessagesCache IBCMessagesCache } func (pathEnd *pathEndRuntime) mergeCacheData(ctx context.Context, cancel func(), d ChainProcessorCacheData, counterpartyChainID string, counterpartyInSync bool, messageLifecycle MessageLifecycle, counterParty *pathEndRuntime) { - pathEnd.inSync = d.InSync + pathEnd.lastClientUpdateHeightMu.Lock() pathEnd.latestBlock = d.LatestBlock + pathEnd.lastClientUpdateHeightMu.Unlock() + + pathEnd.inSync = d.InSync pathEnd.latestHeader = d.LatestHeader pathEnd.clientState = d.ClientState if d.ClientState.ConsensusHeight != pathEnd.clientState.ConsensusHeight { @@ -625,7 +632,7 @@ func (pathEnd *pathEndRuntime) shouldSendClientICQMessage(message provider.Clien return true } -func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTrack) uint64 { eventType := t.msg.eventType sequence := t.msg.info.Sequence channelKey, err := t.msg.channelKey() @@ -636,7 +643,7 @@ func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTra zap.Uint64("sequence", sequence), zap.Error(err), ) - return + return 0 } msgProcessCache, ok := pathEnd.packetProcessing[channelKey] if !ok { @@ -658,11 +665,13 @@ func (pathEnd *pathEndRuntime) trackProcessingPacketMessage(t packetMessageToTra channelProcessingCache[sequence] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMessageToTrack) uint64 { eventType := t.msg.eventType connectionKey := connectionInfoConnectionKey(t.msg.info).Counterparty() msgProcessCache, ok := pathEnd.connProcessing[eventType] @@ -680,11 +689,13 @@ func (pathEnd *pathEndRuntime) trackProcessingConnectionMessage(t connectionMess msgProcessCache[connectionKey] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToTrack) uint64 { eventType := t.msg.eventType channelKey := channelInfoChannelKey(t.msg.info).Counterparty() msgProcessCache, ok := pathEnd.channelProcessing[eventType] @@ -702,11 +713,13 @@ func (pathEnd *pathEndRuntime) trackProcessingChannelMessage(t channelMessageToT msgProcessCache[channelKey] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } + + return retryCount } -func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) { +func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessageToTrack) uint64 { retryCount := uint64(0) queryID := t.msg.info.QueryID @@ -718,6 +731,8 @@ func (pathEnd *pathEndRuntime) trackProcessingClientICQMessage(t clientICQMessag pathEnd.clientICQProcessing[queryID] = processingMessage{ lastProcessedHeight: pathEnd.latestBlock.Height, retryCount: retryCount, - assembled: t.assembled, + assembled: t.m != nil, } + + return retryCount } diff --git a/relayer/processor/path_processor_internal.go b/relayer/processor/path_processor_internal.go index c443f2004..5ad31c962 100644 --- a/relayer/processor/path_processor_internal.go +++ b/relayer/processor/path_processor_internal.go @@ -1,7 +1,9 @@ package processor import ( + "bytes" "context" + "encoding/base64" "errors" "fmt" "sort" @@ -12,6 +14,7 @@ import ( chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" ) @@ -428,6 +431,10 @@ func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst * clientID := dst.info.ClientID clientConsensusHeight := dst.clientState.ConsensusHeight trustedConsensusHeight := dst.clientTrustedState.ClientState.ConsensusHeight + var trustedNextValidatorsHash []byte + if dst.clientTrustedState.IBCHeader != nil { + trustedNextValidatorsHash = dst.clientTrustedState.IBCHeader.NextValidatorsHash() + } // If the client state height is not equal to the client trusted state height and the client state height is // the latest block, we cannot send a MsgUpdateClient until another block is observed on the counterparty. @@ -454,9 +461,10 @@ func (pp *PathProcessor) assembleMsgUpdateClient(ctx context.Context, src, dst * IBCHeader: header, } trustedConsensusHeight = clientConsensusHeight + trustedNextValidatorsHash = header.NextValidatorsHash() } - if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight { + if src.latestHeader.Height() == trustedConsensusHeight.RevisionHeight && !bytes.Equal(src.latestHeader.NextValidatorsHash(), trustedNextValidatorsHash) { return nil, fmt.Errorf("latest header height is equal to the client trusted height: %d, "+ "need to wait for next block's header before we can assemble and send a new MsgUpdateClient", trustedConsensusHeight.RevisionHeight) @@ -485,12 +493,22 @@ func (pp *PathProcessor) updateClientTrustedState(src *pathEndRuntime, dst *path // need to assemble new trusted state ibcHeader, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight+1] if !ok { + if ibcHeaderCurrent, ok := dst.ibcHeaderCache[src.clientState.ConsensusHeight.RevisionHeight]; ok && + dst.clientTrustedState.IBCHeader != nil && + bytes.Equal(dst.clientTrustedState.IBCHeader.NextValidatorsHash(), ibcHeaderCurrent.NextValidatorsHash()) { + src.clientTrustedState = provider.ClientTrustedState{ + ClientState: src.clientState, + IBCHeader: ibcHeaderCurrent, + } + return + } pp.log.Debug("No cached IBC header for client trusted height", zap.String("chain_id", src.info.ChainID), zap.String("client_id", src.info.ClientID), zap.Uint64("height", src.clientState.ConsensusHeight.RevisionHeight+1), ) return + } src.clientTrustedState = provider.ClientTrustedState{ ClientState: src.clientState, @@ -740,12 +758,13 @@ func (pp *PathProcessor) assembleMessage( switch m := msg.(type) { case packetIBCMessage: message, err = pp.assemblePacketMessage(ctx, m, src, dst) - om.pktMsgs[i] = packetMessageToTrack{ - msg: m, - assembled: err == nil, + tracker := packetMessageToTrack{ + msg: m, } if err == nil { - dst.log.Debug("Will send packet message", + tracker.m = message + om.pktMsgs[i] = tracker + dst.log.Debug("Assembled packet message", zap.String("event_type", m.eventType), zap.Uint64("sequence", m.info.Sequence), zap.String("src_channel", m.info.SourceChannel), @@ -756,24 +775,22 @@ func (pp *PathProcessor) assembleMessage( } case connectionIBCMessage: message, err = pp.assembleConnectionMessage(ctx, m, src, dst) - om.connMsgs[i] = connectionMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := connectionMessageToTrack{msg: m} if err == nil { - dst.log.Debug("Will send connection message", + tracker.m = message + om.connMsgs[i] = tracker + dst.log.Debug("Assembled connection message", zap.String("event_type", m.eventType), zap.String("connection_id", m.info.ConnID), ) } case channelIBCMessage: message, err = pp.assembleChannelMessage(ctx, m, src, dst) - om.chanMsgs[i] = channelMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := channelMessageToTrack{msg: m} if err == nil { - dst.log.Debug("Will send channel message", + tracker.m = message + om.chanMsgs[i] = tracker + dst.log.Debug("Assembled channel message", zap.String("event_type", m.eventType), zap.String("channel_id", m.info.ChannelID), zap.String("port_id", m.info.PortID), @@ -781,12 +798,11 @@ func (pp *PathProcessor) assembleMessage( } case clientICQMessage: message, err = pp.assembleClientICQMessage(ctx, m, src, dst) - om.clientICQMsgs[i] = clientICQMessageToTrack{ - msg: m, - assembled: err == nil, - } + tracker := clientICQMessageToTrack{msg: m} if err == nil { - dst.log.Debug("Will send ICQ message", + tracker.m = message + om.clientICQMsgs[i] = tracker + dst.log.Debug("Assembled ICQ message", zap.String("type", m.info.Type), zap.String("query_id", string(m.info.QueryID)), ) @@ -796,7 +812,6 @@ func (pp *PathProcessor) assembleMessage( pp.log.Error("Error assembling channel message", zap.Error(err)) return } - om.Append(message) } func (pp *PathProcessor) assembleAndSendMessages( @@ -817,10 +832,16 @@ func (pp *PathProcessor) assembleAndSendMessages( consensusHeightTime = dst.clientState.ConsensusTime } clientUpdateThresholdMs := pp.clientUpdateThresholdTime.Milliseconds() - if (float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || - (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs) { + + dst.lastClientUpdateHeightMu.Lock() + enoughBlocksPassed := (dst.latestBlock.Height - blocksToRetrySendAfter) > dst.lastClientUpdateHeight + dst.lastClientUpdateHeightMu.Unlock() + + if enoughBlocksPassed && + ((float64(dst.clientState.TrustingPeriod.Milliseconds())*2/3 < float64(time.Since(consensusHeightTime).Milliseconds())) || + (clientUpdateThresholdMs > 0 && time.Since(consensusHeightTime).Milliseconds() > clientUpdateThresholdMs)) { needsClientUpdate = true - pp.log.Info("Client close to expiration", + pp.log.Info("Client update threshold condition met", zap.String("chain_id:", dst.info.ChainID), zap.String("client_id:", dst.info.ClientID), zap.Int64("trusting_period", dst.clientState.TrustingPeriod.Milliseconds()), @@ -831,137 +852,380 @@ func (pp *PathProcessor) assembleAndSendMessages( return nil } } - om := outgoingMessages{ - msgs: make( - []provider.RelayerMessage, - 0, - len(messages.packetMessages)+len(messages.connectionMessages)+len(messages.channelMessages)+len(messages.clientICQMessages), - ), - } + om := outgoingMessages{} msgUpdateClient, err := pp.assembleMsgUpdateClient(ctx, src, dst) if err != nil { return err } - om.Append(msgUpdateClient) + om.msgUpdateClient = msgUpdateClient // Each assembleMessage call below will make a query on the source chain, so these operations can run in parallel. var wg sync.WaitGroup - // connection messages are highest priority om.connMsgs = make([]connectionMessageToTrack, len(messages.connectionMessages)) for i, msg := range messages.connectionMessages { wg.Add(1) go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) } + om.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) + for i, msg := range messages.channelMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } + + om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) + for i, msg := range messages.clientICQMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } + + om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) + for i, msg := range messages.packetMessages { + wg.Add(1) + go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + } + wg.Wait() - if len(om.msgs) == 1 { - om.chanMsgs = make([]channelMessageToTrack, len(messages.channelMessages)) - // only assemble and send channel handshake messages if there are no conn handshake messages - // this prioritizes connection handshake messages, useful if a connection handshake needs to occur before a channel handshake - for i, msg := range messages.channelMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + assembled := 0 + for _, m := range om.connMsgs { + if m.m != nil { + assembled++ + } + } + for _, m := range om.chanMsgs { + if m.m != nil { + assembled++ } + } + for _, m := range om.clientICQMsgs { + if m.m != nil { + assembled++ + } + } + for _, m := range om.pktMsgs { + if m.m != nil { + assembled++ + } + } + + broadcastBatch := dst.chainProvider.ProviderConfig().BroadcastMode() == provider.BroadcastModeBatch + batchMsgs := []provider.RelayerMessage{om.msgUpdateClient} + + for _, t := range om.connMsgs { + retries := dst.trackProcessingConnectionMessage(t) + if t.m == nil { + continue + } + if broadcastBatch && retries == 0 { + batchMsgs = append(batchMsgs, t.m) + continue + } + go pp.sendConnectionMessage(ctx, src, dst, om.msgUpdateClient, t) + } - wg.Wait() + for _, t := range om.chanMsgs { + retries := dst.trackProcessingChannelMessage(t) + if t.m == nil { + continue + } + if broadcastBatch && retries == 0 { + batchMsgs = append(batchMsgs, t.m) + continue + } + go pp.sendChannelMessage(ctx, src, dst, om.msgUpdateClient, t) } - if len(om.msgs) == 1 { - om.clientICQMsgs = make([]clientICQMessageToTrack, len(messages.clientICQMessages)) - // only assemble and send ICQ messages if there are no conn or chan handshake messages - for i, msg := range messages.clientICQMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + for _, t := range om.clientICQMsgs { + retries := dst.trackProcessingClientICQMessage(t) + if t.m == nil { + continue + } + if broadcastBatch && retries == 0 { + batchMsgs = append(batchMsgs, t.m) + continue } + go pp.sendClientICQMessage(ctx, src, dst, om.msgUpdateClient, t) - wg.Wait() } - if len(om.msgs) == 1 { - om.pktMsgs = make([]packetMessageToTrack, len(messages.packetMessages)) - // only assemble and send packet messages if there are no handshake messages - for i, msg := range messages.packetMessages { - wg.Add(1) - go pp.assembleMessage(ctx, msg, src, dst, &om, i, &wg) + for _, t := range om.pktMsgs { + retries := dst.trackProcessingPacketMessage(t) + if t.m == nil { + continue } + if broadcastBatch && retries == 0 { + batchMsgs = append(batchMsgs, t.m) + continue + } + go pp.sendPacketMessage(ctx, src, dst, om.msgUpdateClient, t) + } - wg.Wait() + if len(batchMsgs) > 1 { + go pp.sendBatchMessages(ctx, src, dst, batchMsgs, om.pktMsgs) } - if len(om.msgs) == 1 && !needsClientUpdate { + if assembled == 0 { + if needsClientUpdate { + go pp.sendClientUpdate(ctx, src, dst, om.msgUpdateClient) + return nil + } // only msgUpdateClient, don't need to send return errors.New("all messages failed to assemble") } - for _, m := range om.connMsgs { - dst.trackProcessingConnectionMessage(m) - } + return nil +} - for _, m := range om.chanMsgs { - dst.trackProcessingChannelMessage(m) - } +func (pp *PathProcessor) sendClientUpdate( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() - for _, m := range om.clientICQMsgs { - dst.trackProcessingClientICQMessage(m) - } + dst.log.Debug("Will relay client update") - for _, m := range om.pktMsgs { - dst.trackProcessingPacketMessage(m) + dst.lastClientUpdateHeightMu.Lock() + dst.lastClientUpdateHeight = dst.latestBlock.Height + dst.lastClientUpdateHeightMu.Unlock() + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, []provider.RelayerMessage{msgUpdateClient}, pp.memo, ctx, nil) + if err != nil { + pp.log.Error("Error sending client update message", + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + ) + return } + dst.log.Debug("Client update broadcast completed") +} + +func (pp *PathProcessor) sendBatchMessages( + ctx context.Context, + src, dst *pathEndRuntime, + msgs []provider.RelayerMessage, + pktMsgs []packetMessageToTrack, +) { + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() - go pp.sendMessages(ctx, src, dst, &om, pp.memo) + dst.log.Debug("Will relay batch of messages", zap.Int("count", len(msgs))) - return nil + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || pp.metrics == nil { + return + } + for _, tracker := range pktMsgs { + var channel, port string + if tracker.msg.eventType == chantypes.EventTypeRecvPacket { + channel = tracker.msg.info.DestChannel + port = tracker.msg.info.DestPort + } else { + channel = tracker.msg.info.SourceChannel + port = tracker.msg.info.SourcePort + } + pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) + } + }) + if err != nil { + errFields := []zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + zap.Error(err), + } + if errors.Is(err, chantypes.ErrRedundantTx) { + pp.log.Debug("Packet(s) already handled by another relayer", errFields...) + return + } + pp.log.Error("Error sending batch of messages", errFields...) + return + } + dst.log.Debug("Batch messages broadcast completed") } -func (pp *PathProcessor) sendMessages(ctx context.Context, src, dst *pathEndRuntime, om *outgoingMessages, memo string) { - ctx, cancel := context.WithTimeout(ctx, messageSendTimeout) +func (pp *PathProcessor) sendPacketMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker packetMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) defer cancel() - _, txSuccess, err := dst.chainProvider.SendMessages(ctx, om.msgs, pp.memo) + packetFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("src_port", tracker.msg.info.SourcePort), + zap.String("src_channel", tracker.msg.info.SourceChannel), + zap.String("dst_port", tracker.msg.info.DestPort), + zap.String("dst_channel", tracker.msg.info.DestChannel), + zap.Uint64("sequence", tracker.msg.info.Sequence), + zap.String("timeout_height", fmt.Sprintf("%d-%d", tracker.msg.info.TimeoutHeight.RevisionNumber, tracker.msg.info.TimeoutHeight.RevisionHeight)), + zap.Uint64("timeout_timestamp", tracker.msg.info.TimeoutTimestamp), + zap.String("data", base64.StdEncoding.EncodeToString(tracker.msg.info.Data)), + zap.String("ack", base64.StdEncoding.EncodeToString(tracker.msg.info.Ack)), + } + + dst.log.Debug("Will relay packet message", packetFields...) + + callback := func(rtr *provider.RelayerTxResponse, err error) { + // only increment metrics counts for successful packets + if err != nil || pp.metrics == nil { + return + } + var channel, port string + if tracker.msg.eventType == chantypes.EventTypeRecvPacket { + channel = tracker.msg.info.DestChannel + port = tracker.msg.info.DestPort + } else { + channel = tracker.msg.info.SourceChannel + port = tracker.msg.info.SourcePort + } + pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, tracker.msg.eventType) + } + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, callback) if err != nil { + errFields := append([]zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + }, packetFields...) + errFields = append(errFields, zap.Error(err)) + if errors.Is(err, chantypes.ErrRedundantTx) { - pp.log.Debug("Packet(s) already handled by another relayer", - zap.String("src_chain_id", src.info.ChainID), - zap.String("dst_chain_id", dst.info.ChainID), - zap.String("src_client_id", src.info.ClientID), - zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), - zap.Error(err), - ) + pp.log.Debug("Packet already handled by another relayer", errFields...) return } - pp.log.Error("Error sending messages", + pp.log.Error("Error sending packet message", errFields...) + return + } + dst.log.Debug("Packet message broadcast completed", packetFields...) +} + +func (pp *PathProcessor) sendChannelMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker channelMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + channelFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("port_id", tracker.msg.info.PortID), + zap.String("channel_id", tracker.msg.info.ChannelID), + zap.String("counterparty_port_id", tracker.msg.info.CounterpartyPortID), + zap.String("counterparty_channel_id", tracker.msg.info.CounterpartyChannelID), + zap.String("connection_id", tracker.msg.info.ConnID), + zap.String("counterparty_connection_id", tracker.msg.info.CounterpartyConnID), + zap.String("order", tracker.msg.info.Order.String()), + zap.String("version", tracker.msg.info.Version), + } + + dst.log.Debug("Will relay channel message", channelFields...) + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) + if err != nil { + errFields := []zapcore.Field{ zap.String("src_chain_id", src.info.ChainID), zap.String("dst_chain_id", dst.info.ChainID), zap.String("src_client_id", src.info.ClientID), zap.String("dst_client_id", dst.info.ClientID), - zap.Object("messages", om), - zap.Error(err), - ) + } + errFields = append(errFields, channelFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending channel handshake message", errFields...) return } - if !txSuccess { - dst.log.Error("Error sending messages, transaction was not successful") - return + dst.log.Debug("Channel handshake message broadcast completed", channelFields...) +} + +func (pp *PathProcessor) sendConnectionMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker connectionMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + connFields := []zapcore.Field{ + zap.String("event_type", tracker.msg.eventType), + zap.String("client_id", tracker.msg.info.ClientID), + zap.String("counterparty_client_id", tracker.msg.info.CounterpartyClientID), + zap.String("connection_id", tracker.msg.info.ConnID), + zap.String("counterparty_connection_id", tracker.msg.info.CounterpartyConnID), + zap.String("counterparty_commitment_prefix", tracker.msg.info.CounterpartyCommitmentPrefix.String()), } - if pp.metrics == nil { + dst.log.Debug("Will relay connection message", connFields...) + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) + if err != nil { + errFields := []zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), + } + errFields = append(errFields, connFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending connection handshake message", errFields...) return } - for _, m := range om.pktMsgs { - var channel, port string - if m.msg.eventType == chantypes.EventTypeRecvPacket { - channel = m.msg.info.DestChannel - port = m.msg.info.DestPort - } else { - channel = m.msg.info.SourceChannel - port = m.msg.info.SourcePort + dst.log.Debug("Connection handshake message broadcast completed", connFields...) +} + +func (pp *PathProcessor) sendClientICQMessage( + ctx context.Context, + src, dst *pathEndRuntime, + msgUpdateClient provider.RelayerMessage, + tracker clientICQMessageToTrack, +) { + msgs := []provider.RelayerMessage{msgUpdateClient, tracker.m} + + broadcastCtx, cancel := context.WithTimeout(ctx, messageSendTimeout) + defer cancel() + + icqFields := []zapcore.Field{ + zap.String("type", tracker.msg.info.Type), + zap.String("query_id", string(tracker.msg.info.QueryID)), + zap.String("request", string(tracker.msg.info.Request)), + } + + dst.log.Debug("Will relay Stride ICQ message", icqFields...) + + err := dst.chainProvider.SendMessagesToMempool(broadcastCtx, msgs, pp.memo, ctx, nil) + if err != nil { + errFields := []zapcore.Field{ + zap.String("src_chain_id", src.info.ChainID), + zap.String("dst_chain_id", dst.info.ChainID), + zap.String("src_client_id", src.info.ClientID), + zap.String("dst_client_id", dst.info.ClientID), } - pp.metrics.IncPacketsRelayed(dst.info.PathName, dst.info.ChainID, channel, port, m.msg.eventType) + errFields = append(errFields, icqFields...) + errFields = append(errFields, zap.Error(err)) + pp.log.Error("Error sending client ICQ message", errFields...) + return } + dst.log.Debug("Stride ICQ message broadcast completed", icqFields...) } func (pp *PathProcessor) assemblePacketMessage( diff --git a/relayer/processor/types_internal.go b/relayer/processor/types_internal.go index a949a6aad..573a47f26 100644 --- a/relayer/processor/types_internal.go +++ b/relayer/processor/types_internal.go @@ -3,7 +3,6 @@ package processor import ( "strconv" "strings" - "sync" chantypes "github.com/cosmos/ibc-go/v7/modules/core/04-channel/types" "github.com/cosmos/relayer/v2/relayer/provider" @@ -207,12 +206,11 @@ func channelInfoChannelKey(c provider.ChannelInfo) ChannelKey { // outgoingMessages is a slice of relayer messages that can be // appended to concurrently. type outgoingMessages struct { - mu sync.Mutex - msgs []provider.RelayerMessage - pktMsgs []packetMessageToTrack - connMsgs []connectionMessageToTrack - chanMsgs []channelMessageToTrack - clientICQMsgs []clientICQMessageToTrack + msgUpdateClient provider.RelayerMessage + pktMsgs []packetMessageToTrack + connMsgs []connectionMessageToTrack + chanMsgs []channelMessageToTrack + clientICQMsgs []clientICQMessageToTrack } // MarshalLogObject satisfies the zapcore.ObjectMarshaler interface @@ -247,33 +245,24 @@ func (om *outgoingMessages) MarshalLogObject(enc zapcore.ObjectEncoder) error { return nil } -// Append acquires a lock on om's mutex and then appends msg. -// When there are no more possible concurrent calls to Append, -// it is safe to directly access om.msgs. -func (om *outgoingMessages) Append(msg provider.RelayerMessage) { - om.mu.Lock() - defer om.mu.Unlock() - om.msgs = append(om.msgs, msg) -} - type packetMessageToTrack struct { - msg packetIBCMessage - assembled bool + msg packetIBCMessage + m provider.RelayerMessage } type connectionMessageToTrack struct { - msg connectionIBCMessage - assembled bool + msg connectionIBCMessage + m provider.RelayerMessage } type channelMessageToTrack struct { - msg channelIBCMessage - assembled bool + msg channelIBCMessage + m provider.RelayerMessage } type clientICQMessageToTrack struct { - msg clientICQMessage - assembled bool + msg clientICQMessage + m provider.RelayerMessage } // orderFromString parses a string into a channel order byte. diff --git a/relayer/processor/types_test.go b/relayer/processor/types_test.go index 28dc697c7..39224dc74 100644 --- a/relayer/processor/types_test.go +++ b/relayer/processor/types_test.go @@ -12,6 +12,7 @@ type mockIBCHeader struct{} func (h mockIBCHeader) Height() uint64 { return 0 } func (h mockIBCHeader) ConsensusState() ibcexported.ConsensusState { return nil } +func (h mockIBCHeader) NextValidatorsHash() []byte { return nil } func TestIBCHeaderCachePrune(t *testing.T) { cache := make(processor.IBCHeaderCache) diff --git a/relayer/provider/provider.go b/relayer/provider/provider.go index 3abf0235d..a49d19a90 100644 --- a/relayer/provider/provider.go +++ b/relayer/provider/provider.go @@ -18,9 +18,17 @@ import ( "go.uber.org/zap/zapcore" ) +type BroadcastMode string + +const ( + BroadcastModeSingle BroadcastMode = "single" + BroadcastModeBatch BroadcastMode = "batch" +) + type ProviderConfig interface { NewProvider(log *zap.Logger, homepath string, debug bool, chainName string) (ChainProvider, error) Validate() error + BroadcastMode() BroadcastMode } type RelayerMessage interface { @@ -29,11 +37,12 @@ type RelayerMessage interface { } type RelayerTxResponse struct { - Height int64 - TxHash string - Code uint32 - Data string - Events []RelayerEvent + Height int64 + TxHash string + Codespace string + Code uint32 + Data string + Events []RelayerEvent } type RelayerEvent struct { @@ -49,7 +58,7 @@ type LatestBlock struct { type IBCHeader interface { Height() uint64 ConsensusState() ibcexported.ConsensusState - // require conversion implementation for third party chains + NextValidatorsHash() []byte } // ClientState holds the current state of a client from a single chain's perspective @@ -193,6 +202,7 @@ func (es loggableEvents) MarshalLogArray(enc zapcore.ArrayEncoder) error { func (r RelayerTxResponse) MarshalLogObject(enc zapcore.ObjectEncoder) error { enc.AddInt64("height", r.Height) enc.AddString("tx_hash", r.TxHash) + enc.AddString("codespace", r.Codespace) enc.AddUint32("code", r.Code) enc.AddString("data", r.Data) enc.AddArray("events", loggableEvents(r.Events)) @@ -367,6 +377,14 @@ type ChainProvider interface { SendMessage(ctx context.Context, msg RelayerMessage, memo string) (*RelayerTxResponse, bool, error) SendMessages(ctx context.Context, msgs []RelayerMessage, memo string) (*RelayerTxResponse, bool, error) + SendMessagesToMempool( + ctx context.Context, + msgs []RelayerMessage, + memo string, + + asyncCtx context.Context, + asyncCallback func(*RelayerTxResponse, error), + ) error ChainName() string ChainId() string