Skip to content

Commit

Permalink
Merge pull request #6044 from onflow/janez/change-metrics-collection-…
Browse files Browse the repository at this point in the history
…in-computer

Change metrics collection in computer
  • Loading branch information
janezpodhostnik authored Jun 12, 2024
2 parents 09b0870 + 9e5913c commit da4f52c
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 111 deletions.
23 changes: 10 additions & 13 deletions engine/execution/computation/computer/computer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/onflow/flow-go/ledger/common/pathfinder"
"github.com/onflow/flow-go/ledger/complete"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/epochs"
"github.com/onflow/flow-go/module/executiondatasync/execution_data"
"github.com/onflow/flow-go/module/executiondatasync/provider"
Expand Down Expand Up @@ -155,13 +156,10 @@ func TestBlockExecutor_ExecuteBlock(t *testing.T) {
Times(2) // 1 collection + system collection

exemetrics.On("ExecutionTransactionExecuted",
mock.Anything, // duration
mock.Anything, // conflict retry count
mock.Anything, // computation used
mock.Anything, // memory used
mock.Anything, // number of events
mock.Anything, // size of events
false). // no failure
mock.Anything,
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
return !arg.Failed // only successful transactions
})).
Return(nil).
Times(2 + 1) // 2 txs in collection + system chunk tx

Expand Down Expand Up @@ -1267,12 +1265,11 @@ func Test_ExecutingSystemCollection(t *testing.T) {

metrics.On("ExecutionTransactionExecuted",
mock.Anything, // duration
mock.Anything, // conflict retry count
mock.Anything, // computation used
mock.Anything, // memory used
expectedNumberOfEvents,
expectedEventSize,
false).
mock.MatchedBy(func(arg module.TransactionExecutionResultStats) bool {
return arg.EventCounts == expectedNumberOfEvents &&
arg.EventSize == expectedEventSize &&
!arg.Failed
})).
Return(nil).
Times(1) // system chunk tx

Expand Down
69 changes: 41 additions & 28 deletions engine/execution/computation/computer/result_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,12 @@ type resultCollector struct {
spockSignatures []crypto.Signature

blockStartTime time.Time
blockStats module.ExecutionResultStats
blockStats module.BlockExecutionResultStats
blockMeter *meter.Meter

currentCollectionStartTime time.Time
currentCollectionState *state.ExecutionState
currentCollectionStats module.ExecutionResultStats
currentCollectionStats module.CollectionExecutionResultStats
currentCollectionStorageSnapshot execution.ExtendableStorageSnapshot
}

Expand Down Expand Up @@ -123,9 +123,7 @@ func newResultCollector(
blockMeter: meter.NewMeter(meter.DefaultParameters()),
currentCollectionStartTime: now,
currentCollectionState: state.NewExecutionState(nil, state.DefaultParameters()),
currentCollectionStats: module.ExecutionResultStats{
NumberOfCollections: 1,
},
currentCollectionStats: module.CollectionExecutionResultStats{},
currentCollectionStorageSnapshot: storehouse.NewExecutingBlockSnapshot(
previousBlockSnapshot,
*block.StartState,
Expand Down Expand Up @@ -201,27 +199,16 @@ func (collector *resultCollector) commitCollection(

collector.spockSignatures = append(collector.spockSignatures, spock)

collector.currentCollectionStats.EventCounts = len(events)
collector.currentCollectionStats.EventSize = events.ByteSize()
collector.currentCollectionStats.NumberOfRegistersTouched = len(
collectionExecutionSnapshot.AllRegisterIDs())
for _, entry := range collectionExecutionSnapshot.UpdatedRegisters() {
collector.currentCollectionStats.NumberOfBytesWrittenToRegisters += len(
entry.Value)
}

collector.metrics.ExecutionCollectionExecuted(
time.Since(startTime),
collector.currentCollectionStats)

collector.blockStats.Merge(collector.currentCollectionStats)
collector.blockStats.Add(collector.currentCollectionStats)
collector.blockMeter.MergeMeter(collectionExecutionSnapshot.Meter)

collector.currentCollectionStartTime = time.Now()
collector.currentCollectionState = state.NewExecutionState(nil, state.DefaultParameters())
collector.currentCollectionStats = module.ExecutionResultStats{
NumberOfCollections: 1,
}
collector.currentCollectionStats = module.CollectionExecutionResultStats{}

for _, consumer := range collector.consumers {
err = consumer.OnExecutedCollection(collector.result.CollectionExecutionResultAt(collection.collectionIndex))
Expand Down Expand Up @@ -269,14 +256,12 @@ func (collector *resultCollector) processTransactionResult(
logger.Info().Msg("transaction executed successfully")
}

collector.metrics.ExecutionTransactionExecuted(
collector.handleTransactionExecutionMetrics(
timeSpent,
output,
txnExecutionSnapshot,
txn,
numConflictRetries,
output.ComputationUsed,
output.MemoryEstimate,
len(output.Events),
flow.EventsList(output.Events).ByteSize(),
output.Err != nil,
)

txnResult := flow.TransactionResult{
Expand All @@ -302,10 +287,6 @@ func (collector *resultCollector) processTransactionResult(
return fmt.Errorf("failed to merge into collection view: %w", err)
}

collector.currentCollectionStats.ComputationUsed += output.ComputationUsed
collector.currentCollectionStats.MemoryUsed += output.MemoryEstimate
collector.currentCollectionStats.NumberOfTransactions += 1

if !txn.lastTransactionInCollection {
return nil
}
Expand All @@ -316,6 +297,38 @@ func (collector *resultCollector) processTransactionResult(
collector.currentCollectionState.Finalize())
}

func (collector *resultCollector) handleTransactionExecutionMetrics(
timeSpent time.Duration,
output fvm.ProcedureOutput,
txnExecutionSnapshot *snapshot.ExecutionSnapshot,
txn TransactionRequest,
numConflictRetries int,
) {
transactionExecutionStats := module.TransactionExecutionResultStats{
ExecutionResultStats: module.ExecutionResultStats{
ComputationUsed: output.ComputationUsed,
MemoryUsed: output.MemoryEstimate,
EventCounts: len(output.Events),
EventSize: output.Events.ByteSize(),
NumberOfRegistersTouched: len(txnExecutionSnapshot.AllRegisterIDs()),
},
ComputationIntensities: output.ComputationIntensities,
NumberOfTxnConflictRetries: numConflictRetries,
Failed: output.Err != nil,
SystemTransaction: txn.isSystemTransaction,
}
for _, entry := range txnExecutionSnapshot.UpdatedRegisters() {
transactionExecutionStats.NumberOfBytesWrittenToRegisters += len(entry.Value)
}

collector.metrics.ExecutionTransactionExecuted(
timeSpent,
transactionExecutionStats,
)

collector.currentCollectionStats.Add(transactionExecutionStats)
}

func (collector *resultCollector) AddTransactionResult(
request TransactionRequest,
snapshot *snapshot.ExecutionSnapshot,
Expand Down
45 changes: 32 additions & 13 deletions module/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
httpmetrics "github.com/slok/go-http-metrics/metrics"

"github.com/onflow/flow-go/fvm/meter"
"github.com/onflow/flow-go/model/chainsync"
"github.com/onflow/flow-go/model/cluster"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -897,8 +898,24 @@ type ExecutionResultStats struct {
EventSize int
NumberOfRegistersTouched int
NumberOfBytesWrittenToRegisters int
NumberOfCollections int
NumberOfTransactions int
}

type BlockExecutionResultStats struct {
CollectionExecutionResultStats
NumberOfCollections int
}

type CollectionExecutionResultStats struct {
ExecutionResultStats
NumberOfTransactions int
}

type TransactionExecutionResultStats struct {
ExecutionResultStats
NumberOfTxnConflictRetries int
Failed bool
SystemTransaction bool
ComputationIntensities meter.MeteredComputationIntensities
}

func (stats *ExecutionResultStats) Merge(other ExecutionResultStats) {
Expand All @@ -908,8 +925,17 @@ func (stats *ExecutionResultStats) Merge(other ExecutionResultStats) {
stats.EventSize += other.EventSize
stats.NumberOfRegistersTouched += other.NumberOfRegistersTouched
stats.NumberOfBytesWrittenToRegisters += other.NumberOfBytesWrittenToRegisters
stats.NumberOfCollections += other.NumberOfCollections
}

func (stats *CollectionExecutionResultStats) Add(other TransactionExecutionResultStats) {
stats.ExecutionResultStats.Merge(other.ExecutionResultStats)
stats.NumberOfTransactions += 1
}

func (stats *BlockExecutionResultStats) Add(other CollectionExecutionResultStats) {
stats.CollectionExecutionResultStats.Merge(other.ExecutionResultStats)
stats.NumberOfTransactions += other.NumberOfTransactions
stats.NumberOfCollections += 1
}

type ExecutionMetrics interface {
Expand All @@ -936,7 +962,7 @@ type ExecutionMetrics interface {
ExecutionLastFinalizedExecutedBlockHeight(height uint64)

// ExecutionBlockExecuted reports the total time and computation spent on executing a block
ExecutionBlockExecuted(dur time.Duration, stats ExecutionResultStats)
ExecutionBlockExecuted(dur time.Duration, stats BlockExecutionResultStats)

// ExecutionBlockExecutionEffortVectorComponent reports the unweighted effort of given ComputationKind at block level
ExecutionBlockExecutionEffortVectorComponent(string, uint)
Expand All @@ -945,17 +971,10 @@ type ExecutionMetrics interface {
ExecutionBlockCachedPrograms(programs int)

// ExecutionCollectionExecuted reports the total time and computation spent on executing a collection
ExecutionCollectionExecuted(dur time.Duration, stats ExecutionResultStats)
ExecutionCollectionExecuted(dur time.Duration, stats CollectionExecutionResultStats)

// ExecutionTransactionExecuted reports stats on executing a single transaction
ExecutionTransactionExecuted(
dur time.Duration,
numTxnConflictRetries int,
compUsed uint64,
memoryUsed uint64,
eventCounts int,
eventSize int,
failed bool)
ExecutionTransactionExecuted(dur time.Duration, stats TransactionExecutionResultStats)

// ExecutionChunkDataPackGenerated reports stats on chunk data pack generation
ExecutionChunkDataPackGenerated(proofSize, numberOfTransactions int)
Expand Down
18 changes: 11 additions & 7 deletions module/metrics/example/execution/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ func main() {

collector.ExecutionBlockExecuted(
duration,
module.ExecutionResultStats{
ComputationUsed: uint64(rand.Int63n(1e6)),
MemoryUsed: uint64(rand.Int63n(1e6)),
EventCounts: 2,
EventSize: 100,
NumberOfCollections: 1,
NumberOfTransactions: 1,
module.BlockExecutionResultStats{
CollectionExecutionResultStats: module.CollectionExecutionResultStats{
ExecutionResultStats: module.ExecutionResultStats{
EventSize: 100,
EventCounts: 2,
MemoryUsed: uint64(rand.Int63n(1e6)),
ComputationUsed: uint64(rand.Int63n(1e6)),
},
NumberOfTransactions: 1,
},
NumberOfCollections: 1,
})

diskIncrease := rand.Int63n(1024 * 1024)
Expand Down
25 changes: 10 additions & 15 deletions module/metrics/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ func (ec *ExecutionCollector) FinishBlockReceivedToExecuted(blockID flow.Identif
// ExecutionBlockExecuted reports execution meta data after executing a block
func (ec *ExecutionCollector) ExecutionBlockExecuted(
dur time.Duration,
stats module.ExecutionResultStats,
stats module.BlockExecutionResultStats,
) {
ec.totalExecutedBlocksCounter.Inc()
ec.blockExecutionTime.Observe(float64(dur.Milliseconds()))
Expand All @@ -734,7 +734,7 @@ func (ec *ExecutionCollector) ExecutionBlockExecuted(
// ExecutionCollectionExecuted reports stats for executing a collection
func (ec *ExecutionCollector) ExecutionCollectionExecuted(
dur time.Duration,
stats module.ExecutionResultStats,
stats module.CollectionExecutionResultStats,
) {
ec.totalExecutedCollectionsCounter.Inc()
ec.collectionExecutionTime.Observe(float64(dur.Milliseconds()))
Expand All @@ -758,23 +758,18 @@ func (ec *ExecutionCollector) ExecutionBlockCachedPrograms(programs int) {
// ExecutionTransactionExecuted reports stats for executing a transaction
func (ec *ExecutionCollector) ExecutionTransactionExecuted(
dur time.Duration,
numConflictRetries int,
compUsed uint64,
memoryUsed uint64,
eventCounts int,
eventSize int,
failed bool,
stats module.TransactionExecutionResultStats,
) {
ec.totalExecutedTransactionsCounter.Inc()
ec.transactionExecutionTime.Observe(float64(dur.Milliseconds()))
ec.transactionConflictRetries.Observe(float64(numConflictRetries))
ec.transactionComputationUsed.Observe(float64(compUsed))
ec.transactionConflictRetries.Observe(float64(stats.NumberOfTxnConflictRetries))
ec.transactionComputationUsed.Observe(float64(stats.ComputationUsed))
ec.transactionNormalizedTimePerComputation.Observe(
flow.NormalizedExecutionTimePerComputationUnit(dur, compUsed))
ec.transactionMemoryEstimate.Observe(float64(memoryUsed))
ec.transactionEmittedEvents.Observe(float64(eventCounts))
ec.transactionEventSize.Observe(float64(eventSize))
if failed {
flow.NormalizedExecutionTimePerComputationUnit(dur, stats.ComputationUsed))
ec.transactionMemoryEstimate.Observe(float64(stats.MemoryUsed))
ec.transactionEmittedEvents.Observe(float64(stats.EventCounts))
ec.transactionEventSize.Observe(float64(stats.EventSize))
if stats.Failed {
ec.totalFailedTransactionsCounter.Inc()
}
}
Expand Down
61 changes: 31 additions & 30 deletions module/metrics/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,39 +137,40 @@ func (nc *NoopCollector) OnVerifiableChunkReceivedAtVerifierEngine()
func (nc *NoopCollector) OnResultApprovalDispatchedInNetworkByVerifier() {}
func (nc *NoopCollector) SetMaxChunkDataPackAttemptsForNextUnsealedHeightAtRequester(attempts uint64) {
}
func (nc *NoopCollector) OnFinalizedBlockArrivedAtAssigner(height uint64) {}
func (nc *NoopCollector) OnChunksAssignmentDoneAtAssigner(chunks int) {}
func (nc *NoopCollector) OnAssignedChunkProcessedAtAssigner() {}
func (nc *NoopCollector) OnAssignedChunkReceivedAtFetcher() {}
func (nc *NoopCollector) OnChunkDataPackRequestDispatchedInNetworkByRequester() {}
func (nc *NoopCollector) OnChunkDataPackRequestSentByFetcher() {}
func (nc *NoopCollector) OnChunkDataPackRequestReceivedByRequester() {}
func (nc *NoopCollector) OnChunkDataPackArrivedAtFetcher() {}
func (nc *NoopCollector) OnChunkDataPackSentToFetcher() {}
func (nc *NoopCollector) OnVerifiableChunkSentToVerifier() {}
func (nc *NoopCollector) OnBlockConsumerJobDone(uint64) {}
func (nc *NoopCollector) OnChunkConsumerJobDone(uint64) {}
func (nc *NoopCollector) OnChunkDataPackResponseReceivedFromNetworkByRequester() {}
func (nc *NoopCollector) TotalConnectionsInPool(connectionCount uint, connectionPoolSize uint) {}
func (nc *NoopCollector) ConnectionFromPoolReused() {}
func (nc *NoopCollector) ConnectionAddedToPool() {}
func (nc *NoopCollector) NewConnectionEstablished() {}
func (nc *NoopCollector) ConnectionFromPoolInvalidated() {}
func (nc *NoopCollector) ConnectionFromPoolUpdated() {}
func (nc *NoopCollector) ConnectionFromPoolEvicted() {}
func (nc *NoopCollector) StartBlockReceivedToExecuted(blockID flow.Identifier) {}
func (nc *NoopCollector) FinishBlockReceivedToExecuted(blockID flow.Identifier) {}
func (nc *NoopCollector) ExecutionComputationUsedPerBlock(computation uint64) {}
func (nc *NoopCollector) ExecutionStorageStateCommitment(bytes int64) {}
func (nc *NoopCollector) ExecutionCheckpointSize(bytes uint64) {}
func (nc *NoopCollector) ExecutionLastExecutedBlockHeight(height uint64) {}
func (nc *NoopCollector) ExecutionLastFinalizedExecutedBlockHeight(height uint64) {}
func (nc *NoopCollector) ExecutionBlockExecuted(_ time.Duration, _ module.ExecutionResultStats) {}
func (nc *NoopCollector) ExecutionCollectionExecuted(_ time.Duration, _ module.ExecutionResultStats) {
func (nc *NoopCollector) OnFinalizedBlockArrivedAtAssigner(height uint64) {}
func (nc *NoopCollector) OnChunksAssignmentDoneAtAssigner(chunks int) {}
func (nc *NoopCollector) OnAssignedChunkProcessedAtAssigner() {}
func (nc *NoopCollector) OnAssignedChunkReceivedAtFetcher() {}
func (nc *NoopCollector) OnChunkDataPackRequestDispatchedInNetworkByRequester() {}
func (nc *NoopCollector) OnChunkDataPackRequestSentByFetcher() {}
func (nc *NoopCollector) OnChunkDataPackRequestReceivedByRequester() {}
func (nc *NoopCollector) OnChunkDataPackArrivedAtFetcher() {}
func (nc *NoopCollector) OnChunkDataPackSentToFetcher() {}
func (nc *NoopCollector) OnVerifiableChunkSentToVerifier() {}
func (nc *NoopCollector) OnBlockConsumerJobDone(uint64) {}
func (nc *NoopCollector) OnChunkConsumerJobDone(uint64) {}
func (nc *NoopCollector) OnChunkDataPackResponseReceivedFromNetworkByRequester() {}
func (nc *NoopCollector) TotalConnectionsInPool(connectionCount uint, connectionPoolSize uint) {}
func (nc *NoopCollector) ConnectionFromPoolReused() {}
func (nc *NoopCollector) ConnectionAddedToPool() {}
func (nc *NoopCollector) NewConnectionEstablished() {}
func (nc *NoopCollector) ConnectionFromPoolInvalidated() {}
func (nc *NoopCollector) ConnectionFromPoolUpdated() {}
func (nc *NoopCollector) ConnectionFromPoolEvicted() {}
func (nc *NoopCollector) StartBlockReceivedToExecuted(blockID flow.Identifier) {}
func (nc *NoopCollector) FinishBlockReceivedToExecuted(blockID flow.Identifier) {}
func (nc *NoopCollector) ExecutionComputationUsedPerBlock(computation uint64) {}
func (nc *NoopCollector) ExecutionStorageStateCommitment(bytes int64) {}
func (nc *NoopCollector) ExecutionCheckpointSize(bytes uint64) {}
func (nc *NoopCollector) ExecutionLastExecutedBlockHeight(height uint64) {}
func (nc *NoopCollector) ExecutionLastFinalizedExecutedBlockHeight(height uint64) {}
func (nc *NoopCollector) ExecutionBlockExecuted(_ time.Duration, _ module.BlockExecutionResultStats) {
}
func (nc *NoopCollector) ExecutionCollectionExecuted(_ time.Duration, _ module.CollectionExecutionResultStats) {
}
func (nc *NoopCollector) ExecutionBlockExecutionEffortVectorComponent(_ string, _ uint) {}
func (nc *NoopCollector) ExecutionBlockCachedPrograms(programs int) {}
func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, _ int, _, _ uint64, _, _ int, _ bool) {
func (nc *NoopCollector) ExecutionTransactionExecuted(_ time.Duration, stats module.TransactionExecutionResultStats) {
}
func (nc *NoopCollector) ExecutionChunkDataPackGenerated(_, _ int) {}
func (nc *NoopCollector) ExecutionScriptExecuted(dur time.Duration, compUsed, _, _ uint64) {}
Expand Down
Loading

0 comments on commit da4f52c

Please sign in to comment.