From 473b6e02c60e59ade477e8e37c60e88393c71196 Mon Sep 17 00:00:00 2001 From: ankitaryan10 Date: Sun, 7 Dec 2025 13:11:58 -0800 Subject: [PATCH] Adding total attempt metric for SpeculativeRetries --- conn_test.go | 20 +++++++++++++++++++- query_executor.go | 45 +++++++++++++++++++++++++++++---------------- session.go | 17 +++++++++++++++++ 3 files changed, 65 insertions(+), 17 deletions(-) diff --git a/conn_test.go b/conn_test.go index 60e4a2a8a..7eb801db7 100644 --- a/conn_test.go +++ b/conn_test.go @@ -491,6 +491,15 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType { return Retry } +// speculativeTestObserver is a simple observer for testing speculative execution metrics. NOTE(review): executions is written without synchronization; confirm ObserveQuery cannot run concurrently with the test's read (check with -race). +type speculativeTestObserver struct { + executions int +} + +func (o *speculativeTestObserver) ObserveQuery(ctx context.Context, q ObservedQuery) { + o.executions = q.SpeculativeExecutions +} + func TestSpeculativeExecution(t *testing.T) { log := newTestLogger(LogLevelDebug) defer func() { @@ -523,8 +532,11 @@ func TestSpeculativeExecution(t *testing.T) { // test Speculative policy with 1 additional execution sp := &SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond} + // Add an observer to capture speculative execution metrics + observer := &speculativeTestObserver{} + // Build the query - qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true) + qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true).Observer(observer) // Execute the query and close, check that it doesn't error out if err := qry.Exec(); err != nil { @@ -549,6 +561,12 @@ func TestSpeculativeExecution(t *testing.T) { if requests1+requests2+requests3 > 6 { t.Errorf("error: expected to see 6 attempts, got %v\n", requests1+requests2+requests3) } + + // Verify that the observer captured the speculative execution attempts + // With NumAttempts: 1, we expect 1 speculative execution 
attempt (in addition to the main execution) + if observer.executions != 1 { + t.Errorf("expected observer to capture 1 speculativeExecutions attempt, got %d", observer.executions) + } } // This tests that the policy connection pool handles SSL correctly diff --git a/query_executor.go b/query_executor.go index 2d7a62335..a49d8e908 100644 --- a/query_executor.go +++ b/query_executor.go @@ -61,6 +61,7 @@ type internalRequest interface { attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo) retryPolicy() RetryPolicy speculativeExecutionPolicy() SpeculativeExecutionPolicy + speculativeExecutionStarted() // records that one more speculative execution was launched getQueryMetrics() *queryMetrics getRoutingInfo() *queryRoutingInfo getKeyspaceFunc() func() string @@ -91,6 +92,8 @@ func (q *queryExecutor) speculate(ctx context.Context, qry internalRequest, sp S for i := 0; i < sp.Attempts(); i++ { select { case <-ticker.C: + // Count the speculative execution before launching it so the updated total is available to the observer + qry.speculativeExecutionStarted() go q.run(ctx, qry, hostIter, results) case <-ctx.Done(): return newErrIter(ctx.Err(), qry.getQueryMetrics(), qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc()) @@ -383,17 +386,18 @@ func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Ite if q.qryOpts.observer != nil { metricsForHost := q.hostMetricsManager.attempt(latency, host) q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{ - Keyspace: keyspace, - Statement: q.qryOpts.stmt, - Values: q.qryOpts.values, - Start: start, - End: end, - Rows: iter.numRows, - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, - Query: q.originalQuery, + Keyspace: keyspace, + Statement: q.qryOpts.stmt, + Values: q.qryOpts.values, + Start: start, + End: end, + Rows: iter.numRows, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Query: q.originalQuery, + SpeculativeExecutions: 
q.metrics.speculativeExecutions(), }) } } @@ -410,6 +414,10 @@ func (q *internalQuery) speculativeExecutionPolicy() SpeculativeExecutionPolicy return q.qryOpts.spec } +func (q *internalQuery) speculativeExecutionStarted() { + q.metrics.speculativeExecution() +} + func (q *internalQuery) GetRoutingKey() ([]byte, error) { if q.qryOpts.routingKey != nil { return q.qryOpts.routingKey, nil @@ -612,11 +620,12 @@ func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Ite Start: start, End: end, // Rows not used in batch observations // TODO - might be able to support it when using BatchCAS - Host: host, - Metrics: metricsForHost, - Err: iter.err, - Attempt: attempt, - Batch: b.originalBatch, + Host: host, + Metrics: metricsForHost, + Err: iter.err, + Attempt: attempt, + Batch: b.originalBatch, + SpeculativeExecutions: b.metrics.speculativeExecutions(), }) } @@ -628,6 +637,10 @@ func (b *internalBatch) speculativeExecutionPolicy() SpeculativeExecutionPolicy return b.batchOpts.spec } +func (b *internalBatch) speculativeExecutionStarted() { + b.metrics.speculativeExecution() +} + func (b *internalBatch) GetRoutingKey() ([]byte, error) { if b.batchOpts.routingKey != nil { return b.batchOpts.routingKey, nil diff --git a/session.go b/session.go index 264ed8922..c5da65102 100644 --- a/session.go +++ b/session.go @@ -909,6 +909,8 @@ type hostMetrics struct { type queryMetrics struct { totalAttempts int64 totalLatency int64 + // totalSpeculativeExecutions is the number of speculative executions launched for this query. + totalSpeculativeExecutions int64 } func (qm *queryMetrics) attempt(addLatency time.Duration) int { @@ -920,6 +922,15 @@ func (qm *queryMetrics) attempts() int { return int(atomic.LoadInt64(&qm.totalAttempts)) } +func (qm *queryMetrics) speculativeExecutions() int { + return int(atomic.LoadInt64(&qm.totalSpeculativeExecutions)) +} + +// speculativeExecution atomically increments the speculative execution count by one. 
+func (qm *queryMetrics) speculativeExecution() { + atomic.AddInt64(&qm.totalSpeculativeExecutions, 1) +} + func (qm *queryMetrics) latency() int64 { attempts := atomic.LoadInt64(&qm.totalAttempts) if attempts == 0 { @@ -2324,6 +2335,9 @@ type ObservedQuery struct { // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + // SpeculativeExecutions is the number of speculative executions launched for this query at the time of this observation. + SpeculativeExecutions int + // Query object associated with this request. Should be used as read only. Query *Query } @@ -2364,6 +2378,9 @@ type ObservedBatch struct { // The first attempt is number zero and any retries have non-zero attempt number. Attempt int + // SpeculativeExecutions is the number of speculative executions launched for this batch at the time of this observation. + SpeculativeExecutions int + // Batch object associated with this request. Should be used as read only. Batch *Batch }