Skip to content

Commit

Permalink
[chain] Block Operation Metrics + Fix Fatal Usage (#355)
Browse files Browse the repository at this point in the history
* metrics PoC

* update cli

* add wait metric

* fix fatal usage

* use hard fail fatal
  • Loading branch information
patrick-ogrady authored Aug 15, 2023
1 parent 95942cc commit 22290ee
Show file tree
Hide file tree
Showing 7 changed files with 208 additions and 60 deletions.
15 changes: 15 additions & 0 deletions chain/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,11 @@ func (b *StatelessBlock) ShouldVerifyWithContext(context.Context) (bool, error)

// implements "block.WithVerifyContext"
func (b *StatelessBlock) VerifyWithContext(ctx context.Context, bctx *block.Context) error {
start := time.Now()
defer func() {
b.vm.RecordBlockVerify(time.Since(start))
}()

stateReady := b.vm.StateReady()
ctx, span := b.vm.Tracer().Start(
ctx, "StatelessBlock.VerifyWithContext",
Expand All @@ -312,6 +317,11 @@ func (b *StatelessBlock) VerifyWithContext(ctx context.Context, bctx *block.Cont

// implements "snowman.Block"
func (b *StatelessBlock) Verify(ctx context.Context) error {
start := time.Now()
defer func() {
b.vm.RecordBlockVerify(time.Since(start))
}()

stateReady := b.vm.StateReady()
ctx, span := b.vm.Tracer().Start(
ctx, "StatelessBlock.Verify",
Expand Down Expand Up @@ -595,6 +605,11 @@ func (b *StatelessBlock) innerVerify(ctx context.Context) (merkledb.TrieView, er

// implements "snowman.Block.choices.Decidable"
func (b *StatelessBlock) Accept(ctx context.Context) error {
start := time.Now()
defer func() {
b.vm.RecordBlockAccept(time.Since(start))
}()

ctx, span := b.vm.Tracer().Start(ctx, "StatelessBlock.Accept")
defer span.End()

Expand Down
1 change: 1 addition & 0 deletions chain/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func BuildBlock(
txs := mempool.Stream(ctx, streamBatch)
prepareStreamLock.Unlock()
if len(txs) == 0 {
b.vm.RecordClearedMempool()
break
}

Expand Down
3 changes: 3 additions & 0 deletions chain/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,13 @@ type VM interface {
// TODO: break out into own interface
RecordRootCalculated(time.Duration) // only called in Verify
RecordWaitSignatures(time.Duration) // only called in Verify
RecordBlockVerify(time.Duration)
RecordBlockAccept(time.Duration)
RecordStateChanges(int)
RecordStateOperations(int)
RecordBuildCapped()
RecordEmptyBlockBuilt()
RecordClearedMempool()
}

type Mempool interface {
Expand Down
18 changes: 18 additions & 0 deletions examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ var generatePrometheusCmd = &cobra.Command{
panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_wait_signatures_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}signature verification wait (ms/s):{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_cleared_mempool[5s])/5", chainID))
utils.Outf("{{yellow}}cleared mempool per second:{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("avalanche_%s_vm_hypersdk_chain_mempool_size", chainID))
utils.Outf("{{yellow}}mempool size:{{/}} %s\n", panels[len(panels)-1])

Expand Down Expand Up @@ -104,6 +107,21 @@ var generatePrometheusCmd = &cobra.Command{
panels = append(panels, cli.OutboundFailed)
utils.Outf("{{yellow}}outbound failed:{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_block_build_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}block build (ms/s):{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_block_parse_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}block parse (ms/s):{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_block_verify_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}block verify (ms/s):{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_block_accept_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}block accept (ms/s):{{/}} %s\n", panels[len(panels)-1])

panels = append(panels, fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_chain_block_process_sum[5s])/1000000/5", chainID))
utils.Outf("{{yellow}}block process [async] (ms/s):{{/}} %s\n", panels[len(panels)-1])

return panels
})
},
Expand Down
62 changes: 62 additions & 0 deletions vm/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,15 @@ type Metrics struct {
stateOperations prometheus.Counter
buildCapped prometheus.Counter
emptyBlockBuilt prometheus.Counter
clearedMempool prometheus.Counter
mempoolSize prometheus.Gauge
rootCalculated metric.Averager
waitSignatures metric.Averager
blockBuild metric.Averager
blockParse metric.Averager
blockVerify metric.Averager
blockAccept metric.Averager
blockProcess metric.Averager
}

func newMetrics() (*prometheus.Registry, *Metrics, error) {
Expand All @@ -47,6 +53,51 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) {
if err != nil {
return nil, nil, err
}
blockBuild, err := metric.NewAverager(
"chain",
"block_build",
"time spent building blocks",
r,
)
if err != nil {
return nil, nil, err
}
blockParse, err := metric.NewAverager(
"chain",
"block_parse",
"time spent parsing blocks",
r,
)
if err != nil {
return nil, nil, err
}
blockVerify, err := metric.NewAverager(
"chain",
"block_verify",
"time spent verifying blocks",
r,
)
if err != nil {
return nil, nil, err
}
blockAccept, err := metric.NewAverager(
"chain",
"block_accept",
"time spent accepting blocks",
r,
)
if err != nil {
return nil, nil, err
}
blockProcess, err := metric.NewAverager(
"chain",
"block_process",
"time spent processing blocks",
r,
)
if err != nil {
return nil, nil, err
}

m := &Metrics{
unitsVerified: prometheus.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -104,13 +155,23 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) {
Name: "empty_block_built",
Help: "number of times empty block built",
}),
clearedMempool: prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "chain",
Name: "cleared_mempool",
Help: "number of times cleared mempool while building",
}),
mempoolSize: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "chain",
Name: "mempool_size",
Help: "number of transactions in the mempool",
}),
rootCalculated: rootCalculated,
waitSignatures: waitSignatures,
blockBuild: blockBuild,
blockParse: blockParse,
blockVerify: blockVerify,
blockAccept: blockAccept,
blockProcess: blockProcess,
}
errs := wrappers.Errs{}
errs.Add(
Expand All @@ -126,6 +187,7 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) {
r.Register(m.mempoolSize),
r.Register(m.buildCapped),
r.Register(m.emptyBlockBuilt),
r.Register(m.clearedMempool),
)
return r, m, errs.Err
}
144 changes: 84 additions & 60 deletions vm/resolutions.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,86 +128,98 @@ func (vm *VM) Rejected(ctx context.Context, b *chain.StatelessBlock) {
vm.verifiedL.Unlock()
vm.mempool.Add(ctx, b.Txs)

// TODO: handle async?
if err := vm.c.Rejected(ctx, b); err != nil {
vm.snowCtx.Log.Fatal("rejected processing failed", zap.Error(err))
vm.Fatal("rejected processing failed", zap.Error(err))
}

// Ensure children of block are cleared, they may never be
// verified
vm.snowCtx.Log.Info("rejected block", zap.Stringer("id", b.ID()))
}

func (vm *VM) processAcceptedBlocks() {
// The VM closes [acceptedQueue] during shutdown. We wait for all enqueued blocks
// to be processed before returning as a guarantee to listeners (which may
// persist indexed state) instead of just exiting as soon as `vm.stop` is
// closed.
for b := range vm.acceptedQueue {
// We skip blocks that were not processed because metadata required to
// process blocks opaquely (like looking at results) is not populated.
//
// We don't need to worry about dangling messages in listeners because we
// don't allow subscription until the node is healthy.
if !b.Processed() {
vm.snowCtx.Log.Info("skipping unprocessed block", zap.Uint64("height", b.Hght))
continue
}
func (vm *VM) processAcceptedBlock(b *chain.StatelessBlock) {
start := time.Now()
defer func() {
vm.metrics.blockProcess.Observe(float64(time.Since(start)))
}()

// Update controller
if err := vm.c.Accepted(context.TODO(), b); err != nil {
vm.snowCtx.Log.Fatal("accepted processing failed", zap.Error(err))
}
// We skip blocks that were not processed because metadata required to
// process blocks opaquely (like looking at results) is not populated.
//
// We don't need to worry about dangling messages in listeners because we
// don't allow subscription until the node is healthy.
if !b.Processed() {
vm.snowCtx.Log.Info("skipping unprocessed block", zap.Uint64("height", b.Hght))
return
}

// Sign and store any warp messages (regardless if validator now, may become one)
results := b.Results()
for i, tx := range b.Txs {
// Only cache auth for accepted blocks to prevent cache manipulation from RPC submissions
vm.cacheAuth(tx.Auth)
// Update controller
if err := vm.c.Accepted(context.TODO(), b); err != nil {
vm.Fatal("accepted processing failed", zap.Error(err))
}

result := results[i]
if result.WarpMessage == nil {
continue
}
start := time.Now()
signature, err := vm.snowCtx.WarpSigner.Sign(result.WarpMessage)
if err != nil {
vm.snowCtx.Log.Fatal("unable to sign warp message", zap.Error(err))
}
if err := vm.StoreWarpSignature(tx.ID(), vm.snowCtx.PublicKey, signature); err != nil {
vm.snowCtx.Log.Fatal("unable to store warp signature", zap.Error(err))
}
vm.snowCtx.Log.Info(
"signed and stored warp message signature",
zap.Stringer("txID", tx.ID()),
zap.Duration("t", time.Since(start)),
)

// Kickoff job to fetch signatures from other validators in the
// background
//
// We pass bytes here so that signatures returned from validators can be
// verified before they are persisted.
vm.warpManager.GatherSignatures(context.TODO(), tx.ID(), result.WarpMessage.Bytes())
}
// Sign and store any warp messages (regardless if validator now, may become one)
results := b.Results()
for i, tx := range b.Txs {
// Only cache auth for accepted blocks to prevent cache manipulation from RPC submissions
vm.cacheAuth(tx.Auth)

// Update server
if err := vm.webSocketServer.AcceptBlock(b); err != nil {
vm.snowCtx.Log.Fatal("unable to accept block in websocket server", zap.Error(err))
result := results[i]
if result.WarpMessage == nil {
continue
}
start := time.Now()
signature, err := vm.snowCtx.WarpSigner.Sign(result.WarpMessage)
if err != nil {
vm.Fatal("unable to sign warp message", zap.Error(err))
}
// Must clear accepted txs before [SetMinTx] or else we will erroneously
// send [ErrExpired] messages.
if err := vm.webSocketServer.SetMinTx(b.Tmstmp); err != nil {
vm.snowCtx.Log.Fatal("unable to set min tx in websocket server", zap.Error(err))
if err := vm.StoreWarpSignature(tx.ID(), vm.snowCtx.PublicKey, signature); err != nil {
vm.Fatal("unable to store warp signature", zap.Error(err))
}
vm.snowCtx.Log.Info(
"signed and stored warp message signature",
zap.Stringer("txID", tx.ID()),
zap.Duration("t", time.Since(start)),
)

// Kickoff job to fetch signatures from other validators in the
// background
//
// We pass bytes here so that signatures returned from validators can be
// verified before they are persisted.
vm.warpManager.GatherSignatures(context.TODO(), tx.ID(), result.WarpMessage.Bytes())
}

// Update server
if err := vm.webSocketServer.AcceptBlock(b); err != nil {
vm.Fatal("unable to accept block in websocket server", zap.Error(err))
}
// Must clear accepted txs before [SetMinTx] or else we will erroneously
// send [ErrExpired] messages.
if err := vm.webSocketServer.SetMinTx(b.Tmstmp); err != nil {
vm.Fatal("unable to set min tx in websocket server", zap.Error(err))
}
}

func (vm *VM) processAcceptedBlocks() {
// Always close [acceptorDone] or we may block shutdown.
defer func() {
close(vm.acceptorDone)
vm.snowCtx.Log.Info("acceptor queue shutdown")
}()

// The VM closes [acceptedQueue] during shutdown. We wait for all enqueued blocks
// to be processed before returning as a guarantee to listeners (which may
// persist indexed state) instead of just exiting as soon as `vm.stop` is
// closed.
for b := range vm.acceptedQueue {
vm.processAcceptedBlock(b)
vm.snowCtx.Log.Info(
"block processed",
zap.Stringer("blkID", b.ID()),
zap.Uint64("height", b.Hght),
)
}
close(vm.acceptorDone)
vm.snowCtx.Log.Info("acceptor queue shutdown")
}

func (vm *VM) Accepted(ctx context.Context, b *chain.StatelessBlock) {
Expand Down Expand Up @@ -408,3 +420,15 @@ func (vm *VM) cacheAuth(auth chain.Auth) {
}
bv.Cache(auth)
}

// RecordBlockVerify implements the chain.VM metrics interface by folding the
// observed block verification duration into the block_verify averager.
func (vm *VM) RecordBlockVerify(t time.Duration) {
	elapsed := float64(t)
	vm.metrics.blockVerify.Observe(elapsed)
}

// RecordBlockAccept implements the chain.VM metrics interface by folding the
// observed block acceptance duration into the block_accept averager.
func (vm *VM) RecordBlockAccept(t time.Duration) {
	elapsed := float64(t)
	vm.metrics.blockAccept.Observe(elapsed)
}

// RecordClearedMempool increments the counter tracking how many times the
// mempool was fully drained while building a block (called by chain.BuildBlock
// when a stream request returns no transactions).
func (vm *VM) RecordClearedMempool() {
	vm.metrics.clearedMempool.Inc()
}
Loading

0 comments on commit 22290ee

Please sign in to comment.