Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 82 additions & 12 deletions pkg/cmd/roachtest/tests/inspect_throughput.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@ import (
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
"github.com/cockroachdb/cockroach/pkg/workload/histogram/exporter"
"github.com/lib/pq"
"github.com/stretchr/testify/require"
)

func registerInspectThoughput(r registry.Registry) {
Expand All @@ -34,7 +36,7 @@ func registerInspectThoughput(r registry.Registry) {

// Long run: 12 nodes × 8 CPUs, 1B rows, 2 index checks (runs INSPECT twice: 1 index, then 2 indexes), ~5 hours (in v25.4)
const indexesForLongRun = 2
r.Add(makeInspectThroughputTest(r, 12, 8, 1_000_000_000, 8*time.Hour, indexesForLongRun))
r.Add(makeInspectThroughputTest(r, 12, 8, 1_000_000_000, 11*time.Hour, indexesForLongRun))
}

// initInspectHistograms creates a histogram registry with multiple named metrics.
Expand Down Expand Up @@ -225,9 +227,7 @@ func makeInspectThroughputTest(
before := timeutil.Now()

inspectSQL := fmt.Sprintf("INSPECT TABLE bulkingest.bulkingest WITH OPTIONS INDEX (%s)", cfg.indexListSQL)
if _, err := db.Exec(inspectSQL); err != nil {
t.Fatal(err)
}
jobID := runInspectInBackground(ctx, t, db, inspectSQL)

// Tick after INSPECT completes to capture elapsed time for this specific metric.
tickHistogram(cfg.metricName)
Expand All @@ -249,15 +249,9 @@ func makeInspectThroughputTest(
0
)
FROM system.job_info
WHERE job_id = (
SELECT job_id
FROM [SHOW JOBS]
WHERE job_type = 'INSPECT'
ORDER BY created DESC
LIMIT 1
)
WHERE job_id = $1
AND info_key = 'legacy_progress'`
err := db.QueryRow(querySQL).Scan(&jobTotalCheckCount)
err := db.QueryRow(querySQL, jobID).Scan(&jobTotalCheckCount)
if err != nil {
t.L().Printf("Warning: failed to query job total check count: %v", err)
} else {
Expand Down Expand Up @@ -301,3 +295,79 @@ func disableRowCountValidation(t test.Test, db *gosql.DB) {
}
}
}

// runInspectInBackground starts an INSPECT statement under a short statement
// timeout, so that the statement returns quickly while the job it created
// keeps running, and then polls that job every 5 seconds until it finishes.
// Progress is logged each time a new 10% threshold is reached.
//
// The test is failed through t when the statement fails with anything other
// than a statement timeout, when the job cannot be found or queried, when the
// job ends in the failed or canceled state, or when ctx is done before the
// job succeeds. On success the ID of the INSPECT job is returned.
func runInspectInBackground(
	ctx context.Context, t test.Test, db *gosql.DB, inspectSQL string,
) (jobID int64) {
	// The timeout is set, used and cleared inside the helper, so none of the
	// polling queries below run under the 5s statement timeout.
	startInspectWithStatementTimeout(ctx, t, db, inspectSQL)

	// Get the INSPECT job ID: the most recently created INSPECT job is taken
	// to be the one started above.
	getJobIDSQL := `
SELECT job_id
FROM [SHOW JOBS]
WHERE job_type = 'INSPECT'
ORDER BY created DESC
LIMIT 1`
	if err := db.QueryRowContext(ctx, getJobIDSQL).Scan(&jobID); err != nil {
		t.Fatalf("failed to get INSPECT job ID: %v", err)
	}
	t.L().Printf("INSPECT job ID: %d", jobID)

	// Poll the job until it completes, reporting progress at 10% intervals.
	const pollInterval = 5 * time.Second
	checkJobSQL := `
SELECT status, fraction_completed
FROM [SHOW JOBS]
WHERE job_id = $1`
	// -1 guarantees that the 0% threshold is reported on the first poll.
	lastReportedThreshold := -1
	ticker := time.NewTicker(pollInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			t.Fatalf("context canceled while waiting for INSPECT job %d", jobID)
		case <-ticker.C:
			var status jobs.State
			var fractionCompleted float64
			if err := db.QueryRowContext(ctx, checkJobSQL, jobID).Scan(&status, &fractionCompleted); err != nil {
				t.Fatalf("failed to query job %d status: %v", jobID, err)
			}

			// Report progress at 10% thresholds (0%, 10%, 20%, ..., 90%).
			// 100% is only reported once the job state says it succeeded.
			currentThreshold := int(fractionCompleted * 10)
			if currentThreshold > lastReportedThreshold && currentThreshold < 10 {
				t.L().Printf("INSPECT job %d: %d%% complete", jobID, currentThreshold*10)
				lastReportedThreshold = currentThreshold
			}

			// Check if job is complete. Any other state keeps polling.
			switch status {
			case jobs.StateSucceeded:
				t.L().Printf("INSPECT job %d: 100%% complete (succeeded)", jobID)
				return jobID
			case jobs.StateFailed, jobs.StateCanceled:
				t.Fatalf("INSPECT job %d finished with status: %s", jobID, status)
			}
		}
	}
}

// startInspectWithStatementTimeout executes inspectSQL on a single dedicated
// connection whose statement_timeout is set to 5s, and clears the timeout
// again before the connection is handed back to the pool. A statement timeout
// error from INSPECT is tolerated; any other error fails the test.
func startInspectWithStatementTimeout(
	ctx context.Context, t test.Test, db *gosql.DB, inspectSQL string,
) {
	// db is a connection pool. Session variables only have a defined effect
	// when SET, the statement and RESET all run on the same pinned connection.
	conn, err := db.Conn(ctx)
	require.NoError(t, err)

	// Reset statement timeout before releasing the connection, so that a later
	// user of the pool does not inherit it.
	defer func() {
		_, resetErr := conn.ExecContext(ctx, "RESET statement_timeout")
		closeErr := conn.Close()
		require.NoError(t, resetErr)
		require.NoError(t, closeErr)
	}()

	// Set a short statement timeout to force INSPECT to run as a background job.
	_, err = conn.ExecContext(ctx, "SET statement_timeout = '5s'")
	require.NoError(t, err)

	_, err = conn.ExecContext(ctx, inspectSQL)

	// This may fail if the INSPECT took longer than the statement_timeout to run.
	// So, we tolerate only statement timeout errors here.
	if err != nil {
		require.ErrorContains(t, err, "statement timeout")
	}
}