Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ func processEventMessage(ctx context.Context, queues *v2queue.Queues, event Gene
return queues.ActiveStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.UnbondingStakingEventType:
return queues.UnbondingStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.WithdrawableStakingEventType:
return queues.WithdrawableStakingQueueClient.SendMessage(ctx, messageBody)
case queueClient.WithdrawnStakingEventType:
return queues.WithdrawnStakingQueueClient.SendMessage(ctx, messageBody)
default:
return fmt.Errorf("unknown event type: %v", event.EventType)
}
Expand Down
4 changes: 2 additions & 2 deletions config/config-local.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ address_screening:
enabled: true
external_apis:
coinmarketcap:
api_key: "coinmarketcap_api_key"
api_key: "9d2f6ffe-2fc0-4208-91b6-453701b558b1"
base_url: "https://pro-api.coinmarketcap.com/v1"
timeout: 10s # http client timeout
cache_ttl: 300s # mongodb ttl
chainalysis:
api_key: "chainalysis_api_key"
base_url: "https://api.chainalysis.com"
bbn:
rpc-addr: https://rpc-dapp.devnet.babylonlabs.io:443
rpc-addr: https://rpc.canon-devnet.babylonlabs.io:443/
timeout: 30s
maxretrytimes: 5
retryinterval: 500ms
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/babylonlabs-io/babylon-staking-indexer v1.0.5
github.com/babylonlabs-io/babylon/v4 v4.0.0-rc.0
github.com/babylonlabs-io/networks/parameters v0.2.2
github.com/babylonlabs-io/staking-queue-client v1.0.0
github.com/babylonlabs-io/staking-queue-client v1.1.0
github.com/btcsuite/btcd v0.24.3-0.20241011125836-24eb815168f4
github.com/btcsuite/btcd/btcec/v2 v2.3.4
github.com/btcsuite/btcd/btcutil v1.1.6
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,8 @@ github.com/babylonlabs-io/babylon/v4 v4.0.0-rc.0 h1:bfR0DN1rLJqblpFA2hppBcf2/64X
github.com/babylonlabs-io/babylon/v4 v4.0.0-rc.0/go.mod h1:HfAbIoL3MnSe1w+HefFISiVjGks4Yfhy07fw/Fr/l7o=
github.com/babylonlabs-io/networks/parameters v0.2.2 h1:TCu39fZvjX5f6ZZrjhYe54M6wWxglNewuKu56yE+zrc=
github.com/babylonlabs-io/networks/parameters v0.2.2/go.mod h1:iEJVOzaLsE33vpP7J4u+CRGfkSIfErUAwRmgCFCBpyI=
github.com/babylonlabs-io/staking-queue-client v1.0.0 h1:mXAxZLJX/NbHW0R/pP3DXu75cIvl8B8ZIVeEdgao6w8=
github.com/babylonlabs-io/staking-queue-client v1.0.0/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/babylonlabs-io/staking-queue-client v1.1.0 h1:AUzYn28HDgXPxBffn3WVprb2cu9tLJ+rPO9mfDcQp9o=
github.com/babylonlabs-io/staking-queue-client v1.1.0/go.mod h1:n3fr3c+9LNiJlyETmcrVk94Zn76rAADhGZKxX+rVf+Q=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
Expand Down
51 changes: 0 additions & 51 deletions internal/v2/queue/handler/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,54 +78,3 @@ func (h *V2QueueHandler) UnbondingStakingHandler(ctx context.Context, messageBod
}
return nil
}

// WithdrawableStakingHandler processes withdrawable staking events
func (h *V2QueueHandler) WithdrawableStakingHandler(ctx context.Context, messageBody string) *types.Error {
var withdrawableStakingEvent queueClient.StakingEvent
err := json.Unmarshal([]byte(messageBody), &withdrawableStakingEvent)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal the message body into WithdrawableStakingEvent")
return types.NewError(http.StatusBadRequest, types.BadRequest, err)
}

// TODO: Perform the address lookup conversion
// https://github.com/babylonlabs-io/staking-api-service/issues/162

statsErr := h.Services.V2Service.ProcessWithdrawableDelegationStats(
ctx,
withdrawableStakingEvent.StakingTxHashHex,
withdrawableStakingEvent.StakerBtcPkHex,
withdrawableStakingEvent.StakingAmount,
withdrawableStakingEvent.StateHistory,
)
if statsErr != nil {
log.Ctx(ctx).Error().Err(statsErr).Msg("Failed to process staking stats calculation")
return statsErr
}

return nil
}

// WithdrawnStakingHandler processes withdrawn staking events
func (h *V2QueueHandler) WithdrawnStakingHandler(ctx context.Context, messageBody string) *types.Error {
var withdrawnStakingEvent queueClient.StakingEvent
err := json.Unmarshal([]byte(messageBody), &withdrawnStakingEvent)
if err != nil {
log.Ctx(ctx).Error().Err(err).Msg("Failed to unmarshal the message body into WithdrawnStakingEvent")
return types.NewError(http.StatusBadRequest, types.BadRequest, err)
}

statsErr := h.Services.V2Service.ProcessWithdrawnDelegationStats(
ctx,
withdrawnStakingEvent.StakingTxHashHex,
withdrawnStakingEvent.StakerBtcPkHex,
withdrawnStakingEvent.StakingAmount,
withdrawnStakingEvent.StateHistory,
)
if statsErr != nil {
log.Ctx(ctx).Error().Err(statsErr).Msg("Failed to process staking stats calculation")
return statsErr
}

return nil
}
60 changes: 10 additions & 50 deletions internal/v2/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ import (
)

type Queues struct {
Handlers *v2queuehandler.V2QueueHandler
processingTimeout time.Duration
maxRetryAttempts int32
ActiveStakingQueueClient client.QueueClient
UnbondingStakingQueueClient client.QueueClient
WithdrawableStakingQueueClient client.QueueClient
WithdrawnStakingQueueClient client.QueueClient
Handlers *v2queuehandler.V2QueueHandler
processingTimeout time.Duration
maxRetryAttempts int32
ActiveStakingQueueClient client.QueueClient
UnbondingStakingQueueClient client.QueueClient
}

func New(cfg *queueConfig.QueueConfig, service *services.Services) (*Queues, error) {
Expand All @@ -42,29 +40,13 @@ func New(cfg *queueConfig.QueueConfig, service *services.Services) (*Queues, err
return nil, fmt.Errorf("error while creating UnbondingStakingQueueClient: %w", err)
}

withdrawableStakingQueueClient, err := client.NewQueueClient(
cfg, client.WithdrawableStakingQueueName,
)
if err != nil {
return nil, fmt.Errorf("error while creating WithdrawableStakingQueueClient: %w", err)
}

withdrawnStakingQueueClient, err := client.NewQueueClient(
cfg, client.WithdrawnStakingQueueName,
)
if err != nil {
return nil, fmt.Errorf("error while creating WithdrawnStakingQueueClient: %w", err)
}

handlers := v2queuehandler.NewV2QueueHandler(service)
return &Queues{
Handlers: handlers,
processingTimeout: cfg.QueueProcessingTimeout,
maxRetryAttempts: cfg.MsgMaxRetryAttempts,
ActiveStakingQueueClient: activeStakingQueueClient,
UnbondingStakingQueueClient: unbondingStakingQueueClient,
WithdrawableStakingQueueClient: withdrawableStakingQueueClient,
WithdrawnStakingQueueClient: withdrawnStakingQueueClient,
Handlers: handlers,
processingTimeout: cfg.QueueProcessingTimeout,
maxRetryAttempts: cfg.MsgMaxRetryAttempts,
ActiveStakingQueueClient: activeStakingQueueClient,
UnbondingStakingQueueClient: unbondingStakingQueueClient,
}, nil
}

Expand All @@ -84,14 +66,6 @@ func (q *Queues) StartReceivingMessages() error {
q.UnbondingStakingQueueClient,
q.Handlers.UnbondingStakingHandler, q.Handlers.HandleUnprocessedMessage,
},
{
q.WithdrawableStakingQueueClient,
q.Handlers.WithdrawableStakingHandler, q.Handlers.HandleUnprocessedMessage,
},
{
q.WithdrawnStakingQueueClient,
q.Handlers.WithdrawnStakingHandler, q.Handlers.HandleUnprocessedMessage,
},
// ...add more queues here
}

Expand Down Expand Up @@ -123,18 +97,6 @@ func (q *Queues) StopReceivingMessages() {
Str("queueName", q.UnbondingStakingQueueClient.GetQueueName()).
Msg("error while stopping queue")
}
withdrawableQueueErr := q.WithdrawableStakingQueueClient.Stop()
if withdrawableQueueErr != nil {
log.Error().Err(withdrawableQueueErr).
Str("queueName", q.WithdrawableStakingQueueClient.GetQueueName()).
Msg("error while stopping queue")
}
withdrawnQueueErr := q.WithdrawnStakingQueueClient.Stop()
if withdrawnQueueErr != nil {
log.Error().Err(withdrawnQueueErr).
Str("queueName", q.WithdrawnStakingQueueClient.GetQueueName()).
Msg("error while stopping queue")
}
// ...add more queues here
}

Expand Down Expand Up @@ -252,8 +214,6 @@ func (q *Queues) IsConnectionHealthy() error {

checkQueue("ActiveStakingQueueClient", q.ActiveStakingQueueClient)
checkQueue("UnbondingStakingQueueClient", q.UnbondingStakingQueueClient)
checkQueue("WithdrawableStakingQueueClient", q.WithdrawableStakingQueueClient)
checkQueue("WithdrawnStakingQueueClient", q.WithdrawnStakingQueueClient)

if len(errorMessages) > 0 {
return fmt.Errorf("queue health check failed: %s", strings.Join(errorMessages, "; "))
Expand Down
Loading