Skip to content

Commit

Permalink
fix: Improved multinode proxy (#249)
Browse files Browse the repository at this point in the history
* Improved multinode proxy

* remove forced cluster check

* improve UI sync status

* fix typo

* fix UI

* fix UI chain list
  • Loading branch information
LexLuthr authored Oct 18, 2024
1 parent 3656721 commit 5e26663
Show file tree
Hide file tree
Showing 2 changed files with 192 additions and 39 deletions.
188 changes: 160 additions & 28 deletions deps/apiinfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,29 @@ package deps

import (
"context"
"errors"
"fmt"
"math/rand"
"net/http"
"reflect"
"sync"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/curio/api"

"github.com/filecoin-project/lotus/chain/types"
cliutil "github.com/filecoin-project/lotus/cli/util"
"github.com/filecoin-project/lotus/lib/retry"
)

var clog = logging.Logger("curio/chain")

func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, jsonrpc.ClientCloser, error) {
if tn, ok := ctx.App.Metadata["testnode-full"]; ok {
return tn.(api.Chain), func() {}, nil
Expand Down Expand Up @@ -48,17 +55,15 @@ func GetFullNodeAPIV1Curio(ctx *cli.Context, ainfoCfg []string) (api.Chain, json
for _, head := range httpHeads {
v1api, closer, err := newChainNodeRPCV1(ctx.Context, head.addr, head.header)
if err != nil {
log.Warnf("Not able to establish connection to node with addr: %s, Reason: %s", head.addr, err.Error())
clog.Warnf("Not able to establish connection to node with addr: %s, Reason: %s", head.addr, err.Error())
continue
}
fullNodes = append(fullNodes, v1api)
closers = append(closers, closer)
}

// When running in cluster mode and trying to establish connections to multiple nodes, fail
// if less than 2 lotus nodes are actually running
if len(httpHeads) > 1 && len(fullNodes) < 2 {
return nil, nil, xerrors.Errorf("Not able to establish connection to more than a single node")
if len(fullNodes) == 0 {
return nil, nil, xerrors.Errorf("failed to establish connection with all nodes")
}

finalCloser := func() {
Expand Down Expand Up @@ -96,54 +101,152 @@ func newChainNodeRPCV1(ctx context.Context, addr string, requestHeader http.Head
return &res, closer, err
}

const initialBackoff = time.Second
const maxRetryAttempts = 5
const maxBehindBestHealthy = 1

var errorsToRetry = []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}

const preferredAllBad = -1

// FullNodeProxy creates a proxy for the Chain API
// TODO: port improvements here from https://github.com/filecoin-project/lotus/pull/11470
func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
providerCount := len(ins)

var healthyLk sync.Mutex
unhealthyProviders := make([]bool, providerCount)

nextHealthyProvider := func(start int) int {
healthyLk.Lock()
defer healthyLk.Unlock()

for i := 0; i < providerCount; i++ {
idx := (start + i) % providerCount
if !unhealthyProviders[idx] {
return idx
}
}
return preferredAllBad
}

// watch provider health
startWatch := func() {
if len(ins) == 1 {
// not like we have any onter node to go to..
return
}

// don't bother for short-running commands
time.Sleep(250 * time.Millisecond)

var bestKnownTipset, nextBestKnownTipset *types.TipSet

for {
var wg sync.WaitGroup
wg.Add(providerCount)

for i := 0; i < providerCount; i++ {
go func(i int) {
defer wg.Done()

toctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) // todo better timeout
ch, err := ins[i].ChainHead(toctx)
cancel()

// error is definitely not healthy
if err != nil {
healthyLk.Lock()
unhealthyProviders[i] = true
healthyLk.Unlock()

clog.Debugw("rpc check chain head call failed", "fail_type", "rpc_error", "provider", i, "error", err)
return
}

healthyLk.Lock()
// maybe set best next
if nextBestKnownTipset == nil || big.Cmp(ch.ParentWeight(), nextBestKnownTipset.ParentWeight()) > 0 || len(ch.Blocks()) > len(nextBestKnownTipset.Blocks()) {
nextBestKnownTipset = ch
}

if bestKnownTipset != nil {
// if we're behind the best tipset, mark as unhealthy
unhealthyProviders[i] = ch.Height() < bestKnownTipset.Height()-maxBehindBestHealthy
if unhealthyProviders[i] {
clog.Debugw("rpc check chain head call failed", "fail_type", "behind_best", "provider", i, "height", ch.Height(), "best_height", bestKnownTipset.Height())
}
}
healthyLk.Unlock()
}(i)
}

wg.Wait()
bestKnownTipset = nextBestKnownTipset

time.Sleep(5 * time.Second)
}
}
var starWatchOnce sync.Once

// populate output api proxy

outs := api.GetInternalStructs(outstr)

var rins []reflect.Value
var apiProviders []reflect.Value
for _, in := range ins {
rins = append(rins, reflect.ValueOf(in))
apiProviders = append(apiProviders, reflect.ValueOf(in))
}

for _, out := range outs {
rProxyInternal := reflect.ValueOf(out).Elem()
rOutStruct := reflect.ValueOf(out).Elem()

for f := 0; f < rProxyInternal.NumField(); f++ {
field := rProxyInternal.Type().Field(f)
for f := 0; f < rOutStruct.NumField(); f++ {
field := rOutStruct.Type().Field(f)

var fns []reflect.Value
for _, rin := range rins {
fns = append(fns, rin.MethodByName(field.Name))
var providerFuncs []reflect.Value
for _, rin := range apiProviders {
mv := rin.MethodByName(field.Name)
if !mv.IsValid() {
continue
}
providerFuncs = append(providerFuncs, mv)
}

rProxyInternal.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
errorsToRetry := []error{&jsonrpc.RPCConnectionError{}, &jsonrpc.ErrClient{}}
initialBackoff, err := time.ParseDuration("1s")
if err != nil {
return nil
}
rOutStruct.Field(f).Set(reflect.MakeFunc(field.Type, func(args []reflect.Value) (results []reflect.Value) {
starWatchOnce.Do(func() {
go startWatch()
})

ctx := args[0].Interface().(context.Context)

curr := -1
preferredProvider := new(int)
*preferredProvider = nextHealthyProvider(0)
if *preferredProvider == preferredAllBad {
// select at random, retry will do it's best
*preferredProvider = rand.Intn(providerCount)
}

// for calls that need to be performed on the same node
// primarily for miner when calling create block and submit block subsequently
key := contextKey("retry-node")
if ctx.Value(key) != nil {
if (*ctx.Value(key).(**int)) == nil {
*ctx.Value(key).(**int) = &curr
*ctx.Value(key).(**int) = preferredProvider
} else {
curr = **ctx.Value(key).(**int) - 1
preferredProvider = *ctx.Value(key).(**int)
}
}

total := len(rins)
result, _ := retry.Retry(ctx, 5, initialBackoff, errorsToRetry, func() ([]reflect.Value, error) {
curr = (curr + 1) % total
result, _ := Retry(ctx, maxRetryAttempts, initialBackoff, errorsToRetry, func(isRetry bool) ([]reflect.Value, error) {
if isRetry {
pp := nextHealthyProvider(*preferredProvider + 1)
if pp == -1 {
return nil, xerrors.Errorf("no healthy providers")
}
*preferredProvider = pp
}

result := fns[curr].Call(args)
result := providerFuncs[*preferredProvider].Call(args)
if result[len(result)-1].IsNil() {
return result, nil
}
Expand All @@ -155,3 +258,32 @@ func FullNodeProxy[T api.Chain](ins []T, outstr *api.ChainStruct) {
}
}
}

func Retry[T any](ctx context.Context, attempts int, initialBackoff time.Duration, errorTypes []error, f func(isRetry bool) (T, error)) (result T, err error) {
for i := 0; i < attempts; i++ {
if i > 0 {
clog.Debugw("Retrying after error:", err)
time.Sleep(initialBackoff)
initialBackoff *= 2
}
result, err = f(i > 0)
if err == nil || !ErrorIsIn(err, errorTypes) {
return result, err
}
if ctx.Err() != nil {
return result, ctx.Err()
}
}
clog.Errorf("Failed after %d attempts, last error: %s", attempts, err)
return result, err
}

func ErrorIsIn(err error, errorTypes []error) bool {
for _, etype := range errorTypes {
tmp := reflect.New(reflect.PointerTo(reflect.ValueOf(etype).Elem().Type())).Interface()
if errors.As(err, &tmp) {
return true
}
}
return false
}
43 changes: 32 additions & 11 deletions web/api/webrpc/sync_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"github.com/BurntSushi/toml"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-jsonrpc"

"github.com/filecoin-project/curio/api"
"github.com/filecoin-project/curio/build"

cliutil "github.com/filecoin-project/lotus/cli/util"
Expand Down Expand Up @@ -70,19 +73,18 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
}
}

rpcInfos := map[string]minimalApiInfo{} // config name -> api info
confNameToAddr := map[string]string{} // config name -> api address
var rpcInfos []string
confNameToAddr := make(map[string][]string) // config name -> api addresses

err := forEachConfig[minimalApiInfo](a, func(name string, info minimalApiInfo) error {
if len(info.Apis.ChainApiInfo) == 0 {
return nil
}

rpcInfos[name] = info

for _, addr := range info.Apis.ChainApiInfo {
rpcInfos = append(rpcInfos, addr)
ai := cliutil.ParseApiInfo(addr)
confNameToAddr[name] = ai.Addr
confNameToAddr[name] = append(confNameToAddr[name], ai.Addr)
}

return nil
Expand All @@ -98,7 +100,7 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {

var wg sync.WaitGroup
for _, info := range rpcInfos {
ai := cliutil.ParseApiInfo(info.Apis.ChainApiInfo[0])
ai := cliutil.ParseApiInfo(info)
if dedup[ai.Addr] {
continue
}
Expand All @@ -107,9 +109,11 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
go func() {
defer wg.Done()
var clayers []string
for layer, a := range confNameToAddr {
if a == ai.Addr {
clayers = append(clayers, layer)
for layer, adrs := range confNameToAddr {
for _, adr := range adrs {
if adr == ai.Addr {
clayers = append(clayers, layer)
}
}
}

Expand All @@ -123,13 +127,30 @@ func (a *WebRPC) SyncerState(ctx context.Context) ([]RpcInfo, error) {
defer infosLk.Unlock()
infos[ai.Addr] = myinfo
}()
ver, err := a.deps.Chain.Version(ctx)

addr, err := ai.DialArgs("v1")
if err != nil {
log.Warnf("could not get DialArgs: %w", err)
}

var res api.ChainStruct
closer, err := jsonrpc.NewMergeClient(ctx, addr, "Filecoin",
api.GetInternalStructs(&res), ai.AuthHeader(), []jsonrpc.Option{jsonrpc.WithErrors(jsonrpc.NewErrors())}...)
if err != nil {
log.Warnf("error creating jsonrpc client: %v", err)
return
}
defer closer()

full := &res

ver, err := full.Version(ctx)
if err != nil {
log.Warnw("Version", "error", err)
return
}

head, err := a.deps.Chain.ChainHead(ctx)
head, err := full.ChainHead(ctx)
if err != nil {
log.Warnw("ChainHead", "error", err)
return
Expand Down

0 comments on commit 5e26663

Please sign in to comment.