Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func createApp(

// syncLog syncs the log to the specific output at the end of the program.
func syncLog(log logger.Logger) {
if err := log.Sync(); err != nil && !errors.Is(err, syscall.ENOTTY) {
if err := log.Sync(); err != nil && !errors.Is(err, syscall.ENOTTY) && !errors.Is(err, syscall.EINVAL) {
fmt.Fprintf(os.Stderr, "failed to sync logs: %v\n", err)
}
}
136 changes: 90 additions & 46 deletions relayer/chains/evm/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/big"
"sync"
"time"

ethereum "github.com/ethereum/go-ethereum"
Expand All @@ -15,6 +16,32 @@ import (
"github.com/bandprotocol/falcon/relayer/logger"
)

type EVMClients struct {
mu sync.RWMutex
clients map[string]*ethclient.Client
}

func NewEVMClients() EVMClients {
return EVMClients{
clients: make(map[string]*ethclient.Client),
}
}

func (ec *EVMClients) GetClient(endpoint string) (*ethclient.Client, bool) {
ec.mu.RLock()
defer ec.mu.RUnlock()

client, exists := ec.clients[endpoint]
return client, exists
}

func (ec *EVMClients) SetClient(endpoint string, client *ethclient.Client) {
ec.mu.Lock()
defer ec.mu.Unlock()

ec.clients[endpoint] = client
}

var _ Client = &client{}

// Client is the interface that handles interactions with the EVM chain.
Expand Down Expand Up @@ -45,7 +72,8 @@ type client struct {
Log logger.Logger

selectedEndpoint string
client *ethclient.Client
selectedClient *ethclient.Client
clients EVMClients
alert alert.Alert
}

Expand All @@ -58,29 +86,64 @@ func NewClient(chainName string, cfg *EVMChainProviderConfig, log logger.Logger,
ExecuteTimeout: cfg.ExecuteTimeout,
Log: log.With("chain_name", chainName),
alert: alert,
clients: NewEVMClients(),
}
}

// Connect connects to the EVM chain.
func (c *client) Connect(ctx context.Context) error {
var wg sync.WaitGroup
for idx, endpoint := range c.Endpoints {
_, ok := c.clients.GetClient(endpoint)
if ok {
continue
}

wg.Add(1)
go func(idx int, endpoint string) {
defer wg.Done()
client, err := ethclient.Dial(endpoint)
if err != nil {
c.Log.Warn(
"Failed to connect to EVM chain",
"endpoint", endpoint,
err,
)
alert.HandleAlert(
c.alert,
alert.NewTopic(alert.ConnectSingleChainClientErrorMsg).
WithChainName(c.ChainName).
WithEndpoint(endpoint),
err.Error(),
)
return
}
alert.HandleReset(
c.alert,
alert.NewTopic(alert.ConnectSingleChainClientErrorMsg).
WithChainName(c.ChainName).
WithEndpoint(endpoint),
)
c.clients.SetClient(endpoint, client)
}(idx, endpoint)
}

wg.Wait()
res, err := c.getClientWithMaxHeight(ctx)
if err != nil {
c.selectedEndpoint = ""
c.selectedClient = nil
c.Log.Error("Failed to connect to EVM chain", err)
return err
}

// Close existing client if it exists
if c.client != nil {
c.client.Close()
}

// only log when new endpoint is used
if c.selectedEndpoint != res.Endpoint {
c.Log.Info("Connected to EVM chain", "endpoint", res.Endpoint)
}

c.selectedEndpoint = res.Endpoint
c.client = res.Client
c.selectedClient = res.Client

return nil
}
Expand Down Expand Up @@ -110,7 +173,7 @@ func (c *client) NonceAt(ctx context.Context, address gethcommon.Address) (uint6
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

nonce, err := c.client.NonceAt(newCtx, address, nil)
nonce, err := c.selectedClient.NonceAt(newCtx, address, nil)
if err != nil {
c.Log.Error(
"Failed to get nonce",
Expand All @@ -129,7 +192,7 @@ func (c *client) GetBlockHeight(ctx context.Context) (uint64, error) {
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

blockHeight, err := c.client.BlockNumber(newCtx)
blockHeight, err := c.selectedClient.BlockNumber(newCtx)
if err != nil {
c.Log.Error("Failed to get block height", "endpoint", c.selectedEndpoint, err)
return 0, fmt.Errorf("[EVMClient] failed to get block height: %w", err)
Expand All @@ -143,7 +206,7 @@ func (c *client) GetBlock(ctx context.Context, height *big.Int) (*gethtypes.Bloc
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

block, err := c.client.BlockByNumber(newCtx, height)
block, err := c.selectedClient.BlockByNumber(newCtx, height)
if err != nil {
c.Log.Error(
"Failed to get block by height",
Expand All @@ -163,7 +226,7 @@ func (c *client) GetTxReceipt(ctx context.Context, txHash string) (*TxReceipt, e
defer cancel()

var receipt *TxReceipt
err := c.client.Client().CallContext(newCtx, &receipt, "eth_getTransactionReceipt", txHash)
err := c.selectedClient.Client().CallContext(newCtx, &receipt, "eth_getTransactionReceipt", txHash)
if err == nil && receipt == nil {
// it's normal to not have receipt for pending tx
err = ethereum.NotFound
Expand All @@ -186,7 +249,7 @@ func (c *client) GetTxByHash(ctx context.Context, txHash string) (*gethtypes.Tra
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

tx, isPending, err := c.client.TransactionByHash(newCtx, gethcommon.HexToHash(txHash))
tx, isPending, err := c.selectedClient.TransactionByHash(newCtx, gethcommon.HexToHash(txHash))
if err != nil {
c.Log.Error(
"Failed to get tx by hash",
Expand All @@ -210,7 +273,7 @@ func (c *client) Query(ctx context.Context, gethAddr gethcommon.Address, data []
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

res, err := c.client.CallContract(newCtx, callMsg, nil)
res, err := c.selectedClient.CallContract(newCtx, callMsg, nil)
if err != nil {
c.Log.Error(
"Failed to query contract",
Expand All @@ -229,7 +292,7 @@ func (c *client) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64,
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

gas, err := c.client.EstimateGas(newCtx, msg)
gas, err := c.selectedClient.EstimateGas(newCtx, msg)
if err != nil {
c.Log.Error(
"Failed to estimate gas",
Expand All @@ -248,7 +311,7 @@ func (c *client) EstimateGasPrice(ctx context.Context) (*big.Int, error) {
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

gasPrice, err := c.client.SuggestGasPrice(newCtx)
gasPrice, err := c.selectedClient.SuggestGasPrice(newCtx)
if err != nil {
c.Log.Error(
"Failed to estimate gas price",
Expand All @@ -267,7 +330,7 @@ func (c *client) EstimateBaseFee(ctx context.Context) (*big.Int, error) {
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

latestHeader, err := c.client.HeaderByNumber(newCtx, nil)
latestHeader, err := c.selectedClient.HeaderByNumber(newCtx, nil)
if err != nil {
return nil, err
}
Expand All @@ -281,7 +344,7 @@ func (c *client) EstimateGasTipCap(ctx context.Context) (*big.Int, error) {
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

gasTipCap, err := c.client.SuggestGasTipCap(newCtx)
gasTipCap, err := c.selectedClient.SuggestGasTipCap(newCtx)
if err != nil {
c.Log.Error(
"Failed to estimate gas tip cap",
Expand Down Expand Up @@ -310,7 +373,7 @@ func (c *client) BroadcastTx(ctx context.Context, tx *gethtypes.Transaction) (st
newCtx, cancel := context.WithTimeout(ctx, c.ExecuteTimeout)
defer cancel()

if err := c.client.SendTransaction(newCtx, tx); err != nil {
if err := c.selectedClient.SendTransaction(newCtx, tx); err != nil {
c.Log.Error(
"Failed to broadcast tx",
"endpoint", c.selectedEndpoint,
Expand All @@ -328,23 +391,12 @@ func (c *client) BroadcastTx(ctx context.Context, tx *gethtypes.Transaction) (st
func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionResult, error) {
ch := make(chan ClientConnectionResult, len(c.Endpoints))

for _, endpoint := range c.Endpoints {
go func(endpoint string) {
client, err := ethclient.Dial(endpoint)
if err != nil {
c.Log.Warn(
"Failed to connect to EVM chain",
"endpoint", endpoint,
err,
)
for idx, endpoint := range c.Endpoints {
go func(idx int, endpoint string) {
client, ok := c.clients.GetClient(endpoint)

if !ok {
ch <- ClientConnectionResult{endpoint, nil, 0}
alert.HandleAlert(
c.alert,
alert.NewTopic(alert.ConnectSingleChainClientErrorMsg).
WithChainName(c.ChainName).
WithEndpoint(endpoint),
err.Error(),
)
return
}

Expand All @@ -358,7 +410,6 @@ func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionRe
"endpoint", endpoint,
err,
)
client.Close() // Close client on error
ch <- ClientConnectionResult{endpoint, nil, 0}

alert.HandleAlert(
Expand All @@ -377,7 +428,6 @@ func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionRe
"Skipping client because it is not fully synced",
"endpoint", endpoint,
)
client.Close() // Close client when not synced
ch <- ClientConnectionResult{endpoint, nil, 0}
alert.HandleAlert(
c.alert,
Expand All @@ -396,7 +446,6 @@ func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionRe
"endpoint", endpoint,
err,
)
client.Close() // Close client on error
ch <- ClientConnectionResult{endpoint, nil, 0}
alert.HandleAlert(
c.alert,
Expand All @@ -421,20 +470,15 @@ func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionRe
)

ch <- ClientConnectionResult{endpoint, client, blockHeight}
}(endpoint)
}(idx, endpoint)
}

var result ClientConnectionResult
for i := 0; i < len(c.Endpoints); i++ {
r := <-ch
if r.Client != nil {
if r.BlockHeight > result.BlockHeight {
if result.Client != nil {
result.Client.Close()
}
if r.BlockHeight > result.BlockHeight || (r.Endpoint == c.selectedEndpoint && r.BlockHeight == result.BlockHeight) {
result = r
} else {
r.Client.Close()
}
}
}
Expand All @@ -455,7 +499,7 @@ func (c *client) getClientWithMaxHeight(ctx context.Context) (ClientConnectionRe

// checkAndConnect checks if the client is connected to the EVM chain, if not connect it.
func (c *client) CheckAndConnect(ctx context.Context) error {
if c.client != nil {
if c.selectedClient != nil {
return nil
}

Expand All @@ -467,7 +511,7 @@ func (c *client) GetBalance(ctx context.Context, gethAddr gethcommon.Address, bl
newCtx, cancel := context.WithTimeout(ctx, c.QueryTimeout)
defer cancel()

res, err := c.client.BalanceAt(newCtx, gethAddr, blockNumber)
res, err := c.selectedClient.BalanceAt(newCtx, gethAddr, blockNumber)
if err != nil {
c.Log.Error(
"Failed to query balance",
Expand Down
Loading