diff --git a/internal/common/api/parallel_api.go b/internal/common/api/parallel_api.go
new file mode 100644
index 0000000..e42c7db
--- /dev/null
+++ b/internal/common/api/parallel_api.go
@@ -0,0 +1,120 @@
+package api
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"github.com/cosmostation/cvms/internal/common"
+	"github.com/cosmostation/cvms/internal/common/types"
+	"github.com/go-resty/resty/v2"
+)
+
+// parallelRPCGet issues a GET for path through the parallel RPC client,
+// bounded by timeout. Callers must check c.UseParallelClient() first.
+func parallelRPCGet(c common.CommonApp, path string, timeout time.Duration) (*resty.Response, error) {
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	return c.ParallelClient.ParallelRequest(ctx, func(client *resty.Client) (*resty.Response, error) {
+		return client.R().Get(path)
+	}, true) // true selects the RPC endpoint pool
+}
+
+// GetBlockParallel fetches block data using the parallel client if available,
+// falling back to the regular single-endpoint client on any failure.
+func GetBlockParallel(c common.CommonApp, height int64) (
+	/* block height */ int64,
+	/* block timestamp */ time.Time,
+	/* proposer address */ string,
+	/* block txs */ []types.Tx,
+	/* last commit block height */ int64,
+	/* block signatures */ []types.Signature,
+	/* error */ error,
+) {
+	if c.UseParallelClient() {
+		resp, err := parallelRPCGet(c, fmt.Sprintf("/block?height=%d", height), 10*time.Second)
+		if err == nil {
+			// Fall back to the regular client when parsing fails too, so the
+			// unimplemented parser below never masks the working fallback path.
+			blockHeight, blockTime, proposer, txs, lastHeight, sigs, perr := parseBlockResponse(resp)
+			if perr == nil {
+				return blockHeight, blockTime, proposer, txs, lastHeight, sigs, nil
+			}
+			err = perr
+		}
+		// Log parallel failure and fall back to regular client
+		c.Warnf("Parallel block request failed, falling back to regular client: %v", err)
+	}
+
+	// Fallback to regular GetBlock
+	return GetBlock(c.CommonClient, height)
+}
+
+// GetValidatorsParallel fetches validators using the parallel client if
+// available, falling back to the regular client on any failure.
+func GetValidatorsParallel(c common.CommonApp, height ...int64) ([]types.CosmosValidator, error) {
+	if c.UseParallelClient() {
+		var path string
+		if len(height) > 0 && height[0] > 0 {
+			path = fmt.Sprintf("/validators?height=%d&per_page=200", height[0])
+		} else {
+			path = "/validators?per_page=200"
+		}
+
+		resp, err := parallelRPCGet(c, path, 10*time.Second)
+		if err == nil {
+			validators, perr := parseValidatorsResponse(resp)
+			if perr == nil {
+				return validators, nil
+			}
+			err = perr
+		}
+		// Log parallel failure and fall back to regular client
+		c.Warnf("Parallel validators request failed, falling back to regular client: %v", err)
+	}
+
+	// Fallback to regular GetValidators
+	return GetValidators(c.CommonClient, height...)
+}
+
+// GetStatusParallel fetches node status using the parallel client if
+// available, falling back to the regular client on any failure.
+func GetStatusParallel(c common.CommonApp) (
+	/* block height */ int64,
+	/* block timestamp */ time.Time,
+	/* error */ error,
+) {
+	if c.UseParallelClient() {
+		resp, err := parallelRPCGet(c, "/status", 5*time.Second)
+		if err == nil {
+			blockHeight, blockTime, perr := parseStatusResponse(resp)
+			if perr == nil {
+				return blockHeight, blockTime, nil
+			}
+			err = perr
+		}
+		// Log parallel failure and fall back to regular client
+		c.Warnf("Parallel status request failed, falling back to regular client: %v", err)
+	}
+
+	// Fallback to regular GetStatus
+	return GetStatus(c.CommonClient)
+}
+
+// parseBlockResponse parses an RPC /block response.
+// TODO: port the parsing logic from GetBlock; until then callers fall back.
+func parseBlockResponse(resp *resty.Response) (int64, time.Time, string, []types.Tx, int64, []types.Signature, error) {
+	return 0, time.Time{}, "", nil, 0, nil, fmt.Errorf("parseBlockResponse not implemented yet - use fallback")
+}
+
+// parseValidatorsResponse parses an RPC /validators response.
+// TODO: port the parsing logic from GetValidators; until then callers fall back.
+func parseValidatorsResponse(resp *resty.Response) ([]types.CosmosValidator, error) {
+	return nil, fmt.Errorf("parseValidatorsResponse not implemented yet - use fallback")
+}
+
+// parseStatusResponse parses an RPC /status response.
+// TODO: port the parsing logic from GetStatus; until then callers fall back.
+func parseStatusResponse(resp *resty.Response) (int64, time.Time, error) {
+	return 0, time.Time{}, fmt.Errorf("parseStatusResponse not implemented yet - use fallback")
+}
diff --git a/internal/common/parallel_client.go b/internal/common/parallel_client.go
new file mode 100644
index 0000000..8326de2
--- /dev/null
+++ b/internal/common/parallel_client.go
@@ -0,0 +1,282 @@
+package common
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"math/rand"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/cosmostation/cvms/internal/helper/healthcheck"
+	"github.com/go-resty/resty/v2"
+	"github.com/sirupsen/logrus"
+)
+
+// ParallelRPCClient manages multiple RPC endpoints and load balances requests.
+type ParallelRPCClient struct {
+	rpcEndpoints []string
+	apiEndpoints []string
+	healthyRPCs  []string
+	healthyAPIs  []string
+	rpcIndex     int64
+	apiIndex     int64
+	clients      map[string]*resty.Client
+	mu           sync.RWMutex
+	logger       *logrus.Entry
+	protocolType string
+
+	// healthCheckInterval is guarded by mu so it can be tuned at runtime.
+	healthCheckInterval time.Duration
+	stopHealthCheck     chan struct{}
+	stopOnce            sync.Once
+}
+
+// NewParallelRPCClient creates a new parallel RPC client and starts its
+// background health checker; call Stop when the client is no longer needed.
+func NewParallelRPCClient(rpcEndpoints, apiEndpoints []string, protocolType string, logger *logrus.Entry) *ParallelRPCClient {
+	client := &ParallelRPCClient{
+		rpcEndpoints:        rpcEndpoints,
+		apiEndpoints:        apiEndpoints,
+		healthyRPCs:         make([]string, 0),
+		healthyAPIs:         make([]string, 0),
+		clients:             make(map[string]*resty.Client),
+		logger:              logger,
+		protocolType:        protocolType,
+		healthCheckInterval: 30 * time.Second, // Check health every 30 seconds
+		stopHealthCheck:     make(chan struct{}),
+	}
+
+	// Create a logger that doesn't output to avoid spam
+	restyLogger := logrus.New()
+	restyLogger.SetOutput(logrus.StandardLogger().Out)
+	restyLogger.SetLevel(logrus.ErrorLevel) // Only log errors
+
+	// Initialize one resty client per endpoint (RPC and API alike)
+	for _, endpoint := range rpcEndpoints {
+		client.clients[endpoint] = newEndpointClient(endpoint, restyLogger)
+	}
+	for _, endpoint := range apiEndpoints {
+		client.clients[endpoint] = newEndpointClient(endpoint, restyLogger)
+	}
+
+	// Initial health check
+	client.updateHealthyEndpoints()
+
+	// Start continuous health checking
+	go client.continuousHealthCheck()
+
+	return client
+}
+
+// newEndpointClient builds the resty client used for a single endpoint.
+func newEndpointClient(endpoint string, logger *logrus.Logger) *resty.Client {
+	return resty.New().
+		SetRetryCount(2).
+		SetRetryWaitTime(100 * time.Millisecond).
+		SetRetryMaxWaitTime(1 * time.Second).
+		SetTimeout(5 * time.Second).
+		SetBaseURL(endpoint).
+		SetLogger(logger)
+}
+
+// GetNextRPCClient returns the next healthy RPC client using round-robin.
+func (p *ParallelRPCClient) GetNextRPCClient() (*resty.Client, string, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if len(p.healthyRPCs) == 0 {
+		return nil, "", errors.New("no healthy RPC endpoints available")
+	}
+
+	// Round-robin selection
+	index := atomic.AddInt64(&p.rpcIndex, 1) % int64(len(p.healthyRPCs))
+	endpoint := p.healthyRPCs[index]
+
+	return p.clients[endpoint], endpoint, nil
+}
+
+// GetNextAPIClient returns the next healthy API client using round-robin.
+func (p *ParallelRPCClient) GetNextAPIClient() (*resty.Client, string, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if len(p.healthyAPIs) == 0 {
+		return nil, "", errors.New("no healthy API endpoints available")
+	}
+
+	// Round-robin selection
+	index := atomic.AddInt64(&p.apiIndex, 1) % int64(len(p.healthyAPIs))
+	endpoint := p.healthyAPIs[index]
+
+	return p.clients[endpoint], endpoint, nil
+}
+
+// GetRandomRPCClient returns a random healthy RPC client.
+func (p *ParallelRPCClient) GetRandomRPCClient() (*resty.Client, string, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if len(p.healthyRPCs) == 0 {
+		return nil, "", errors.New("no healthy RPC endpoints available")
+	}
+
+	// Random selection
+	endpoint := p.healthyRPCs[rand.Intn(len(p.healthyRPCs))]
+
+	return p.clients[endpoint], endpoint, nil
+}
+
+// GetRandomAPIClient returns a random healthy API client.
+func (p *ParallelRPCClient) GetRandomAPIClient() (*resty.Client, string, error) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+
+	if len(p.healthyAPIs) == 0 {
+		return nil, "", errors.New("no healthy API endpoints available")
+	}
+
+	// Random selection
+	endpoint := p.healthyAPIs[rand.Intn(len(p.healthyAPIs))]
+
+	return p.clients[endpoint], endpoint, nil
+}
+
+// ParallelRequest executes the same request on multiple endpoints and returns
+// the first successful response (transport err == nil and HTTP status < 400).
+func (p *ParallelRPCClient) ParallelRequest(ctx context.Context, requestFunc func(*resty.Client) (*resty.Response, error), useRPC bool) (*resty.Response, error) {
+	p.mu.RLock()
+	var endpoints []string
+	if useRPC {
+		endpoints = make([]string, len(p.healthyRPCs))
+		copy(endpoints, p.healthyRPCs)
+	} else {
+		endpoints = make([]string, len(p.healthyAPIs))
+		copy(endpoints, p.healthyAPIs)
+	}
+	p.mu.RUnlock()
+
+	if len(endpoints) == 0 {
+		return nil, errors.New("no healthy endpoints available")
+	}
+
+	// Use only the first 2-3 endpoints to avoid overwhelming the network
+	maxConcurrent := len(endpoints)
+	if maxConcurrent > 3 {
+		maxConcurrent = 3
+	}
+
+	type result struct {
+		response *resty.Response
+		err      error
+		endpoint string
+	}
+
+	// Buffered so late responders never block after an early return.
+	resultChan := make(chan result, maxConcurrent)
+
+	// Start requests in parallel
+	for i := 0; i < maxConcurrent; i++ {
+		go func(endpoint string) {
+			client := p.clients[endpoint]
+			resp, err := requestFunc(client)
+			resultChan <- result{response: resp, err: err, endpoint: endpoint}
+		}(endpoints[i])
+	}
+
+	// Return first successful response
+	var lastErr error
+	for i := 0; i < maxConcurrent; i++ {
+		select {
+		case res := <-resultChan:
+			if res.err == nil && res.response.StatusCode() < 400 {
+				p.logger.Debugf("Parallel request succeeded using endpoint: %s", res.endpoint)
+				return res.response, nil
+			}
+			if res.err != nil {
+				lastErr = res.err
+			} else {
+				// HTTP-level failure: keep a non-nil error so we never
+				// return (nil, nil) when every endpoint answers >= 400.
+				lastErr = fmt.Errorf("endpoint %s returned status %d", res.endpoint, res.response.StatusCode())
+			}
+			p.logger.Debugf("Parallel request failed for endpoint %s: %v", res.endpoint, lastErr)
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+
+	return nil, lastErr
+}
+
+// updateHealthyEndpoints probes every endpoint and swaps in the healthy lists.
+// The (potentially slow) network probes run outside the lock so readers are
+// never blocked while health checks are in flight.
+func (p *ParallelRPCClient) updateHealthyEndpoints() {
+	newHealthyRPCs := healthcheck.FilterHealthRPCEndpoints(p.rpcEndpoints, p.protocolType)
+	newHealthyAPIs := healthcheck.FilterHealthEndpoints(p.apiEndpoints, p.protocolType)
+
+	p.mu.Lock()
+	defer p.mu.Unlock()
+
+	// Log changes
+	if len(newHealthyRPCs) != len(p.healthyRPCs) {
+		p.logger.Infof("Healthy RPC endpoints updated: %v -> %v", p.healthyRPCs, newHealthyRPCs)
+	}
+	if len(newHealthyAPIs) != len(p.healthyAPIs) {
+		p.logger.Infof("Healthy API endpoints updated: %v -> %v", p.healthyAPIs, newHealthyAPIs)
+	}
+
+	p.healthyRPCs = newHealthyRPCs
+	p.healthyAPIs = newHealthyAPIs
+}
+
+// continuousHealthCheck runs health checks in the background until Stop is called.
+func (p *ParallelRPCClient) continuousHealthCheck() {
+	for {
+		// Re-read the interval each cycle so SetHealthCheckInterval takes
+		// effect without restarting the goroutine.
+		p.mu.RLock()
+		interval := p.healthCheckInterval
+		p.mu.RUnlock()
+
+		timer := time.NewTimer(interval)
+		select {
+		case <-timer.C:
+			p.updateHealthyEndpoints()
+		case <-p.stopHealthCheck:
+			timer.Stop()
+			p.logger.Debug("Parallel RPC client health check stopped")
+			return
+		}
+	}
+}
+
+// Stop stops the health checking goroutine; safe to call multiple times.
+func (p *ParallelRPCClient) Stop() {
+	p.stopOnce.Do(func() { close(p.stopHealthCheck) })
+}
+
+// GetHealthyEndpointsCount returns the count of healthy endpoints.
+func (p *ParallelRPCClient) GetHealthyEndpointsCount() (rpc int, api int) {
+	p.mu.RLock()
+	defer p.mu.RUnlock()
+	return len(p.healthyRPCs), len(p.healthyAPIs)
+}
+
+// HasHealthyEndpoints reports whether both pools have at least one healthy
+// endpoint. NOTE(review): this requires healthy RPCs AND APIs; confirm that
+// RPC-only configurations are intentionally excluded from the parallel path.
+func (p *ParallelRPCClient) HasHealthyEndpoints() bool {
+	rpcCount, apiCount := p.GetHealthyEndpointsCount()
+	return rpcCount > 0 && apiCount > 0
+}
+
+// SetHealthCheckInterval customizes the health check interval; the new value
+// applies from the next health check cycle.
+func (p *ParallelRPCClient) SetHealthCheckInterval(interval time.Duration) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	p.healthCheckInterval = interval
+}
diff --git a/internal/common/parallel_client_test.go b/internal/common/parallel_client_test.go
new file mode 100644
index 0000000..c656fa8
--- /dev/null
+++ b/internal/common/parallel_client_test.go
@@ -0,0 +1,106 @@
+package common
+
+import (
+	"testing"
+	"time"
+
+	"github.com/sirupsen/logrus"
+)
+
+// NOTE(review): this test hits live public endpoints and is inherently flaky;
+// it skips itself when no endpoint is reachable.
+func TestParallelRPCClient(t *testing.T) {
+	// Create test logger
+	logger := logrus.NewEntry(logrus.New())
+
+	// Test endpoints (using akash endpoints)
+	rpcEndpoints := []string{
+		"https://rpc.lavenderfive.com:443/akash",
+		"https://akash-rpc.polkachu.com:443",
+	}
+	apiEndpoints := []string{
+		"https://akash-api.polkachu.com:443",
+		"https://rest.lavenderfive.com:443/akash",
+	}
+
+	// Create parallel client
+	client := NewParallelRPCClient(rpcEndpoints, apiEndpoints, "cosmos", logger)
+	defer client.Stop()
+
+	// Wait for initial health check
+	t.Log("Waiting for initial health check...")
+	time.Sleep(3 * time.Second)
+
+	// Test getting healthy endpoints
+	rpcCount, apiCount := client.GetHealthyEndpointsCount()
+	t.Logf("Healthy endpoints: %d RPCs, %d APIs", rpcCount, apiCount)
+
+	if rpcCount == 0 && apiCount == 0 {
+		t.Skip("No healthy endpoints available for testing")
+	}
+
+	// Test multiple RPC calls to see load balancing
+	if rpcCount > 0 {
+		t.Log("Testing RPC load balancing...")
+		for i := 0; i < 4; i++ {
+			rpcClient, endpoint, err := client.GetNextRPCClient()
+			if err != nil {
+				t.Errorf("Failed to get RPC client on attempt %d: %v", i+1, err)
+				continue
+			}
+			t.Logf("Attempt %d - Got RPC client for endpoint: %s", i+1, endpoint)
+
+			// Test actual RPC call
+			resp, err := rpcClient.R().Get("/status")
+			if err != nil {
+				t.Logf("RPC call failed for %s: %v", endpoint, err)
+			} else {
+				t.Logf("RPC call succeeded for %s (status: %d)", endpoint, resp.StatusCode())
+			}
+		}
+	}
+
+	// Test multiple API calls to see load balancing
+	if apiCount > 0 {
+		t.Log("Testing API load balancing...")
+		for i := 0; i < 4; i++ {
+			apiClient, endpoint, err := client.GetNextAPIClient()
+			if err != nil {
+				t.Errorf("Failed to get API client on attempt %d: %v", i+1, err)
+				continue
+			}
+			t.Logf("Attempt %d - Got API client for endpoint: %s", i+1, endpoint)
+
+			// Test actual API call
+			resp, err := apiClient.R().Get("/cosmos/base/tendermint/v1beta1/node_info")
+			if err != nil {
+				t.Logf("API call failed for %s: %v", endpoint, err)
+			} else {
+				t.Logf("API call succeeded for %s (status: %d)", endpoint, resp.StatusCode())
+			}
+		}
+	}
+
+	// Test health check updates
+	t.Log("Testing health check updates...")
+	time.Sleep(1 * time.Second)
+
+	// Force health check update
+	client.updateHealthyEndpoints()
+
+	newRpcCount, newApiCount := client.GetHealthyEndpointsCount()
+	t.Logf("After manual health check: %d RPCs, %d APIs", newRpcCount, newApiCount)
+
+	// Test random endpoint selection as well
+	if rpcCount > 1 {
+		t.Log("Testing random RPC selection...")
+		endpoints := make(map[string]int)
+		for i := 0; i < 6; i++ {
+			_, endpoint, err := client.GetRandomRPCClient()
+			if err != nil {
+				t.Errorf("Failed to get random RPC client: %v", err)
+				continue
+			}
+			endpoints[endpoint]++
+		}
+		t.Logf("Random RPC distribution: %v", endpoints)
+	}
+}
diff --git a/internal/common/types.go b/internal/common/types.go
index 669a7ba..3b3dc58 100644
--- a/internal/common/types.go
+++ b/internal/common/types.go
@@ -73,6 +73,8 @@ type CommonApp struct {
 	EndPoint string
 	// optional client
 	OptionalClient CommonClient
+	// parallel client for load balancing multiple endpoints
+	ParallelClient *ParallelRPCClient
 }
 
 func NewCommonApp(p Packager) CommonApp {
@@ -100,10 +102,19 @@ func NewCommonApp(p Packager) CommonApp {
 		logger.FieldKeyPackage: p.Package,
 	})
 	commonClient := CommonClient{rpcClient, apiClient, grpcClient, entry}
+
+	// Initialize parallel client if multiple endpoints are available
+	var parallelClient *ParallelRPCClient
+	if len(p.RPCs) > 1 || len(p.APIs) > 1 {
+		parallelClient = NewParallelRPCClient(p.RPCs, p.APIs, p.ProtocolType, entry)
+		entry.Infof("Parallel RPC client initialized with %d RPCs and %d APIs", len(p.RPCs), len(p.APIs))
+	}
+
 	return CommonApp{
 		commonClient,
 		"",
 		CommonClient{},
+		parallelClient,
 	}
 }
 
@@ -146,3 +157,33 @@ func NewOptionalClient(entry *logrus.Entry) CommonClient {
 		SetLogger(restyLogger)
 	return CommonClient{rpcClient, apiClient, nil, entry}
 }
+
+// GetNextRPCClient returns the next healthy RPC client using parallel client if available
+func (app *CommonApp) GetNextRPCClient() (*resty.Client, string, error) {
+	if app.ParallelClient != nil {
+		return app.ParallelClient.GetNextRPCClient()
+	}
+	// Fallback to regular client
+	return app.RPCClient, app.GetRPCEndPoint(), nil
+}
+
+// GetNextAPIClient returns the next healthy API client using parallel client if available
+func (app *CommonApp) GetNextAPIClient() (*resty.Client, string, error) {
+	if app.ParallelClient != nil {
+		return app.ParallelClient.GetNextAPIClient()
+	}
+	// Fallback to regular client
+	return app.APIClient, app.GetAPIEndPoint(), nil
+}
+
+// UseParallelClient returns true if parallel client is available and has healthy endpoints
+func (app *CommonApp) UseParallelClient() bool {
+	return app.ParallelClient != nil && app.ParallelClient.HasHealthyEndpoints()
+}
+
+// StopParallelClient stops the parallel client health checking
+func (app *CommonApp) StopParallelClient() {
+	if app.ParallelClient != nil {
+		app.ParallelClient.Stop()
+	}
+}
diff --git a/internal/packages/consensus/voteindexer/indexer/indexer.go b/internal/packages/consensus/voteindexer/indexer/indexer.go
index 6117fd1..13f6c16 100644
--- a/internal/packages/consensus/voteindexer/indexer/indexer.go
+++ b/internal/packages/consensus/voteindexer/indexer/indexer.go
@@ -114,29 +114,45 @@ func (vidx *VoteIndexer) Start() error {
 func (vidx *VoteIndexer) Loop(indexPoint int64) {
 	isUnhealth := false
 	for {
-		// node health check
-		if isUnhealth {
-			healthAPIs := healthcheck.FilterHealthEndpoints(vidx.APIs, vidx.ProtocolType)
-			for _, api := range healthAPIs {
-				vidx.SetAPIEndPoint(api)
-				vidx.Warnf("API endpoint will be changed with health endpoint for this package: %s", api)
-				isUnhealth = false
-				break
-			}
-
-			healthRPCs := healthcheck.FilterHealthRPCEndpoints(vidx.RPCs, vidx.ProtocolType)
-			for _, rpc := range healthRPCs {
-				vidx.SetRPCEndPoint(rpc)
-				vidx.Warnf("RPC endpoint will be changed with health endpoint for this package: %s", rpc)
-				isUnhealth = false
-				break
-			}
-
-			if len(healthAPIs) == 0 || len(healthRPCs) == 0 {
+		// Check if we should use parallel client or traditional health check
+		if vidx.UseParallelClient() {
+			// Parallel client handles health checking automatically
+			if !vidx.ParallelClient.HasHealthyEndpoints() {
 				isUnhealth = true
-				vidx.Errorln("failed to get any health endpoints from healthcheck filter, retry sleep 10s")
+				vidx.Errorln("parallel client reports no healthy endpoints available, retry sleep 10s")
 				time.Sleep(indexertypes.UnHealthSleep)
 				continue
+			} else {
+				isUnhealth = false
+				// Log current healthy endpoints count
+				rpcCount, apiCount := vidx.ParallelClient.GetHealthyEndpointsCount()
+				vidx.Debugf("Parallel client: %d healthy RPCs, %d healthy APIs", rpcCount, apiCount)
+			}
+		} else {
+			// Traditional sequential health check fallback
+			if isUnhealth {
+				healthAPIs := healthcheck.FilterHealthEndpoints(vidx.APIs, vidx.ProtocolType)
+				for _, api := range healthAPIs {
+					vidx.SetAPIEndPoint(api)
+					vidx.Warnf("API endpoint will be changed with health endpoint for this package: %s", api)
+					isUnhealth = false
+					break
+				}
+
+				healthRPCs := healthcheck.FilterHealthRPCEndpoints(vidx.RPCs, vidx.ProtocolType)
+				for _, rpc := range healthRPCs {
+					vidx.SetRPCEndPoint(rpc)
+					vidx.Warnf("RPC endpoint will be changed with health endpoint for this package: %s", rpc)
+					isUnhealth = false
+					break
+				}
+
+				if len(healthAPIs) == 0 || len(healthRPCs) == 0 {
+					isUnhealth = true
+					vidx.Errorln("failed to get any health endpoints from healthcheck filter, retry sleep 10s")
+					time.Sleep(indexertypes.UnHealthSleep)
+					continue
+				}
 			}
 		}