Skip to content

Commit 8e3d75c

Browse files
authored
log metrics around processing durations (#217)
### TL;DR

Added performance metrics for database operations and Kafka publishing.

### What changed?

- Added Prometheus histogram metrics for key operations in the metrics package:
  - `staging_insert_duration_seconds`
  - `main_storage_insert_duration_seconds`
  - `publish_duration_seconds`
  - `staging_delete_duration_seconds`
  - `get_block_numbers_to_commit_duration_seconds`
  - `get_staging_data_duration_seconds`
- Instrumented the `Committer` and `Poller` components with timing measurements
- Added timing for the `getBlockNumbersToCommit()` function
- Added timing for staging data retrieval in `getSequentialBlockDataToCommit()`
- Added timing for main storage insertion, Kafka publishing, and staging data deletion in the `commit()` function
- Added timing for staging data insertion in the `handleWorkerResults()` function
- All timing logs use a consistent format with the "metric" field for easier filtering

### How to test?

1. Run the application with debug logging enabled
2. Monitor the logs for entries with the `"metric"` field
3. Verify that duration metrics are being reported for all operations
4. Check the Prometheus endpoint to confirm metrics are being exposed correctly

### Why make this change?

This change adds detailed performance metrics for database operations and Kafka publishing, which will help identify bottlenecks in the data processing pipeline. By measuring the duration of key operations, we can better understand where performance issues might be occurring and optimize accordingly. The consistent logging format with the "metric" field makes it easier to filter and analyze these performance metrics in log aggregation tools.

<!-- This is an auto-generated comment: release notes by coderabbit.ai -->

## Summary by CodeRabbit

- **New Features**
  - Added detailed performance metrics and timing logs for key data handling and publishing operations, including staging and main storage inserts, publishing to Kafka, and data deletion.
- Introduced Prometheus histogram metrics to monitor operation durations, enhancing observability of system performance. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
2 parents 23884ad + 65f8943 commit 8e3d75c

File tree

3 files changed

+62
-0
lines changed

3 files changed

+62
-0
lines changed

internal/metrics/metrics.go

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,3 +105,42 @@ var (
105105
Help: "The last block number that was published",
106106
})
107107
)
108+
109+
// Operation Duration Metrics
110+
var (
111+
StagingInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{
112+
Name: "staging_insert_duration_seconds",
113+
Help: "Time taken to insert data into staging storage",
114+
Buckets: prometheus.DefBuckets,
115+
})
116+
117+
MainStorageInsertDuration = promauto.NewHistogram(prometheus.HistogramOpts{
118+
Name: "main_storage_insert_duration_seconds",
119+
Help: "Time taken to insert data into main storage",
120+
Buckets: prometheus.DefBuckets,
121+
})
122+
123+
PublishDuration = promauto.NewHistogram(prometheus.HistogramOpts{
124+
Name: "publish_duration_seconds",
125+
Help: "Time taken to publish block data to Kafka",
126+
Buckets: prometheus.DefBuckets,
127+
})
128+
129+
StagingDeleteDuration = promauto.NewHistogram(prometheus.HistogramOpts{
130+
Name: "staging_delete_duration_seconds",
131+
Help: "Time taken to delete data from staging storage",
132+
Buckets: prometheus.DefBuckets,
133+
})
134+
135+
GetBlockNumbersToCommitDuration = promauto.NewHistogram(prometheus.HistogramOpts{
136+
Name: "get_block_numbers_to_commit_duration_seconds",
137+
Help: "Time taken to get block numbers to commit from storage",
138+
Buckets: prometheus.DefBuckets,
139+
})
140+
141+
GetStagingDataDuration = promauto.NewHistogram(prometheus.HistogramOpts{
142+
Name: "get_staging_data_duration_seconds",
143+
Help: "Time taken to get data from staging storage",
144+
Buckets: prometheus.DefBuckets,
145+
})
146+
)

internal/orchestrator/committer.go

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,12 @@ func (c *Committer) Start(ctx context.Context) {
8080
}
8181

8282
func (c *Committer) getBlockNumbersToCommit() ([]*big.Int, error) {
83+
startTime := time.Now()
84+
defer func() {
85+
log.Debug().Str("metric", "get_block_numbers_to_commit_duration").Msgf("getBlockNumbersToCommit duration: %f", time.Since(startTime).Seconds())
86+
metrics.GetBlockNumbersToCommitDuration.Observe(time.Since(startTime).Seconds())
87+
}()
88+
8389
latestCommittedBlockNumber, err := c.storage.MainStorage.GetMaxBlockNumber(c.rpc.GetChainID())
8490
log.Info().Msgf("Committer found this max block number in main storage: %s", latestCommittedBlockNumber.String())
8591
if err != nil {
@@ -117,7 +123,11 @@ func (c *Committer) getSequentialBlockDataToCommit(ctx context.Context) ([]commo
117123
return nil, nil
118124
}
119125

126+
startTime := time.Now()
120127
blocksData, err := c.storage.StagingStorage.GetStagingData(storage.QueryFilter{BlockNumbers: blocksToCommit, ChainId: c.rpc.GetChainID()})
128+
log.Debug().Str("metric", "get_staging_data_duration").Msgf("StagingStorage.GetStagingData duration: %f", time.Since(startTime).Seconds())
129+
metrics.GetStagingDataDuration.Observe(time.Since(startTime).Seconds())
130+
121131
if err != nil {
122132
return nil, fmt.Errorf("error fetching blocks to commit: %v", err)
123133
}
@@ -168,20 +178,29 @@ func (c *Committer) commit(ctx context.Context, blockData []common.BlockData) er
168178
}
169179
log.Debug().Msgf("Committing %d blocks", len(blockNumbers))
170180

181+
mainStorageStart := time.Now()
171182
if err := c.storage.MainStorage.InsertBlockData(blockData); err != nil {
172183
log.Error().Err(err).Msgf("Failed to commit blocks: %v", blockNumbers)
173184
return fmt.Errorf("error saving data to main storage: %v", err)
174185
}
186+
log.Debug().Str("metric", "main_storage_insert_duration").Msgf("MainStorage.InsertBlockData duration: %f", time.Since(mainStorageStart).Seconds())
187+
metrics.MainStorageInsertDuration.Observe(time.Since(mainStorageStart).Seconds())
175188

189+
publishStart := time.Now()
176190
go func() {
177191
if err := c.publisher.PublishBlockData(blockData); err != nil {
178192
log.Error().Err(err).Msg("Failed to publish block data to kafka")
179193
}
194+
log.Debug().Str("metric", "publish_duration").Msgf("Publisher.PublishBlockData duration: %f", time.Since(publishStart).Seconds())
195+
metrics.PublishDuration.Observe(time.Since(publishStart).Seconds())
180196
}()
181197

198+
stagingDeleteStart := time.Now()
182199
if err := c.storage.StagingStorage.DeleteStagingData(blockData); err != nil {
183200
return fmt.Errorf("error deleting data from staging storage: %v", err)
184201
}
202+
log.Debug().Str("metric", "staging_delete_duration").Msgf("StagingStorage.DeleteStagingData duration: %f", time.Since(stagingDeleteStart).Seconds())
203+
metrics.StagingDeleteDuration.Observe(time.Since(stagingDeleteStart).Seconds())
185204

186205
// Find highest block number from committed blocks
187206
highestBlock := blockData[0].Block

internal/orchestrator/poller.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -234,6 +234,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
234234
Traces: result.Data.Traces,
235235
})
236236
}
237+
238+
startTime := time.Now()
237239
if err := p.storage.StagingStorage.InsertStagingData(blockData); err != nil {
238240
e := fmt.Errorf("error inserting block data: %v", err)
239241
log.Error().Err(e)
@@ -245,6 +247,8 @@ func (p *Poller) handleWorkerResults(results []rpc.GetFullBlockResult) {
245247
}
246248
metrics.PolledBatchSize.Set(float64(len(blockData)))
247249
}
250+
log.Debug().Str("metric", "staging_insert_duration").Msgf("StagingStorage.InsertStagingData duration: %f", time.Since(startTime).Seconds())
251+
metrics.StagingInsertDuration.Observe(time.Since(startTime).Seconds())
248252

249253
if len(failedResults) > 0 {
250254
p.handleBlockFailures(failedResults)

0 commit comments

Comments
 (0)