Skip to content

Commit f881d3f

Browse files
committed
workload/schemachange: add INSPECT operation to random schema workload
Adds a new inspect operation to the schema change workload, enabling random generation of INSPECT TABLE and INSPECT DATABASE statements. Features: - Support for TABLE/DATABASE targets and AS OF SYSTEM TIME. - Always runs in DETACHED mode so that it can be run inside a transaction. - Results are checked post-run via SHOW INSPECT ERRORS. Errors are reported in JSON, consistent with existing workload logs. Closes #155483 Epic: CRDB-55075 Release note: none
1 parent 1e00b30 commit f881d3f

File tree

4 files changed

+169
-2
lines changed

4 files changed

+169
-2
lines changed

pkg/workload/schemachange/operation_generator.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3456,6 +3456,71 @@ func (og *operationGenerator) validate(ctx context.Context, tx pgx.Tx) (*opStmt,
34563456
return validateStmt, errors.Errorf("Validation FAIL:\n%s", strings.Join(errs, "\n"))
34573457
}
34583458

3459+
func (og *operationGenerator) inspect(ctx context.Context, tx pgx.Tx) (*opStmt, error) {
3460+
stmt := makeOpStmt(OpStmtDML)
3461+
3462+
var sb strings.Builder
3463+
sb.WriteString("INSPECT ")
3464+
3465+
if og.randIntn(2) == 0 {
3466+
tableName, err := og.randTable(ctx, tx, og.pctExisting(true), "")
3467+
if err != nil {
3468+
return nil, err
3469+
}
3470+
tableExists, err := og.tableExists(ctx, tx, tableName)
3471+
if err != nil {
3472+
return nil, err
3473+
}
3474+
sb.WriteString("TABLE ")
3475+
sb.WriteString(tableName.String())
3476+
if !tableExists {
3477+
stmt.expectedExecErrors.add(pgcode.UndefinedTable)
3478+
}
3479+
} else {
3480+
database, err := og.getDatabase(ctx, tx)
3481+
if err != nil {
3482+
return nil, err
3483+
}
3484+
useExisting := og.randIntn(100) < og.pctExisting(true)
3485+
var databaseName string
3486+
if useExisting {
3487+
databaseName = database
3488+
} else {
3489+
databaseName = fmt.Sprintf("inspect_db_%s", og.newUniqueSeqNumSuffix())
3490+
stmt.expectedExecErrors.add(pgcode.InvalidCatalogName)
3491+
}
3492+
sb.WriteString("DATABASE ")
3493+
sb.WriteString(databaseName)
3494+
}
3495+
3496+
asof := og.randomInspectAsOfClause()
3497+
sb.WriteString(asof)
3498+
// If we use an ASOF time with inspect, chances are it will conflict with
3499+
// the timestamp chosen for the transaction, and return an "inconsistent AS
3500+
// OF SYSTEM TIME timestamp" error.
3501+
stmt.potentialExecErrors.addAll(codesWithConditions{
3502+
{pgcode.FeatureNotSupported, asof != ""},
3503+
})
3504+
3505+
// Always run DETACHED as this allows us to use INSPECT inside of a
3506+
// transaction. We have post-processing at the end of the run to verify
3507+
// INSPECT didn't find any issues.
3508+
sb.WriteString(" WITH OPTIONS DETACHED")
3509+
stmt.sql = sb.String()
3510+
3511+
return stmt, nil
3512+
}
3513+
3514+
func (og *operationGenerator) randomInspectAsOfClause() string {
3515+
// Use AS OF SYSTEM TIME infrequently (~10% of the time) because transactions
3516+
// are never started with AS OF SYSTEM TIME, and INSPECT with AS OF inside a
3517+
// transaction without AS OF can cause conflicts.
3518+
if og.randIntn(10) == 0 {
3519+
return fmt.Sprintf(" AS OF SYSTEM TIME '-%ds'", og.randIntn(30)+1)
3520+
}
3521+
return ""
3522+
}
3523+
34593524
type column struct {
34603525
name tree.Name
34613526
typ *types.T

pkg/workload/schemachange/optype.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ const (
145145
dropView // DROP VIEW <view>
146146
truncateTable
147147

148+
// INSPECT ...
149+
inspect // INSPECT {TABLE|DATABASE} ...
150+
148151
// Unimplemented operations. TODO(sql-foundations): Audit and/or implement these operations.
149152
// alterDatabaseOwner
150153
// alterDatabasePlacement
@@ -206,14 +209,15 @@ const (
206209
)
207210

208211
func isDMLOpType(t opType) bool {
209-
return t == insertRow || t == selectStmt || t == validate
212+
return t == insertRow || t == selectStmt || t == validate || t == inspect
210213
}
211214

212215
var opFuncs = []func(*operationGenerator, context.Context, pgx.Tx) (*opStmt, error){
213216
// Non-DDL
214217
insertRow: (*operationGenerator).insertRow,
215218
selectStmt: (*operationGenerator).selectStmt,
216219
validate: (*operationGenerator).validate,
220+
inspect: (*operationGenerator).inspect,
217221

218222
// DDL Operations
219223
alterDatabaseAddRegion: (*operationGenerator).addRegion,
@@ -273,6 +277,7 @@ var opWeights = []int{
273277
insertRow: 10,
274278
selectStmt: 10,
275279
validate: 2, // validate twice more often
280+
inspect: 1,
276281

277282
// DDL Operations
278283
alterDatabaseAddRegion: 1,
@@ -334,6 +339,7 @@ var opDeclarativeVersion = map[opType]clusterversion.Key{
334339
insertRow: clusterversion.MinSupported,
335340
selectStmt: clusterversion.MinSupported,
336341
validate: clusterversion.MinSupported,
342+
inspect: clusterversion.V25_4,
337343

338344
alterPolicy: clusterversion.V25_2,
339345
alterTableAddColumn: clusterversion.MinSupported,

pkg/workload/schemachange/optype_string.go

Lines changed: 3 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workload/schemachange/schemachange.go

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,94 @@ func setupSchemaChangePromCounter(reg prometheus.Registerer) schemaChangeCounter
163163
}
164164
}
165165

166+
func (s *schemaChange) logInspectErrors(
167+
ctx context.Context, pool *workload.MultiConnPool, log *atomicLog,
168+
) error {
169+
connPool := pool.Get()
170+
conn, err := connPool.Acquire(ctx)
171+
if err != nil {
172+
log.printLn(fmt.Sprintf("unable to acquire connection for SHOW INSPECT ERRORS: %v", err))
173+
return err
174+
}
175+
defer conn.Release()
176+
177+
rows, err := conn.Query(ctx, `SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'INSPECT' ORDER BY created DESC`)
178+
if err != nil {
179+
log.printLn(fmt.Sprintf("fetching INSPECT jobs failed: %v", err))
180+
return err
181+
}
182+
jobIDs, err := pgx.CollectRows(rows, pgx.RowTo[int64])
183+
rows.Close()
184+
if err != nil {
185+
log.printLn(fmt.Sprintf("collecting INSPECT job IDs failed: %v", err))
186+
return err
187+
}
188+
if len(jobIDs) == 0 {
189+
return nil
190+
}
191+
192+
type InspectJobResult struct {
193+
JobID int64 `json:"jobId"`
194+
Status string `json:"status"`
195+
Errors []map[string]any `json:"errors,omitempty"`
196+
}
197+
198+
type InspectErrorSummary struct {
199+
Message string `json:"message"`
200+
Jobs []InspectJobResult `json:"jobs"`
201+
}
202+
203+
summary := InspectErrorSummary{
204+
Message: "Inspect Job Errors",
205+
Jobs: make([]InspectJobResult, 0, len(jobIDs)),
206+
}
207+
208+
var totalErrors int
209+
for _, jobID := range jobIDs {
210+
query := fmt.Sprintf(`
211+
SELECT error_type, database_name, schema_name, table_name, primary_key, job_id, aost, details
212+
FROM [SHOW INSPECT ERRORS FOR JOB %d WITH DETAILS]`, jobID)
213+
rows, err := conn.Query(ctx, query)
214+
if err != nil {
215+
log.printLn(fmt.Sprintf("%s failed: %v", query, err))
216+
continue
217+
}
218+
results, err := pgx.CollectRows(rows, pgx.RowToMap)
219+
rows.Close()
220+
if err != nil {
221+
log.printLn(fmt.Sprintf("collecting inspect errors for job %d failed: %v", jobID, err))
222+
continue
223+
}
224+
225+
jobResult := InspectJobResult{
226+
JobID: jobID,
227+
}
228+
229+
if len(results) == 0 {
230+
jobResult.Status = "no errors reported"
231+
} else {
232+
jobResult.Status = fmt.Sprintf("%d error rows", len(results))
233+
jobResult.Errors = results
234+
totalErrors += len(results)
235+
}
236+
237+
summary.Jobs = append(summary.Jobs, jobResult)
238+
}
239+
240+
// Output as JSON.
241+
jsonBytes, err := json.MarshalIndent(summary, "", " ")
242+
if err != nil {
243+
log.printLn(fmt.Sprintf("failed to marshal inspect errors to JSON: %v", err))
244+
return err
245+
}
246+
log.printLn(string(jsonBytes))
247+
248+
if totalErrors > 0 {
249+
return errors.Newf("found %d inspect errors across %d jobs", totalErrors, len(jobIDs))
250+
}
251+
return nil
252+
}
253+
166254
// Meta implements the workload.Generator interface.
167255
func (s *schemaChange) Meta() workload.Meta { return schemaChangeMeta }
168256

@@ -246,6 +334,8 @@ func (s *schemaChange) Ops(
246334

247335
ql := workload.QueryLoad{
248336
Close: func(_ context.Context) error {
337+
inspectErr := s.logInspectErrors(ctx, pool, stdoutLog)
338+
249339
// Create a new context for shutting down the tracer provider. The
250340
// provided context may be cancelled depending on why the workload is
251341
// shutting down and we always want to provide a period of time to flush
@@ -259,7 +349,7 @@ func (s *schemaChange) Ops(
259349
closeErr := s.closeJSONLogFile()
260350
shutdownErr := tracerProvider.Shutdown(ctx)
261351
s.schemaWorkloadResultAnnotator.logWorkloadStats(stdoutLog)
262-
return errors.CombineErrors(closeErr, shutdownErr)
352+
return errors.Join(inspectErr, closeErr, shutdownErr)
263353
},
264354
}
265355

@@ -564,6 +654,9 @@ func (w *schemaChangeWorker) run(ctx context.Context) error {
564654
return err
565655
}
566656
}
657+
if _, err := conn.Exec(ctx, "SET enable_inspect_command = true;"); err != nil {
658+
return err
659+
}
567660

568661
tx, err := conn.Begin(ctx)
569662
if err != nil {

0 commit comments

Comments
 (0)