Skip to content

LO2302 support timeout #158

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 14 commits into
base: main
Choose a base branch
from
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ protoImage=$(DOCKER) run --user 0 --rm -v $(CURDIR):/workspace --workdir /worksp
build:
go build -o ./build/yrly .

TESTMOCKS = core/mock_chain_test.go
.PHONY: test
test:
test: $(TESTMOCKS)
go test -v ./...

proto-gen:
Expand All @@ -20,4 +21,7 @@ proto-update-deps:
@echo "Updating Protobuf dependencies"
$(DOCKER) run --user 0 --rm -v $(CURDIR)/proto:/workspace --workdir /workspace $(protoImageName) buf mod update

$(TESTMOCKS):
go generate ./...

.PHONY: proto-gen proto-update-deps
13 changes: 13 additions & 0 deletions chains/tendermint/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,19 @@ func (c *Chain) queryChannel(ctx context.Context, height int64, prove bool) (cha
return res, nil
}

// QueryNextSequenceReceive returns a info about nextSequence
func (c *Chain) QueryNextSequenceReceive(ctx core.QueryContext) (res *chantypes.QueryNextSequenceReceiveResponse, err error) {
return c.queryNextSequenceReceive(ctx.Context(), int64(ctx.Height().GetRevisionHeight()), false)
}

func (c *Chain) queryNextSequenceReceive(ctx context.Context, height int64, prove bool) (chanRes *chantypes.QueryNextSequenceReceiveResponse, err error) {
res, err := chanutils.QueryNextSequenceReceive(c.CLIContext(height).WithCmdContext(ctx), c.PathEnd.PortID, c.PathEnd.ChannelID, prove)
if err != nil {
return nil, err
}
return res, nil
}

// QueryClientConsensusState retrieves the latest consensus state for a client in state at a given height
func (c *Chain) QueryClientConsensusState(
ctx core.QueryContext, dstClientConsHeight ibcexported.Height) (*clienttypes.QueryConsensusStateResponse, error) {
Expand Down
5 changes: 5 additions & 0 deletions cmd/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,11 @@ func relayMsgsCmd(ctx *config.Context) *cobra.Command {
return err
}

sp, err = st.ProcessTimeoutPackets(cmd.Context(), c[src], c[dst], sh, sp)
if err != nil {
return err
}

msgs := core.NewRelayMsgs()

doExecuteRelaySrc := len(sp.Dst) > 0
Expand Down
4 changes: 4 additions & 0 deletions core/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (pc *ProvableChain) SetupForRelay(ctx context.Context) error {
return nil
}

//go:generate mockgen -source=chain.go -destination=mock_chain_test.go -package core
// Chain represents a chain that supports sending transactions and querying the state
type Chain interface {
// GetAddress returns the address of relayer
Expand Down Expand Up @@ -144,6 +145,9 @@ type ICS04Querier interface {
// QueryChannel returns the channel associated with a channelID
QueryChannel(ctx QueryContext) (chanRes *chantypes.QueryChannelResponse, err error)

// QueryNextSequenceReceive returns a info about nextSequence
QueryNextSequenceReceive(ctx QueryContext) (res *chantypes.QueryNextSequenceReceiveResponse, err error)

// QueryUnreceivedPackets returns a list of unrelayed packet commitments
QueryUnreceivedPackets(ctx QueryContext, seqs []uint64) ([]uint64, error)

Expand Down
178 changes: 167 additions & 11 deletions core/naive-strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"fmt"
"log/slog"
"time"
"encoding/binary"

retry "github.com/avast/retry-go"
sdk "github.com/cosmos/cosmos-sdk/types"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
chantypes "github.com/cosmos/ibc-go/v8/modules/core/04-channel/types"
host "github.com/cosmos/ibc-go/v8/modules/core/24-host"
"github.com/hyperledger-labs/yui-relayer/metrics"
Expand Down Expand Up @@ -199,6 +201,114 @@ func (st *NaiveStrategy) UnrelayedPackets(ctx context.Context, src, dst *Provabl
}, nil
}

func (st *NaiveStrategy) ProcessTimeoutPackets(ctx context.Context, src, dst *ProvableChain, sh SyncHeaders, rp *RelayPackets) (*RelayPackets, error) {
logger := GetChannelPairLogger(src, dst)
var (
srcPackets PacketInfoList
dstPackets PacketInfoList
srcLatestHeight ibcexported.Height
srcLatestTimestamp uint64
srcLatestFinalizedHeight ibcexported.Height
srcLatestFinalizedTimestamp uint64
dstLatestHeight ibcexported.Height
dstLatestTimestamp uint64
dstLatestFinalizedHeight ibcexported.Height
dstLatestFinalizedTimestamp uint64
)

if 0 < len(rp.Src) {
if h, err := dst.LatestHeight(context.TODO()); err != nil {
logger.Error("fail to get dst.LatestHeight", err)
return nil, err
} else {
dstLatestHeight = h
}

if t, err := dst.Timestamp(context.TODO(), dstLatestHeight); err != nil {
logger.Error("fail to get dst.Timestamp", err)
return nil, err
} else {
dstLatestTimestamp = uint64(t.UnixNano())
}

dstLatestFinalizedHeight = sh.GetLatestFinalizedHeader(dst.ChainID()).GetHeight()
if t, err := dst.Timestamp(context.TODO(), dstLatestFinalizedHeight); err != nil {
logger.Error("fail to get dst.Timestamp", err)
return nil, err
} else {
dstLatestFinalizedTimestamp = uint64(t.UnixNano())
}
}
if 0 < len(rp.Dst) {
if h, err := src.LatestHeight(context.TODO()); err != nil {
logger.Error("fail to get src.LatestHeight", err)
return nil, err
} else {
srcLatestHeight = h
}
if t, err := src.Timestamp(context.TODO(), srcLatestHeight); err != nil {
logger.Error("fail to get src.Timestamp", err)
return nil, err
} else {
srcLatestTimestamp = uint64(t.UnixNano())
}

srcLatestFinalizedHeight = sh.GetLatestFinalizedHeader(src.ChainID()).GetHeight()
if t, err := src.Timestamp(context.TODO(), srcLatestFinalizedHeight); err != nil {
logger.Error("fail to get src.Timestamp", err)
return nil, err
} else {
srcLatestFinalizedTimestamp = uint64(t.UnixNano())
}
}

isTimeout := func(p *PacketInfo, height ibcexported.Height, timestamp uint64) (bool) {
return (!p.TimeoutHeight.IsZero() && p.TimeoutHeight.LTE(height)) ||
(p.TimeoutTimestamp != 0 && p.TimeoutTimestamp <= timestamp)
}

for i, p := range rp.Src {
if isTimeout(p, dstLatestFinalizedHeight, dstLatestFinalizedTimestamp) {
p.TimedOut = true
if src.Path().GetOrder() == chantypes.ORDERED {
if i == 0 {
dstPackets = append(dstPackets, p)
}
break
} else {
dstPackets = append(dstPackets, p)
}
} else if isTimeout(p, dstLatestHeight, dstLatestTimestamp) {
break
} else {
p.TimedOut = false
srcPackets = append(srcPackets, p)
}
}
for i, p := range rp.Dst {
if (isTimeout(p, srcLatestFinalizedHeight, srcLatestFinalizedTimestamp)) {
p.TimedOut = true
if dst.Path().GetOrder() == chantypes.ORDERED {
if i == 0 {
srcPackets = append(srcPackets, p)
}
break
} else {
srcPackets = append(srcPackets, p)
}
} else if (isTimeout(p, srcLatestHeight, srcLatestTimestamp)) {
break
} else {
p.TimedOut = false
dstPackets = append(dstPackets, p)
}
}
return &RelayPackets{
Src: srcPackets,
Dst: dstPackets,
}, nil
}

func (st *NaiveStrategy) RelayPackets(ctx context.Context, src, dst *ProvableChain, rp *RelayPackets, sh SyncHeaders, doExecuteRelaySrc, doExecuteRelayDst bool) (*RelayMsgs, error) {
logger := GetChannelPairLogger(src, dst)
defer logger.TimeTrack(time.Now(), "RelayPackets", "num_src", len(rp.Src), "num_dst", len(rp.Dst))
Expand Down Expand Up @@ -392,20 +502,66 @@ func (st *NaiveStrategy) UnrelayedAcknowledgements(ctx context.Context, src, dst
// TODO add packet-timeout support
func collectPackets(ctx QueryContext, chain *ProvableChain, packets PacketInfoList, signer sdk.AccAddress) ([]sdk.Msg, error) {
logger := GetChannelLogger(chain)

var nextSequenceRecv uint64
if chain.Path().GetOrder() == chantypes.ORDERED {
for _, p := range packets {
if p.TimedOut {
res, err := chain.QueryNextSequenceReceive(ctx)
if err != nil {
logger.Error("failed to QueryNextSequenceReceive", err,
"height", ctx.Height(),
)
return nil, err
}
nextSequenceRecv = res.NextSequenceReceive
break
}
}
} else {
// nextSequenceRecv has no effect in unordered channel but ibc-go expect it is not zero.
nextSequenceRecv = 1
}

var msgs []sdk.Msg
for _, p := range packets {
commitment := chantypes.CommitPacket(chain.Codec(), &p.Packet)
path := host.PacketCommitmentPath(p.SourcePort, p.SourceChannel, p.Sequence)
proof, proofHeight, err := chain.ProveState(ctx, path, commitment)
if err != nil {
logger.Error("failed to ProveState", err,
"height", ctx.Height(),
"path", path,
"commitment", commitment,
)
return nil, err
var msg sdk.Msg
if p.TimedOut {
// make path of original packet's destination port and channel
var path string
var commitment []byte
if chain.Path().GetOrder() == chantypes.ORDERED {
path = host.NextSequenceRecvPath(p.SourcePort, p.SourceChannel)
commitment = make([]byte, 8)
binary.BigEndian.PutUint64(commitment[0:], nextSequenceRecv)
} else {
path = host.PacketReceiptPath(p.SourcePort, p.SourceChannel, p.Sequence)
commitment = []byte{} //ABSENSE
}
proof, proofHeight, err := chain.ProveState(ctx, path, commitment)
if err != nil {
logger.Error("failed to ProveState", err,
"height", ctx.Height(),
"path", path,
"commitment", commitment,
)
return nil, err
}
msg = chantypes.NewMsgTimeout(p.Packet, nextSequenceRecv, proof, proofHeight, signer.String())
} else {
path := host.PacketCommitmentPath(p.SourcePort, p.SourceChannel, p.Sequence)
commitment := chantypes.CommitPacket(chain.Codec(), &p.Packet)
proof, proofHeight, err := chain.ProveState(ctx, path, commitment)
if err != nil {
logger.Error("failed to ProveState", err,
"height", ctx.Height(),
"path", path,
"commitment", commitment,
)
return nil, err
}
msg = chantypes.NewMsgRecvPacket(p.Packet, proof, proofHeight, signer.String())
}
msg := chantypes.NewMsgRecvPacket(p.Packet, proof, proofHeight, signer.String())
msgs = append(msgs, msg)
}
return msgs, nil
Expand Down
6 changes: 3 additions & 3 deletions core/provers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/cosmos/cosmos-sdk/codec"
clienttypes "github.com/cosmos/ibc-go/v8/modules/core/02-client/types"
"github.com/cosmos/ibc-go/v8/modules/core/exported"
ibcexported "github.com/cosmos/ibc-go/v8/modules/core/exported"
)

// Prover represents a prover that supports generating a commitment proof
Expand All @@ -31,7 +31,7 @@ type StateProver interface {

// ProveHostConsensusState returns an existence proof of the consensus state at `height`
// This proof would be ignored in ibc-go, but it is required to `getSelfConsensusState` of ibc-solidity.
ProveHostConsensusState(ctx QueryContext, height exported.Height, consensusState exported.ConsensusState) (proof []byte, err error)
ProveHostConsensusState(ctx QueryContext, height ibcexported.Height, consensusState ibcexported.ConsensusState) (proof []byte, err error)
}

// LightClient provides functions for creating and updating on-chain light clients on the counterparty chain
Expand All @@ -41,7 +41,7 @@ type LightClient interface {
// CreateInitialLightClientState returns a pair of ClientState and ConsensusState based on the state of the self chain at `height`.
// These states will be submitted to the counterparty chain as MsgCreateClient.
// If `height` is nil, the latest finalized height is selected automatically.
CreateInitialLightClientState(ctx context.Context, height exported.Height) (exported.ClientState, exported.ConsensusState, error)
CreateInitialLightClientState(ctx context.Context, height ibcexported.Height) (ibcexported.ClientState, ibcexported.ConsensusState, error)

// SetupHeadersForUpdate returns the finalized header and any intermediate headers needed to apply it to the client on the counterpaty chain
// The order of the returned header slice should be as: [<intermediate headers>..., <update header>]
Expand Down
10 changes: 8 additions & 2 deletions core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,12 @@ func (srv *RelayService) Serve(ctx context.Context) error {
return err
}

pseqs2, err := srv.st.ProcessTimeoutPackets(ctx, srv.src, srv.dst, srv.sh, pseqs)
if err != nil {
logger.Error("failed to process timeout packets", err)
return err
}

// get unrelayed acks
aseqs, err := srv.st.UnrelayedAcknowledgements(ctx, srv.src, srv.dst, srv.sh, false)
if err != nil {
Expand All @@ -128,7 +134,7 @@ func (srv *RelayService) Serve(ctx context.Context) error {

msgs := NewRelayMsgs()

doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(ctx, pseqs)
doExecuteRelaySrc, doExecuteRelayDst := srv.shouldExecuteRelay(ctx, pseqs2)
doExecuteAckSrc, doExecuteAckDst := srv.shouldExecuteRelay(ctx, aseqs)
// update clients
if m, err := srv.st.UpdateClients(ctx, srv.src, srv.dst, doExecuteRelaySrc, doExecuteRelayDst, doExecuteAckSrc, doExecuteAckDst, srv.sh, true); err != nil {
Expand All @@ -139,7 +145,7 @@ func (srv *RelayService) Serve(ctx context.Context) error {
}

// relay packets if unrelayed seqs exist
if m, err := srv.st.RelayPackets(ctx, srv.src, srv.dst, pseqs, srv.sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
if m, err := srv.st.RelayPackets(ctx, srv.src, srv.dst, pseqs2, srv.sh, doExecuteRelaySrc, doExecuteRelayDst); err != nil {
logger.Error("failed to relay packets", err)
return err
} else {
Expand Down
Loading
Loading