Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,15 @@ func (t *testRetryPolicy) GetRetryType(err error) RetryType {
return Retry
}

// speculativeTestObserver is a simple observer for testing speculativeExecutions execution metrics
type speculativeTestObserver struct {
executions int
}

func (o *speculativeTestObserver) ObserveQuery(ctx context.Context, q ObservedQuery) {
o.executions = q.SpeculativeExecutions
}

func TestSpeculativeExecution(t *testing.T) {
log := newTestLogger(LogLevelDebug)
defer func() {
Expand Down Expand Up @@ -523,8 +532,11 @@ func TestSpeculativeExecution(t *testing.T) {
// test Speculative policy with 1 additional execution
sp := &SimpleSpeculativeExecution{NumAttempts: 1, TimeoutDelay: 200 * time.Millisecond}

// Add an observer to capture speculativeExecutions execution metrics
observer := &speculativeTestObserver{}

// Build the query
qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true)
qry := db.Query("speculative").RetryPolicy(rt).SetSpeculativeExecutionPolicy(sp).Idempotent(true).Observer(observer)

// Execute the query and close, check that it doesn't error out
if err := qry.Exec(); err != nil {
Expand All @@ -549,6 +561,12 @@ func TestSpeculativeExecution(t *testing.T) {
if requests1+requests2+requests3 > 6 {
t.Errorf("error: expected to see 6 attempts, got %v\n", requests1+requests2+requests3)
}

// Verify that the observer captured speculativeExecutions execution attempts
// With NumAttempts: 1, we expect 1 speculativeExecutions attempt (in addition to the main execution)
if observer.executions != 1 {
t.Errorf("expected observer to capture 1 speculativeExecutions attempt, got %d", observer.executions)
}
}

// This tests that the policy connection pool handles SSL correctly
Expand Down
45 changes: 29 additions & 16 deletions query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ type internalRequest interface {
attempt(keyspace string, end, start time.Time, iter *Iter, host *HostInfo)
retryPolicy() RetryPolicy
speculativeExecutionPolicy() SpeculativeExecutionPolicy
speculativeExecutionStarted() // Used to update speculative execution count
getQueryMetrics() *queryMetrics
getRoutingInfo() *queryRoutingInfo
getKeyspaceFunc() func() string
Expand Down Expand Up @@ -91,6 +92,8 @@ func (q *queryExecutor) speculate(ctx context.Context, qry internalRequest, sp S
for i := 0; i < sp.Attempts(); i++ {
select {
case <-ticker.C:
// Increment speculative count in metrics so it's available to the observer
qry.speculativeExecutionStarted()
go q.run(ctx, qry, hostIter, results)
case <-ctx.Done():
return newErrIter(ctx.Err(), qry.getQueryMetrics(), qry.Keyspace(), qry.getRoutingInfo(), qry.getKeyspaceFunc())
Expand Down Expand Up @@ -383,17 +386,18 @@ func (q *internalQuery) attempt(keyspace string, end, start time.Time, iter *Ite
if q.qryOpts.observer != nil {
metricsForHost := q.hostMetricsManager.attempt(latency, host)
q.qryOpts.observer.ObserveQuery(q.qryOpts.context, ObservedQuery{
Keyspace: keyspace,
Statement: q.qryOpts.stmt,
Values: q.qryOpts.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Query: q.originalQuery,
Keyspace: keyspace,
Statement: q.qryOpts.stmt,
Values: q.qryOpts.values,
Start: start,
End: end,
Rows: iter.numRows,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Query: q.originalQuery,
SpeculativeExecutions: q.metrics.speculativeExecutions(),
})
}
}
Expand All @@ -410,6 +414,10 @@ func (q *internalQuery) speculativeExecutionPolicy() SpeculativeExecutionPolicy
return q.qryOpts.spec
}

func (q *internalQuery) speculativeExecutionStarted() {
q.metrics.speculativeExecution()
}

func (q *internalQuery) GetRoutingKey() ([]byte, error) {
if q.qryOpts.routingKey != nil {
return q.qryOpts.routingKey, nil
Expand Down Expand Up @@ -612,11 +620,12 @@ func (b *internalBatch) attempt(keyspace string, end, start time.Time, iter *Ite
Start: start,
End: end,
// Rows not used in batch observations // TODO - might be able to support it when using BatchCAS
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Batch: b.originalBatch,
Host: host,
Metrics: metricsForHost,
Err: iter.err,
Attempt: attempt,
Batch: b.originalBatch,
SpeculativeExecutions: b.metrics.speculativeExecutions(),
})
}

Expand All @@ -628,6 +637,10 @@ func (b *internalBatch) speculativeExecutionPolicy() SpeculativeExecutionPolicy
return b.batchOpts.spec
}

func (b *internalBatch) speculativeExecutionStarted() {
b.metrics.speculativeExecution()
}

func (b *internalBatch) GetRoutingKey() ([]byte, error) {
if b.batchOpts.routingKey != nil {
return b.batchOpts.routingKey, nil
Expand Down
17 changes: 17 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,8 @@ type hostMetrics struct {
type queryMetrics struct {
totalAttempts int64
totalLatency int64
// totalSpeculativeExecutions is the number of speculative executions launched for this query.
totalSpeculativeExecutions int64
}

func (qm *queryMetrics) attempt(addLatency time.Duration) int {
Expand All @@ -920,6 +922,15 @@ func (qm *queryMetrics) attempts() int {
return int(atomic.LoadInt64(&qm.totalAttempts))
}

func (qm *queryMetrics) speculativeExecutions() int {
return int(atomic.LoadInt64(&qm.totalSpeculativeExecutions))
}

// increments the speculative execution count.
func (qm *queryMetrics) speculativeExecution() {
atomic.AddInt64(&qm.totalSpeculativeExecutions, 1)
}

func (qm *queryMetrics) latency() int64 {
attempts := atomic.LoadInt64(&qm.totalAttempts)
if attempts == 0 {
Expand Down Expand Up @@ -2324,6 +2335,9 @@ type ObservedQuery struct {
// The first attempt is number zero and any retries have non-zero attempt number.
Attempt int

// SpeculativeExecutions is the number of speculative executions launched
SpeculativeExecutions int

// Query object associated with this request. Should be used as read only.
Query *Query
}
Expand Down Expand Up @@ -2364,6 +2378,9 @@ type ObservedBatch struct {
// The first attempt is number zero and any retries have non-zero attempt number.
Attempt int

// SpeculativeExecutions is the number of speculative executions launched
SpeculativeExecutions int

// Batch object associated with this request. Should be used as read only.
Batch *Batch
}
Expand Down