diff --git a/internal/datastore/common/index.go b/internal/datastore/common/index.go new file mode 100644 index 0000000000..2146f5a5c9 --- /dev/null +++ b/internal/datastore/common/index.go @@ -0,0 +1,25 @@ +package common + +import "github.com/authzed/spicedb/pkg/datastore/queryshape" + +// IndexDefinition is a definition of an index for a datastore. +type IndexDefinition struct { + // Name is the unique name for the index. + Name string + + // ColumnsSQL is the SQL fragment of the columns over which this index will apply. + ColumnsSQL string + + // Shapes are those query shapes for which this index should be used. + Shapes []queryshape.Shape +} + +// matchesShape returns true if the index matches the given shape. +func (id IndexDefinition) matchesShape(shape queryshape.Shape) bool { + for _, s := range id.Shapes { + if s == shape { + return true + } + } + return false +} diff --git a/internal/datastore/common/relationships.go b/internal/datastore/common/relationships.go index 4543a46013..6952f1964e 100644 --- a/internal/datastore/common/relationships.go +++ b/internal/datastore/common/relationships.go @@ -39,14 +39,70 @@ type closeRows interface { Close() } +func runExplainIfNecessary[R Rows](ctx context.Context, builder RelationshipsQueryBuilder, tx Querier[R], explainable datastore.Explainable) error { + if builder.sqlExplainCallback == nil { + return nil + } + + // Determine the expected index names via the schema. + expectedIndexes := builder.Schema.expectedIndexesForShape(builder.queryShape) + + // Run any pre-explain statements. + for _, statement := range explainable.PreExplainStatements() { + if err := tx.QueryFunc(ctx, func(ctx context.Context, rows R) error { + rows.Next() + return nil + }, statement); err != nil { + return fmt.Errorf(errUnableToQueryRels, err) + } + } + + // Run the query with EXPLAIN ANALYZE. 
+ sqlString, args, err := builder.SelectSQL() + if err != nil { + return fmt.Errorf(errUnableToQueryRels, err) + } + + explainSQL, explainArgs, err := explainable.BuildExplainQuery(sqlString, args) + if err != nil { + return fmt.Errorf(errUnableToQueryRels, err) + } + + err = tx.QueryFunc(ctx, func(ctx context.Context, rows R) error { + explainString := "" + for rows.Next() { + var explain string + if err := rows.Scan(&explain); err != nil { + return fmt.Errorf(errUnableToQueryRels, fmt.Errorf("scan err: %w", err)) + } + explainString += explain + "\n" + } + if explainString == "" { + return fmt.Errorf("received empty explain") + } + + builder.sqlExplainCallback(ctx, sqlString, args, builder.queryShape, explainString, expectedIndexes) + return nil + }, explainSQL, explainArgs...) + if err != nil { + return fmt.Errorf(errUnableToQueryRels, err) + } + + return nil +} + // QueryRelationships queries relationships for the given query and transaction. -func QueryRelationships[R Rows, C ~map[string]any](ctx context.Context, builder RelationshipsQueryBuilder, tx Querier[R]) (datastore.RelationshipIterator, error) { +func QueryRelationships[R Rows, C ~map[string]any](ctx context.Context, builder RelationshipsQueryBuilder, tx Querier[R], explainable datastore.Explainable) (datastore.RelationshipIterator, error) { span := trace.SpanFromContext(ctx) sqlString, args, err := builder.SelectSQL() if err != nil { return nil, fmt.Errorf(errUnableToQueryRels, err) } + if err := runExplainIfNecessary(ctx, builder, tx, explainable); err != nil { + return nil, err + } + var resourceObjectType string var resourceObjectID string var resourceRelation string diff --git a/internal/datastore/common/schema.go b/internal/datastore/common/schema.go index 542dc5dbf3..2a9514ac64 100644 --- a/internal/datastore/common/schema.go +++ b/internal/datastore/common/schema.go @@ -3,6 +3,8 @@ package common import ( sq "github.com/Masterminds/squirrel" + "github.com/authzed/spicedb/pkg/datastore/options" 
+ "github.com/authzed/spicedb/pkg/datastore/queryshape" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -35,6 +37,9 @@ type SchemaInformation struct { ColIntegrityHash string `debugmap:"visible"` ColIntegrityTimestamp string `debugmap:"visible"` + // Indexes are the indexes to use for this schema. + Indexes []IndexDefinition `debugmap:"visible"` + // PaginationFilterType is the type of pagination filter to use for this schema. PaginationFilterType PaginationFilterType `debugmap:"visible"` @@ -54,6 +59,17 @@ type SchemaInformation struct { ExpirationDisabled bool `debugmap:"visible"` } +// expectedIndexesForShape returns the expected index names for a given query shape. +func (si SchemaInformation) expectedIndexesForShape(shape queryshape.Shape) options.SQLIndexInformation { + expectedIndexes := options.SQLIndexInformation{} + for _, index := range si.Indexes { + if index.matchesShape(shape) { + expectedIndexes.ExpectedIndexNames = append(expectedIndexes.ExpectedIndexNames, index.Name) + } + } + return expectedIndexes +} + func (si SchemaInformation) debugValidate() { spiceerrors.DebugAssert(func() bool { si.mustValidate() diff --git a/internal/datastore/common/sql.go b/internal/datastore/common/sql.go index 733ca30bba..3559e46663 100644 --- a/internal/datastore/common/sql.go +++ b/internal/datastore/common/sql.go @@ -16,6 +16,7 @@ import ( log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" + "github.com/authzed/spicedb/pkg/datastore/queryshape" "github.com/authzed/spicedb/pkg/spiceerrors" ) @@ -663,12 +664,14 @@ func (exc QueryRelationshipsExecutor) ExecuteQuery( query.queryBuilder = query.queryBuilder.From(from) builder := RelationshipsQueryBuilder{ - Schema: query.schema, - SkipCaveats: queryOpts.SkipCaveats, - SkipExpiration: queryOpts.SkipExpiration, - sqlAssertion: queryOpts.SQLAssertion, - filteringValues: query.filteringColumnTracker, - baseQueryBuilder: query, + 
Schema: query.schema, + SkipCaveats: queryOpts.SkipCaveats, + SkipExpiration: queryOpts.SkipExpiration, + sqlCheckAssertion: queryOpts.SQLCheckAssertion, + sqlExplainCallback: queryOpts.SQLExplainCallback, + filteringValues: query.filteringColumnTracker, + queryShape: queryOpts.QueryShape, + baseQueryBuilder: query, } return exc.Executor(ctx, builder) @@ -681,9 +684,11 @@ type RelationshipsQueryBuilder struct { SkipCaveats bool SkipExpiration bool - filteringValues columnTrackerMap - baseQueryBuilder SchemaQueryFilterer - sqlAssertion options.Assertion + filteringValues columnTrackerMap + baseQueryBuilder SchemaQueryFilterer + sqlCheckAssertion options.SQLCheckAssertion + sqlExplainCallback options.SQLExplainCallback + queryShape queryshape.Shape } // withCaveats returns true if caveats should be included in the query. @@ -752,8 +757,8 @@ func (b RelationshipsQueryBuilder) SelectSQL() (string, []any, error) { return "", nil, err } - if b.sqlAssertion != nil { - b.sqlAssertion(sql) + if b.sqlCheckAssertion != nil { + b.sqlCheckAssertion(sql) } return sql, args, nil diff --git a/internal/datastore/common/zz_generated.schema_options.go b/internal/datastore/common/zz_generated.schema_options.go index 04b6088a36..85b0defad2 100644 --- a/internal/datastore/common/zz_generated.schema_options.go +++ b/internal/datastore/common/zz_generated.schema_options.go @@ -44,6 +44,7 @@ func (s *SchemaInformation) ToOption() SchemaInformationOption { to.ColIntegrityKeyID = s.ColIntegrityKeyID to.ColIntegrityHash = s.ColIntegrityHash to.ColIntegrityTimestamp = s.ColIntegrityTimestamp + to.Indexes = s.Indexes to.PaginationFilterType = s.PaginationFilterType to.PlaceholderFormat = s.PlaceholderFormat to.NowFunction = s.NowFunction @@ -69,6 +70,7 @@ func (s SchemaInformation) DebugMap() map[string]any { debugMap["ColIntegrityKeyID"] = helpers.DebugValue(s.ColIntegrityKeyID, false) debugMap["ColIntegrityHash"] = helpers.DebugValue(s.ColIntegrityHash, false) 
debugMap["ColIntegrityTimestamp"] = helpers.DebugValue(s.ColIntegrityTimestamp, false) + debugMap["Indexes"] = helpers.DebugValue(s.Indexes, false) debugMap["PaginationFilterType"] = helpers.DebugValue(s.PaginationFilterType, false) debugMap["PlaceholderFormat"] = helpers.DebugValue(s.PlaceholderFormat, false) debugMap["NowFunction"] = helpers.DebugValue(s.NowFunction, false) @@ -185,6 +187,20 @@ func WithColIntegrityTimestamp(colIntegrityTimestamp string) SchemaInformationOp } } +// WithIndexes returns an option that can append Indexess to SchemaInformation.Indexes +func WithIndexes(indexes IndexDefinition) SchemaInformationOption { + return func(s *SchemaInformation) { + s.Indexes = append(s.Indexes, indexes) + } +} + +// SetIndexes returns an option that can set Indexes on a SchemaInformation +func SetIndexes(indexes []IndexDefinition) SchemaInformationOption { + return func(s *SchemaInformation) { + s.Indexes = indexes + } +} + // WithPaginationFilterType returns an option that can set PaginationFilterType on a SchemaInformation func WithPaginationFilterType(paginationFilterType PaginationFilterType) SchemaInformationOption { return func(s *SchemaInformation) { diff --git a/internal/datastore/crdb/caveat.go b/internal/datastore/crdb/caveat.go index 94c74f6553..a89addf1c4 100644 --- a/internal/datastore/crdb/caveat.go +++ b/internal/datastore/crdb/caveat.go @@ -9,6 +9,7 @@ import ( sq "github.com/Masterminds/squirrel" "github.com/jackc/pgx/v5" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -17,14 +18,14 @@ import ( var ( upsertCaveatSuffix = fmt.Sprintf( "ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s", - colCaveatName, - colCaveatDefinition, - colCaveatDefinition, + schema.ColCaveatName, + schema.ColCaveatDefinition, + schema.ColCaveatDefinition, ) - writeCaveat = 
psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatDefinition).Suffix(upsertCaveatSuffix) - readCaveat = psql.Select(colCaveatDefinition, colTimestamp) - listCaveat = psql.Select(colCaveatName, colCaveatDefinition, colTimestamp).OrderBy(colCaveatName) - deleteCaveat = psql.Delete(tableCaveat) + writeCaveat = psql.Insert(schema.TableCaveat).Columns(schema.ColCaveatName, schema.ColCaveatDefinition).Suffix(upsertCaveatSuffix) + readCaveat = psql.Select(schema.ColCaveatDefinition, schema.ColTimestamp) + listCaveat = psql.Select(schema.ColCaveatName, schema.ColCaveatDefinition, schema.ColTimestamp).OrderBy(schema.ColCaveatName) + deleteCaveat = psql.Delete(schema.TableCaveat) ) const ( @@ -35,7 +36,7 @@ const ( ) func (cr *crdbReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) { - query := cr.addFromToQuery(readCaveat.Where(sq.Eq{colCaveatName: name}), tableCaveat) + query := cr.addFromToQuery(readCaveat.Where(sq.Eq{schema.ColCaveatName: name}), schema.TableCaveat) sql, args, err := query.ToSql() if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, name, err) @@ -80,9 +81,9 @@ type bytesAndTimestamp struct { } func (cr *crdbReader) lookupCaveats(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) { - caveatsWithNames := cr.addFromToQuery(listCaveat, tableCaveat) + caveatsWithNames := cr.addFromToQuery(listCaveat, schema.TableCaveat) if len(caveatNames) > 0 { - caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames}) + caveatsWithNames = caveatsWithNames.Where(sq.Eq{schema.ColCaveatName: caveatNames}) } sql, args, err := caveatsWithNames.ToSql() @@ -158,7 +159,7 @@ func (rwt *crdbReadWriteTXN) WriteCaveats(ctx context.Context, caveats []*core.C } func (rwt *crdbReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) error { - deleteCaveatClause := deleteCaveat.Where(sq.Eq{colCaveatName: names}) + deleteCaveatClause := 
deleteCaveat.Where(sq.Eq{schema.ColCaveatName: names}) sql, args, err := deleteCaveatClause.ToSql() if err != nil { return fmt.Errorf(errDeleteCaveats, err) diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index ad3bff7047..1d5605fc31 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -24,6 +24,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/migrations" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" @@ -47,42 +48,7 @@ var ( ) const ( - Engine = "cockroachdb" - tableNamespace = "namespace_config" - tableTuple = "relation_tuple" - tableTupleWithIntegrity = "relation_tuple_with_integrity" - tableTransactions = "transactions" - tableCaveat = "caveat" - tableRelationshipCounter = "relationship_counter" - tableTransactionMetadata = "transaction_metadata" - - colNamespace = "namespace" - colConfig = "serialized_config" - colTimestamp = "timestamp" - colTransactionKey = "key" - - colObjectID = "object_id" - colRelation = "relation" - - colUsersetNamespace = "userset_namespace" - colUsersetObjectID = "userset_object_id" - colUsersetRelation = "userset_relation" - - colCaveatName = "name" - colCaveatDefinition = "definition" - colCaveatContextName = "caveat_name" - colCaveatContext = "caveat_context" - colExpiration = "expires_at" - - colIntegrityHash = "integrity_hash" - colIntegrityKeyID = "integrity_key_id" - - colCounterName = "name" - colCounterSerializedFilter = "serialized_filter" - colCounterCurrentCount = "current_count" - colCounterUpdatedAt = "updated_at_timestamp" - colExpiresAt = "expires_at" - colMetadata = "metadata" + Engine = "cockroachdb" errUnableToInstantiate = "unable 
to instantiate datastore" errRevision = "unable to find revision: %w" @@ -200,33 +166,6 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas return nil, fmt.Errorf("invalid head migration found for cockroach: %w", err) } - relTableName := tableTuple - if config.withIntegrity { - relTableName = tableTupleWithIntegrity - } - - schema := common.NewSchemaInformationWithOptions( - common.WithRelationshipTableName(relTableName), - common.WithColNamespace(colNamespace), - common.WithColObjectID(colObjectID), - common.WithColRelation(colRelation), - common.WithColUsersetNamespace(colUsersetNamespace), - common.WithColUsersetObjectID(colUsersetObjectID), - common.WithColUsersetRelation(colUsersetRelation), - common.WithColCaveatName(colCaveatContextName), - common.WithColCaveatContext(colCaveatContext), - common.WithColExpiration(colExpiration), - common.WithColIntegrityKeyID(colIntegrityKeyID), - common.WithColIntegrityHash(colIntegrityHash), - common.WithColIntegrityTimestamp(colTimestamp), - common.WithPaginationFilterType(common.ExpandedLogicComparison), - common.WithPlaceholderFormat(sq.Dollar), - common.WithNowFunction("NOW"), - common.WithColumnOptimization(config.columnOptimizationOption), - common.WithIntegrityEnabled(config.withIntegrity), - common.WithExpirationDisabled(config.expirationDisabled), - ) - ds := &crdbDatastore{ RemoteClockRevisions: revisions.NewRemoteClockRevisions( config.gcWindow, @@ -248,7 +187,7 @@ func newCRDBDatastore(ctx context.Context, url string, options ...Option) (datas filterMaximumIDCount: config.filterMaximumIDCount, supportsIntegrity: config.withIntegrity, gcWindow: config.gcWindow, - schema: *schema, + schema: *schema.Schema(config.columnOptimizationOption, config.withIntegrity, config.expirationDisabled), } ds.RemoteClockRevisions.SetNowFunc(ds.headRevisionInternal) @@ -349,7 +288,7 @@ type crdbDatastore struct { func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader { 
executor := common.QueryRelationshipsExecutor{ - Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(cds.readPool), + Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(cds.readPool, cds), } return &crdbReader{ schema: cds.schema, @@ -378,7 +317,7 @@ func (cds *crdbDatastore) ReadWriteTx( err := cds.writePool.BeginFunc(ctx, func(tx pgx.Tx) error { querier := pgxcommon.QuerierFuncsFor(tx) executor := common.QueryRelationshipsExecutor{ - Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(querier), + Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(querier, cds), } // Write metadata onto the transaction. @@ -389,8 +328,8 @@ func (cds *crdbDatastore) ReadWriteTx( metadata[spicedbTransactionKey] = true expiresAt := time.Now().Add(cds.gcWindow).Add(1 * time.Minute) - insertTransactionMetadata := psql.Insert(tableTransactionMetadata). - Columns(colExpiresAt, colMetadata). + insertTransactionMetadata := psql.Insert(schema.TableTransactionMetadata). + Columns(schema.ColExpiresAt, schema.ColMetadata). 
Values(expiresAt, metadata) sql, args, err := insertTransactionMetadata.ToSql() diff --git a/internal/datastore/crdb/crdb_test.go b/internal/datastore/crdb/crdb_test.go index e874cced01..39d9efaf58 100644 --- a/internal/datastore/crdb/crdb_test.go +++ b/internal/datastore/crdb/crdb_test.go @@ -30,6 +30,7 @@ import ( crdbmigrations "github.com/authzed/spicedb/internal/datastore/crdb/migrations" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" "github.com/authzed/spicedb/internal/datastore/proxy" "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/internal/testfixtures" @@ -247,10 +248,10 @@ func TestWatchFeatureDetection(t *testing.T) { _, err = adminConn.Exec(ctx, `SET CLUSTER SETTING kv.rangefeed.enabled = true;`) require.NoError(t, err) - _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, tableTuple)) + _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT CHANGEFEED ON TABLE testspicedb.%s TO unprivileged;`, schema.TableTuple)) require.NoError(t, err) - _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, tableTuple)) + _, err = adminConn.Exec(ctx, fmt.Sprintf(`GRANT SELECT ON TABLE testspicedb.%s TO unprivileged;`, schema.TableTuple)) require.NoError(t, err) }, expectEnabled: true, diff --git a/internal/datastore/crdb/debug.go b/internal/datastore/crdb/debug.go new file mode 100644 index 0000000000..0552246993 --- /dev/null +++ b/internal/datastore/crdb/debug.go @@ -0,0 +1,38 @@ +package crdb + +import ( + "regexp" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/genutil/mapz" +) + +// See: https://www.cockroachlabs.com/docs/stable/explain + +var indexRegex = regexp.MustCompile(`table: relation_tuple@(.+)`) + +func (cds *crdbDatastore) PreExplainStatements() []string { + return nil +} + +func (cds *crdbDatastore) BuildExplainQuery(sql 
string, args []interface{}) (string, []any, error) { + return "EXPLAIN " + sql, args, nil +} + +func (cds *crdbDatastore) ParseExplain(explain string) (datastore.ParsedExplain, error) { + parts := indexRegex.FindAllStringSubmatch(explain, -1) + if len(parts) == 0 { + return datastore.ParsedExplain{}, nil + } + + indexes := mapz.NewSet[string]() + for _, part := range parts { + indexes.Add(part[1]) + } + + return datastore.ParsedExplain{ + IndexesUsed: indexes.AsSlice(), + }, nil +} + +var _ datastore.SQLDatastore = &crdbDatastore{} diff --git a/internal/datastore/crdb/debug_test.go b/internal/datastore/crdb/debug_test.go new file mode 100644 index 0000000000..10be27d18b --- /dev/null +++ b/internal/datastore/crdb/debug_test.go @@ -0,0 +1,63 @@ +package crdb + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/authzed/spicedb/pkg/datastore" +) + +func TestParseExplain(t *testing.T) { + tcs := []struct { + name string + input string + output datastore.ParsedExplain + }{ + { + name: "empty", + input: "", + output: datastore.ParsedExplain{}, + }, + { + name: "no indexes used", + input: ` • sort + │ estimated row count: 12,385 + │ order: +revenue + │ + └── • filter + │ estimated row count: 12,385 + │ filter: revenue > 90 + │ + └── • scan + estimated row count: 125,000 (100% of the table; stats collected 19 minutes ago) + table: relation_tuple`, + output: datastore.ParsedExplain{}, + }, + { + name: "index used", + input: ` • sort + │ estimated row count: 12,385 + │ order: +revenue + │ + └── • filter + │ estimated row count: 12,385 + │ filter: revenue > 90 + │ + └── • scan + estimated row count: 125,000 (100% of the table; stats collected 19 minutes ago) + table: relation_tuple@idx_relation_tuple_namespace`, + output: datastore.ParsedExplain{ + IndexesUsed: []string{"idx_relation_tuple_namespace"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + parsed, err := (&crdbDatastore{}).ParseExplain(tc.input) + 
require.NoError(t, err) + require.Equal(t, tc.output, parsed) + }) + } +} diff --git a/internal/datastore/crdb/reader.go b/internal/datastore/crdb/reader.go index db44aaadd6..762e032d40 100644 --- a/internal/datastore/crdb/reader.go +++ b/internal/datastore/crdb/reader.go @@ -12,6 +12,7 @@ import ( "github.com/shopspring/decimal" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" @@ -27,15 +28,15 @@ const ( ) var ( - queryReadNamespace = psql.Select(colConfig, colTimestamp) + queryReadNamespace = psql.Select(schema.ColConfig, schema.ColTimestamp) countRels = psql.Select("count(*)") queryCounters = psql.Select( - colCounterName, - colCounterSerializedFilter, - colCounterCurrentCount, - colCounterUpdatedAt, + schema.ColCounterName, + schema.ColCounterSerializedFilter, + schema.ColCounterCurrentCount, + schema.ColCounterUpdatedAt, ) ) @@ -123,9 +124,9 @@ func (cr *crdbReader) LookupCounters(ctx context.Context) ([]datastore.Relations } func (cr *crdbReader) lookupCounters(ctx context.Context, optionalFilterName string) ([]datastore.RelationshipCounter, error) { - query := cr.addFromToQuery(queryCounters, tableRelationshipCounter) + query := cr.addFromToQuery(queryCounters, schema.TableRelationshipCounter) if optionalFilterName != noFilterOnCounterName { - query = query.Where(sq.Eq{colCounterName: optionalFilterName}) + query = query.Where(sq.Eq{schema.ColCounterName: optionalFilterName}) } sql, args, err := query.ToSql() @@ -226,7 +227,7 @@ func (cr *crdbReader) QueryRelationships( } if spiceerrors.DebugAssertionsEnabled { - opts = append(opts, options.WithSQLAssertion(cr.assertHasExpectedAsOfSystemTime)) + opts = append(opts, options.WithSQLCheckAssertion(cr.assertHasExpectedAsOfSystemTime)) } return cr.executor.ExecuteQuery(ctx, 
qBuilder, opts...) @@ -255,10 +256,12 @@ func (cr *crdbReader) ReverseQueryRelationships( options.WithLimit(queryOpts.LimitForReverse), options.WithAfter(queryOpts.AfterForReverse), options.WithSort(queryOpts.SortForReverse), + options.WithQueryShape(queryOpts.QueryShapeForReverse), + options.WithSQLExplainCallback(queryOpts.SQLExplainCallbackForReverse), } if spiceerrors.DebugAssertionsEnabled { - eopts = append(eopts, options.WithSQLAssertion(cr.assertHasExpectedAsOfSystemTime)) + eopts = append(eopts, options.WithSQLCheckAssertion(cr.assertHasExpectedAsOfSystemTime)) } return cr.executor.ExecuteQuery( @@ -269,7 +272,7 @@ func (cr *crdbReader) ReverseQueryRelationships( } func (cr crdbReader) loadNamespace(ctx context.Context, tx pgxcommon.DBFuncQuerier, nsName string) (*core.NamespaceDefinition, time.Time, error) { - query := cr.addFromToQuery(queryReadNamespace, tableNamespace).Where(sq.Eq{colNamespace: nsName}) + query := cr.addFromToQuery(queryReadNamespace, schema.TableNamespace).Where(sq.Eq{schema.ColNamespace: nsName}) sql, args, err := query.ToSql() if err != nil { return nil, time.Time{}, err @@ -300,10 +303,10 @@ func (cr crdbReader) loadNamespace(ctx context.Context, tx pgxcommon.DBFuncQueri func (cr crdbReader) lookupNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, nsNames []string) ([]datastore.RevisionedNamespace, error) { clause := sq.Or{} for _, nsName := range nsNames { - clause = append(clause, sq.Eq{colNamespace: nsName}) + clause = append(clause, sq.Eq{schema.ColNamespace: nsName}) } - query := cr.addFromToQuery(queryReadNamespace, tableNamespace).Where(clause) + query := cr.addFromToQuery(queryReadNamespace, schema.TableNamespace).Where(clause) sql, args, err := query.ToSql() if err != nil { return nil, err @@ -344,7 +347,7 @@ func (cr crdbReader) lookupNamespaces(ctx context.Context, tx pgxcommon.DBFuncQu } func loadAllNamespaces(ctx context.Context, tx pgxcommon.DBFuncQuerier, fromBuilder func(sq.SelectBuilder, string) 
sq.SelectBuilder) ([]datastore.RevisionedNamespace, string, error) { - query := fromBuilder(queryReadNamespace, tableNamespace) + query := fromBuilder(queryReadNamespace, schema.TableNamespace) sql, args, err := query.ToSql() if err != nil { return nil, sql, err diff --git a/internal/datastore/crdb/readwrite.go b/internal/datastore/crdb/readwrite.go index acfaf97709..27261b4aa1 100644 --- a/internal/datastore/crdb/readwrite.go +++ b/internal/datastore/crdb/readwrite.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/jzelinskie/stringz" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" @@ -33,16 +34,16 @@ const ( var ( upsertNamespaceSuffix = fmt.Sprintf( "ON CONFLICT (%s) DO UPDATE SET %s = excluded.%s", - colNamespace, - colConfig, - colConfig, + schema.ColNamespace, + schema.ColConfig, + schema.ColConfig, ) - queryWriteNamespace = psql.Insert(tableNamespace).Columns( - colNamespace, - colConfig, + queryWriteNamespace = psql.Insert(schema.TableNamespace).Columns( + schema.ColNamespace, + schema.ColConfig, ).Suffix(upsertNamespaceSuffix) - queryDeleteNamespace = psql.Delete(tableNamespace) + queryDeleteNamespace = psql.Delete(schema.TableNamespace) ) type crdbReadWriteTXN struct { @@ -54,72 +55,72 @@ type crdbReadWriteTXN struct { var ( upsertTupleSuffixWithoutIntegrity = fmt.Sprintf( "ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now(), %s = excluded.%s, %s = excluded.%s, %s = excluded.%s WHERE (relation_tuple.%s <> excluded.%s OR relation_tuple.%s <> excluded.%s OR relation_tuple.%s <> excluded.%s)", - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colTimestamp, - colCaveatContextName, - colCaveatContextName, - colCaveatContext, - colCaveatContext, - colExpiration, - colExpiration, - 
colCaveatContextName, - colCaveatContextName, - colCaveatContext, - colCaveatContext, - colExpiration, - colExpiration, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColTimestamp, + schema.ColCaveatContextName, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColCaveatContext, + schema.ColExpiration, + schema.ColExpiration, + schema.ColCaveatContextName, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColCaveatContext, + schema.ColExpiration, + schema.ColExpiration, ) upsertTupleSuffixWithIntegrity = fmt.Sprintf( "ON CONFLICT (%s,%s,%s,%s,%s,%s) DO UPDATE SET %s = now(), %s = excluded.%s, %s = excluded.%s, %s = excluded.%s, %s = excluded.%s, %s = excluded.%s WHERE (relation_tuple_with_integrity.%s <> excluded.%s OR relation_tuple_with_integrity.%s <> excluded.%s OR relation_tuple_with_integrity.%s <> excluded.%s)", - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colTimestamp, - colCaveatContextName, - colCaveatContextName, - colCaveatContext, - colCaveatContext, - colIntegrityKeyID, - colIntegrityKeyID, - colIntegrityHash, - colIntegrityHash, - colExpiration, - colExpiration, - colCaveatContextName, - colCaveatContextName, - colCaveatContext, - colCaveatContext, - colExpiration, - colExpiration, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColTimestamp, + schema.ColCaveatContextName, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColCaveatContext, + schema.ColIntegrityKeyID, + schema.ColIntegrityKeyID, + schema.ColIntegrityHash, + schema.ColIntegrityHash, + schema.ColExpiration, + schema.ColExpiration, + schema.ColCaveatContextName, + schema.ColCaveatContextName, + schema.ColCaveatContext, + 
schema.ColCaveatContext, + schema.ColExpiration, + schema.ColExpiration, ) queryTouchTransaction = fmt.Sprintf( "INSERT INTO %s (%s) VALUES ($1::text) ON CONFLICT (%s) DO UPDATE SET %s = now()", - tableTransactions, - colTransactionKey, - colTransactionKey, - colTimestamp, + schema.TableTransactions, + schema.ColTransactionKey, + schema.ColTransactionKey, + schema.ColTimestamp, ) - queryWriteCounter = psql.Insert(tableRelationshipCounter).Columns( - colCounterName, - colCounterSerializedFilter, - colCounterCurrentCount, - colCounterUpdatedAt, + queryWriteCounter = psql.Insert(schema.TableRelationshipCounter).Columns( + schema.ColCounterName, + schema.ColCounterSerializedFilter, + schema.ColCounterCurrentCount, + schema.ColCounterUpdatedAt, ) - queryUpdateCounter = psql.Update(tableRelationshipCounter) + queryUpdateCounter = psql.Update(schema.TableRelationshipCounter) - queryDeleteCounter = psql.Delete(tableRelationshipCounter) + queryDeleteCounter = psql.Delete(schema.TableRelationshipCounter) ) func (rwt *crdbReadWriteTXN) insertQuery() sq.InsertBuilder { @@ -133,30 +134,30 @@ func (rwt *crdbReadWriteTXN) queryDeleteTuples() sq.DeleteBuilder { func (rwt *crdbReadWriteTXN) queryWriteTuple() sq.InsertBuilder { if rwt.withIntegrity { return rwt.insertQuery().Columns( - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, - colIntegrityKeyID, - colIntegrityHash, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, + schema.ColIntegrityKeyID, + schema.ColIntegrityHash, ) } return rwt.insertQuery().Columns( - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - 
colExpiration, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, ) } @@ -218,7 +219,7 @@ func (rwt *crdbReadWriteTXN) UnregisterCounter(ctx context.Context, name string) } // Remove the counter from the table. - sql, args, err := queryDeleteCounter.Where(sq.Eq{colCounterName: name}).ToSql() + sql, args, err := queryDeleteCounter.Where(sq.Eq{schema.ColCounterName: name}).ToSql() if err != nil { return fmt.Errorf("unable to unregister counter: %w", err) } @@ -248,9 +249,9 @@ func (rwt *crdbReadWriteTXN) StoreCounterValue(ctx context.Context, name string, // Update the counter in the table. sql, args, err := queryUpdateCounter. - Set(colCounterCurrentCount, value). - Set(colCounterUpdatedAt, computedAtRevisionTimestamp). - Where(sq.Eq{colCounterName: name}). + Set(schema.ColCounterCurrentCount, value). + Set(schema.ColCounterUpdatedAt, computedAtRevisionTimestamp). + Where(sq.Eq{schema.ColCounterName: name}). 
ToSql() if err != nil { return fmt.Errorf("unable to store counter value: %w", err) @@ -377,12 +378,12 @@ func (rwt *crdbReadWriteTXN) WriteRelationships(ctx context.Context, mutations [ func exactRelationshipClause(r tuple.Relationship) sq.Eq { return sq.Eq{ - colNamespace: r.Resource.ObjectType, - colObjectID: r.Resource.ObjectID, - colRelation: r.Resource.Relation, - colUsersetNamespace: r.Subject.ObjectType, - colUsersetObjectID: r.Subject.ObjectID, - colUsersetRelation: r.Subject.Relation, + schema.ColNamespace: r.Resource.ObjectType, + schema.ColObjectID: r.Resource.ObjectID, + schema.ColRelation: r.Resource.Relation, + schema.ColUsersetNamespace: r.Subject.ObjectType, + schema.ColUsersetObjectID: r.Subject.ObjectID, + schema.ColUsersetRelation: r.Subject.Relation, } } @@ -391,32 +392,32 @@ func (rwt *crdbReadWriteTXN) DeleteRelationships(ctx context.Context, filter *v1 query := rwt.queryDeleteTuples() if filter.ResourceType != "" { - query = query.Where(sq.Eq{colNamespace: filter.ResourceType}) + query = query.Where(sq.Eq{schema.ColNamespace: filter.ResourceType}) } if filter.OptionalResourceId != "" { - query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) + query = query.Where(sq.Eq{schema.ColObjectID: filter.OptionalResourceId}) } if filter.OptionalRelation != "" { - query = query.Where(sq.Eq{colRelation: filter.OptionalRelation}) + query = query.Where(sq.Eq{schema.ColRelation: filter.OptionalRelation}) } if filter.OptionalResourceIdPrefix != "" { if strings.Contains(filter.OptionalResourceIdPrefix, "%") { return false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character") } - query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"}) + query = query.Where(sq.Like{schema.ColObjectID: filter.OptionalResourceIdPrefix + "%"}) } rwt.addOverlapKey(filter.ResourceType) // Add clauses for the SubjectFilter if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { - query = 
query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) + query = query.Where(sq.Eq{schema.ColUsersetNamespace: subjectFilter.SubjectType}) if subjectFilter.OptionalSubjectId != "" { - query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) + query = query.Where(sq.Eq{schema.ColUsersetObjectID: subjectFilter.OptionalSubjectId}) } if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil { - query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) + query = query.Where(sq.Eq{schema.ColUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) } rwt.addOverlapKey(subjectFilter.SubjectType) } @@ -481,7 +482,7 @@ func (rwt *crdbReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ... func (rwt *crdbReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error { querier := pgxcommon.QuerierFuncsFor(rwt.tx) - // For each namespace, check they exist and collect predicates for the + // For each namespace, check they exist and collect predicates for the // "WHERE" clause to delete the namespaces and associated tuples. 
nsClauses := make([]sq.Sqlizer, 0, len(nsNames)) tplClauses := make([]sq.Sqlizer, 0, len(nsNames)) @@ -495,8 +496,8 @@ func (rwt *crdbReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...st } for _, nsName := range nsNames { - nsClauses = append(nsClauses, sq.Eq{colNamespace: nsName, colTimestamp: timestamp}) - tplClauses = append(tplClauses, sq.Eq{colNamespace: nsName}) + nsClauses = append(nsClauses, sq.Eq{schema.ColNamespace: nsName, schema.ColTimestamp: timestamp}) + tplClauses = append(tplClauses, sq.Eq{schema.ColNamespace: nsName}) } } @@ -527,30 +528,30 @@ func (rwt *crdbReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...st } var copyCols = []string{ - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, } var copyColsWithIntegrity = []string{ - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, - colIntegrityKeyID, - colIntegrityHash, - colTimestamp, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, + schema.ColIntegrityKeyID, + schema.ColIntegrityHash, + schema.ColTimestamp, } func (rwt *crdbReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { diff --git a/internal/datastore/crdb/schema/indexes.go b/internal/datastore/crdb/schema/indexes.go new file mode 100644 index 0000000000..f49000feba --- /dev/null +++ 
b/internal/datastore/crdb/schema/indexes.go @@ -0,0 +1,50 @@ +package schema + +import ( + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/pkg/datastore/queryshape" +) + +// IndexPrimaryKey is a synthetic index that represents the primary key of the relation_tuple table. +var IndexPrimaryKey = common.IndexDefinition{ + Name: "pk_relation_tuple", + ColumnsSQL: `PRIMARY KEY (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexRelationshipBySubject is an index for looking up relationships by subject. +var IndexRelationshipBySubject = common.IndexDefinition{ + Name: "ix_relation_tuple_by_subject", + ColumnsSQL: `relation_tuple (userset_object_id, userset_namespace, userset_relation, namespace, relation)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexRelationshipBySubjectRelation is an index for looking up relationships by subject type and relation. +// Used by schema delta checking. +var IndexRelationshipBySubjectRelation = common.IndexDefinition{ + Name: "ix_relation_tuple_by_subject_relation", + ColumnsSQL: `relation_tuple (userset_namespace, userset_relation, namespace, relation)`, +} + +// IndexRelationshipWithIntegrity is an index for looking up relationships with integrity. 
+var IndexRelationshipWithIntegrity = common.IndexDefinition{ + Name: "ix_relation_tuple_with_integrity", + ColumnsSQL: `relation_tuple_with_integrity (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation) STORING (integrity_key_id, integrity_hash, timestamp, caveat_name, caveat_context)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +var crdbIndexes = []common.IndexDefinition{ + IndexPrimaryKey, + IndexRelationshipBySubject, + IndexRelationshipBySubjectRelation, + IndexRelationshipWithIntegrity, +} diff --git a/internal/datastore/crdb/schema/schema.go b/internal/datastore/crdb/schema/schema.go new file mode 100644 index 0000000000..2f231deefd --- /dev/null +++ b/internal/datastore/crdb/schema/schema.go @@ -0,0 +1,75 @@ +package schema + +import ( + sq "github.com/Masterminds/squirrel" + + "github.com/authzed/spicedb/internal/datastore/common" +) + +const ( + TableNamespace = "namespace_config" + TableTuple = "relation_tuple" + TableTupleWithIntegrity = "relation_tuple_with_integrity" + TableTransactions = "transactions" + TableCaveat = "caveat" + TableRelationshipCounter = "relationship_counter" + TableTransactionMetadata = "transaction_metadata" + + ColNamespace = "namespace" + ColConfig = "serialized_config" + ColTimestamp = "timestamp" + ColTransactionKey = "key" + + ColObjectID = "object_id" + ColRelation = "relation" + + ColUsersetNamespace = "userset_namespace" + ColUsersetObjectID = "userset_object_id" + ColUsersetRelation = "userset_relation" + + ColCaveatName = "name" + ColCaveatDefinition = "definition" + ColCaveatContextName = "caveat_name" + ColCaveatContext = "caveat_context" + ColExpiration = "expires_at" + + ColIntegrityHash = "integrity_hash" + ColIntegrityKeyID = "integrity_key_id" + + ColCounterName = "name" + ColCounterSerializedFilter = "serialized_filter" + ColCounterCurrentCount = "current_count" + 
ColCounterUpdatedAt = "updated_at_timestamp" + ColExpiresAt = "expires_at" + ColMetadata = "metadata" +) + +func Schema(colOptimizationOpt common.ColumnOptimizationOption, withIntegrity bool, expirationDisabled bool) *common.SchemaInformation { + relTableName := TableTuple + if withIntegrity { + relTableName = TableTupleWithIntegrity + } + + return common.NewSchemaInformationWithOptions( + common.WithRelationshipTableName(relTableName), + common.WithColNamespace(ColNamespace), + common.WithColObjectID(ColObjectID), + common.WithColRelation(ColRelation), + common.WithColUsersetNamespace(ColUsersetNamespace), + common.WithColUsersetObjectID(ColUsersetObjectID), + common.WithColUsersetRelation(ColUsersetRelation), + common.WithColCaveatName(ColCaveatContextName), + common.WithColCaveatContext(ColCaveatContext), + common.WithColExpiration(ColExpiration), + common.WithColIntegrityKeyID(ColIntegrityKeyID), + common.WithColIntegrityHash(ColIntegrityHash), + common.WithColIntegrityTimestamp(ColTimestamp), + common.WithPaginationFilterType(common.ExpandedLogicComparison), + common.WithPlaceholderFormat(sq.Dollar), + common.WithNowFunction("NOW"), + common.WithColumnOptimization(colOptimizationOpt), + common.WithIntegrityEnabled(withIntegrity), + common.WithExpirationDisabled(expirationDisabled), + common.SetIndexes(crdbIndexes), + ) +} diff --git a/internal/datastore/crdb/watch.go b/internal/datastore/crdb/watch.go index 2008a743ad..5d83066e8b 100644 --- a/internal/datastore/crdb/watch.go +++ b/internal/datastore/crdb/watch.go @@ -17,6 +17,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" "github.com/authzed/spicedb/internal/datastore/crdb/pool" + "github.com/authzed/spicedb/internal/datastore/crdb/schema" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/revisions" "github.com/authzed/spicedb/pkg/datastore" @@ -126,13 +127,13 @@ func (cds *crdbDatastore) watch( defer func() { _ = 
conn.Close(ctx) }() tableNames := make([]string, 0, 4) - tableNames = append(tableNames, tableTransactionMetadata) + tableNames = append(tableNames, schema.TableTransactionMetadata) if opts.Content&datastore.WatchRelationships == datastore.WatchRelationships { tableNames = append(tableNames, cds.schema.RelationshipTableName) } if opts.Content&datastore.WatchSchema == datastore.WatchSchema { - tableNames = append(tableNames, tableNamespace) - tableNames = append(tableNames, tableCaveat) + tableNames = append(tableNames, schema.TableNamespace) + tableNames = append(tableNames, schema.TableCaveat) } if len(tableNames) == 0 { @@ -510,7 +511,7 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, } } - case tableNamespace: + case schema.TableNamespace: if len(pkValues) != 1 { sendError(spiceerrors.MustBugf("expected a single definition name for the primary key in change feed. found: %s", string(primaryKeyValuesJSON))) return @@ -549,7 +550,7 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, } } - case tableCaveat: + case schema.TableCaveat: if len(pkValues) != 1 { sendError(spiceerrors.MustBugf("expected a single definition name for the primary key in change feed. 
found: %s", string(primaryKeyValuesJSON))) return @@ -589,7 +590,7 @@ func (cds *crdbDatastore) processChanges(ctx context.Context, changes pgx.Rows, } } - case tableTransactionMetadata: + case schema.TableTransactionMetadata: if details.After != nil { rev, err := revisions.HLCRevisionFromString(details.Updated) if err != nil { diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 875c61d661..7c48f04c45 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -262,6 +262,7 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option common.WithNowFunction("NOW"), common.WithColumnOptimization(config.columnOptimizationOption), common.WithExpirationDisabled(config.expirationDisabled), + common.SetIndexes(indexes), ) store := &Datastore{ @@ -336,7 +337,7 @@ func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader { } executor := common.QueryRelationshipsExecutor{ - Executor: newMySQLExecutor(mds.db), + Executor: newMySQLExecutor(mds.db, mds), } return &mysqlReader{ @@ -379,7 +380,7 @@ func (mds *Datastore) ReadWriteTx( } executor := common.QueryRelationshipsExecutor{ - Executor: newMySQLExecutor(tx), + Executor: newMySQLExecutor(tx, mds), } rwt := &mysqlReadWriteTXN{ @@ -455,7 +456,7 @@ func (aqt asQueryableTx) QueryFunc(ctx context.Context, f func(context.Context, return f(ctx, rows) } -func newMySQLExecutor(tx querier) common.ExecuteReadRelsQueryFunc { +func newMySQLExecutor(tx querier, explainable datastore.Explainable) common.ExecuteReadRelsQueryFunc { // This implementation does not create a transaction because it's redundant for single statements, and it avoids // the network overhead and reduce contention on the connection pool. 
From MySQL docs: // @@ -472,7 +473,7 @@ func newMySQLExecutor(tx querier) common.ExecuteReadRelsQueryFunc { // Prepared statements are also not used given they perform poorly on environments where connections have // short lifetime (e.g. to gracefully handle load-balancer connection drain) return func(ctx context.Context, builder common.RelationshipsQueryBuilder) (datastore.RelationshipIterator, error) { - return common.QueryRelationships[common.Rows, structpbWrapper](ctx, builder, asQueryableTx{tx}) + return common.QueryRelationships[common.Rows, structpbWrapper](ctx, builder, asQueryableTx{tx}, explainable) } } diff --git a/internal/datastore/mysql/debug.go b/internal/datastore/mysql/debug.go new file mode 100644 index 0000000000..4939c52d41 --- /dev/null +++ b/internal/datastore/mysql/debug.go @@ -0,0 +1,52 @@ +package mysql + +import ( + "encoding/json" + "fmt" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/genutil/mapz" +) + +type explain struct { + Query string `json:"query"` + Inputs []input `json:"inputs"` +} + +type input struct { + IndexName string `json:"index_name"` + AccessType string `json:"access_type"` +} + +// See: https://dev.mysql.com/doc/refman/8.4/en/explain.html +func (mds *Datastore) PreExplainStatements() []string { + return []string{ + "SET @@explain_json_format_version = 2;", + } +} + +func (mds *Datastore) BuildExplainQuery(sql string, args []interface{}) (string, []any, error) { + return "EXPLAIN FORMAT=JSON " + sql, args, nil +} + +func (mds *Datastore) ParseExplain(explainJSON string) (datastore.ParsedExplain, error) { + // Unmarshal the explain JSON. + parsed := explain{} + if err := json.Unmarshal([]byte(explainJSON), &parsed); err != nil { + return datastore.ParsedExplain{}, fmt.Errorf("could not parse explain: %w", err) + } + + // Extract the index name(s) used. 
+ indexesUsed := mapz.NewSet[string]() + for _, input := range parsed.Inputs { + if input.AccessType == "index" && input.IndexName != "" && input.IndexName != "PRIMARY" { + indexesUsed.Add(input.IndexName) + } + } + + return datastore.ParsedExplain{ + IndexesUsed: indexesUsed.AsSlice(), + }, nil +} + +var _ datastore.SQLDatastore = &Datastore{} diff --git a/internal/datastore/mysql/debug_test.go b/internal/datastore/mysql/debug_test.go new file mode 100644 index 0000000000..22d1048c6a --- /dev/null +++ b/internal/datastore/mysql/debug_test.go @@ -0,0 +1,93 @@ +package mysql + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/authzed/spicedb/pkg/datastore" +) + +func TestParseExplain(t *testing.T) { + tcs := []struct { + name string + input string + output datastore.ParsedExplain + expectedError string + }{ + { + name: "empty", + input: "", + expectedError: "could not parse explain", + }, + { + name: "no index used", + input: `{ + "query": "select * from something", + "inputs": [ + { + "covering": false, + "table_name": "country", + "access_type": "some_other_type", + "estimated_rows": 17.0, + "estimated_total_cost": 3.668778400708174 + } + ] + }`, + output: datastore.ParsedExplain{}, + }, + { + name: "primary index used", + input: `{ + "query": "select * from something", + "inputs": [ + { + "covering": false, + "index_name": "PRIMARY", + "table_name": "country", + "access_type": "index", + "estimated_rows": 17.0, + "index_access_type": "index_range_scan", + "estimated_total_cost": 3.668778400708174 + } + ] + }`, + output: datastore.ParsedExplain{}, + }, + { + name: "index used", + input: `{ + "query": "select * from something", + "inputs": [ + { + "covering": false, + "index_name": "ix_some_index", + "table_name": "country", + "access_type": "index", + "estimated_rows": 17.0, + "index_access_type": "index_range_scan", + "estimated_total_cost": 3.668778400708174 + } + ] + }`, + output: datastore.ParsedExplain{ + IndexesUsed: 
[]string{"ix_some_index"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + parsed, err := (&Datastore{}).ParseExplain(tc.input) + + if tc.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + return + } + + require.NoError(t, err) + require.Equal(t, tc.output, parsed) + }) + } +} diff --git a/internal/datastore/mysql/indexes.go b/internal/datastore/mysql/indexes.go new file mode 100644 index 0000000000..762146a448 --- /dev/null +++ b/internal/datastore/mysql/indexes.go @@ -0,0 +1,81 @@ +package mysql + +import ( + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/pkg/datastore/queryshape" +) + +// IndexUniqueLivingRelationships is the UNIQUE constraint index on +// living relationships. +var IndexUniqueLivingRelationships = common.IndexDefinition{ + Name: `uq_relation_tuple_living`, + ColumnsSQL: `UNIQUE (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation, deleted_transaction)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + }, +} + +// IndexUniqueRelationships is the UNIQUE constraint index on all relationships. +var IndexUniqueRelationships = common.IndexDefinition{ + Name: `uq_relation_tuple_namespace`, + ColumnsSQL: `UNIQUE (namespace, object_id, relation, userset_namespace, userset_object_id, userset_relation, created_transaction, deleted_transaction)`, +} + +// IndexRelationshipBySubject is the index on the relationship table for +// looking up relationships by subject. 
+var IndexRelationshipBySubject = common.IndexDefinition{ + Name: `ix_relation_tuple_by_subject`, + ColumnsSQL: `INDEX ix_relation_tuple_by_subject (userset_object_id, userset_namespace, userset_relation, namespace, relation)`, + Shapes: []queryshape.Shape{ + // TODO: this index isn't great to use for this query shape, so we probably need to fix + // the indexes + queryshape.CheckPermissionSelectDirectSubjects, + + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexRelationshipBySubjectRelation is the index on the relationship table for +// looking up relationships by subject and relation. +var IndexRelationshipBySubjectRelation = common.IndexDefinition{ + Name: `ix_relation_tuple_by_subject_relation`, + ColumnsSQL: `INDEX ix_relation_tuple_by_subject_relation (userset_namespace, userset_relation, namespace, relation)`, + Shapes: []queryshape.Shape{ + // TODO: this index isn't great to use for these query shape, so we probably need to fix + // the indexes + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexRelationshipByDeletedTransaction is the index on the relationship table for +// looking up relationships by deleted transaction. +var IndexRelationshipByDeletedTransaction = common.IndexDefinition{ + Name: `ix_relation_tuple_by_deleted_transaction`, + ColumnsSQL: `INDEX ix_relation_tuple_by_deleted_transaction (deleted_transaction)`, +} + +// IndexRelationTupleWatch is the index on the relationship table for +// watching relationships. +var IndexRelationTupleWatch = common.IndexDefinition{ + Name: `ix_relation_tuple_watch`, + ColumnsSQL: `INDEX ix_relation_tuple_watch (created_transaction, deleted_transaction DESC)`, +} + +// IndexRelationTupleExpired is the index on the relationship table for +// expired relationships. This is used for garbage collection of expired +// relationships. 
+var IndexRelationTupleExpired = common.IndexDefinition{ + Name: `ix_relation_tuple_expired`, + ColumnsSQL: `INDEX ix_relation_tuple_expired (expiration)`, +} + +var indexes = []common.IndexDefinition{ + IndexUniqueLivingRelationships, + IndexUniqueRelationships, + IndexRelationshipBySubject, + IndexRelationshipBySubjectRelation, + IndexRelationshipByDeletedTransaction, + IndexRelationTupleWatch, + IndexRelationTupleExpired, +} diff --git a/internal/datastore/mysql/reader.go b/internal/datastore/mysql/reader.go index c963fb808f..fa4ead57fc 100644 --- a/internal/datastore/mysql/reader.go +++ b/internal/datastore/mysql/reader.go @@ -201,6 +201,8 @@ func (mr *mysqlReader) ReverseQueryRelationships( options.WithLimit(queryOpts.LimitForReverse), options.WithAfter(queryOpts.AfterForReverse), options.WithSort(queryOpts.SortForReverse), + options.WithQueryShape(queryOpts.QueryShapeForReverse), + options.WithSQLExplainCallback(queryOpts.SQLExplainCallbackForReverse), ) } diff --git a/internal/datastore/mysql/version/version.go b/internal/datastore/mysql/version/version.go index be776323ce..c47fcdf7b8 100644 --- a/internal/datastore/mysql/version/version.go +++ b/internal/datastore/mysql/version/version.go @@ -3,4 +3,4 @@ package version // MinimumSupportedMySQLVersion is the minimum version of MySQL supported for this driver. // // NOTE: must match a tag on DockerHub for the `mysql` image. 
-const MinimumSupportedMySQLVersion = "8.0" +const MinimumSupportedMySQLVersion = "8.4" diff --git a/internal/datastore/postgres/caveat.go b/internal/datastore/postgres/caveat.go index 9c483c3b93..52e966ab06 100644 --- a/internal/datastore/postgres/caveat.go +++ b/internal/datastore/postgres/caveat.go @@ -7,6 +7,7 @@ import ( "github.com/jackc/pgx/v5" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/genutil/mapz" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -15,15 +16,15 @@ import ( ) var ( - writeCaveat = psql.Insert(tableCaveat).Columns(colCaveatName, colCaveatDefinition) + writeCaveat = psql.Insert(schema.TableCaveat).Columns(schema.ColCaveatName, schema.ColCaveatDefinition) listCaveat = psql. - Select(colCaveatDefinition, colCreatedXid). - From(tableCaveat). - OrderBy(colCaveatName) + Select(schema.ColCaveatDefinition, schema.ColCreatedXid). + From(schema.TableCaveat). + OrderBy(schema.ColCaveatName) readCaveat = psql. - Select(colCaveatDefinition, colCreatedXid). - From(tableCaveat) - deleteCaveat = psql.Update(tableCaveat).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + Select(schema.ColCaveatDefinition, schema.ColCreatedXid). 
+ From(schema.TableCaveat) + deleteCaveat = psql.Update(schema.TableCaveat).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) ) const ( @@ -35,7 +36,7 @@ const ( func (r *pgReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) { filteredReadCaveat := r.aliveFilter(readCaveat) - sql, args, err := filteredReadCaveat.Where(sq.Eq{colCaveatName: name}).ToSql() + sql, args, err := filteredReadCaveat.Where(sq.Eq{schema.ColCaveatName: name}).ToSql() if err != nil { return nil, datastore.NoRevision, fmt.Errorf(errReadCaveat, err) } @@ -76,7 +77,7 @@ func (r *pgReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCa func (r *pgReader) lookupCaveats(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) { caveatsWithNames := listCaveat if len(caveatNames) > 0 { - caveatsWithNames = caveatsWithNames.Where(sq.Eq{colCaveatName: caveatNames}) + caveatsWithNames = caveatsWithNames.Where(sq.Eq{schema.ColCaveatName: caveatNames}) } filteredListCaveat := r.aliveFilter(caveatsWithNames) @@ -154,8 +155,8 @@ func (rwt *pgReadWriteTXN) DeleteCaveats(ctx context.Context, names []string) er func (rwt *pgReadWriteTXN) deleteCaveatsFromNames(ctx context.Context, names []string) error { sql, args, err := deleteCaveat. - Set(colDeletedXid, rwt.newXID). - Where(sq.Eq{colCaveatName: names}). + Set(schema.ColDeletedXid, rwt.newXID). + Where(sq.Eq{schema.ColCaveatName: names}). ToSql() if err != nil { return fmt.Errorf(errDeleteCaveats, err) diff --git a/internal/datastore/postgres/common/pgx.go b/internal/datastore/postgres/common/pgx.go index f0530cb754..2311751241 100644 --- a/internal/datastore/postgres/common/pgx.go +++ b/internal/datastore/postgres/common/pgx.go @@ -21,9 +21,9 @@ import ( ) // NewPGXQueryRelationshipsExecutor creates an executor that uses the pgx library to make the specified queries. 
-func NewPGXQueryRelationshipsExecutor(querier DBFuncQuerier) common.ExecuteReadRelsQueryFunc { +func NewPGXQueryRelationshipsExecutor(querier DBFuncQuerier, explainable datastore.Explainable) common.ExecuteReadRelsQueryFunc { return func(ctx context.Context, builder common.RelationshipsQueryBuilder) (datastore.RelationshipIterator, error) { - return common.QueryRelationships[pgx.Rows, map[string]any](ctx, builder, querier) + return common.QueryRelationships[pgx.Rows, map[string]any](ctx, builder, querier, explainable) } } diff --git a/internal/datastore/postgres/debug.go b/internal/datastore/postgres/debug.go new file mode 100644 index 0000000000..dde0c565e5 --- /dev/null +++ b/internal/datastore/postgres/debug.go @@ -0,0 +1,49 @@ +package postgres + +import ( + "encoding/json" + "fmt" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/genutil/mapz" +) + +// See: https://www.postgresql.org/docs/current/sql-explain.html +type explain struct { + Plan plan `json:"Plan"` +} + +type plan struct { + NodeType string `json:"Node Type"` + IndexName string `json:"Index Name"` +} + +func (pgd *pgDatastore) PreExplainStatements() []string { + return nil +} + +func (pgd *pgDatastore) BuildExplainQuery(sql string, args []interface{}) (string, []any, error) { + return "EXPLAIN (FORMAT JSON) " + sql, args, nil +} + +func (pgd *pgDatastore) ParseExplain(explainJSON string) (datastore.ParsedExplain, error) { + // Unmarshal the explain JSON. + parsed := []explain{} + if err := json.Unmarshal([]byte(explainJSON), &parsed); err != nil { + return datastore.ParsedExplain{}, fmt.Errorf("could not parse explain: %w", err) + } + + // Extract the index name(s) used. 
+ indexesUsed := mapz.NewSet[string]() + for _, p := range parsed { + if p.Plan.IndexName != "" { + indexesUsed.Add(p.Plan.IndexName) + } + } + + return datastore.ParsedExplain{ + IndexesUsed: indexesUsed.AsSlice(), + }, nil +} + +var _ datastore.SQLDatastore = &pgDatastore{} diff --git a/internal/datastore/postgres/debug_test.go b/internal/datastore/postgres/debug_test.go new file mode 100644 index 0000000000..ad9b227d69 --- /dev/null +++ b/internal/datastore/postgres/debug_test.go @@ -0,0 +1,72 @@ +package postgres + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/authzed/spicedb/pkg/datastore" +) + +func TestParseExplain(t *testing.T) { + tcs := []struct { + name string + input string + output datastore.ParsedExplain + expectedError string + }{ + { + name: "empty", + input: "", + expectedError: "could not parse explain", + }, + { + name: "no indexes used", + input: `[ + { + "Plan": { + "Node Type": "Seq Scan", + "Relation Name": "relation_tuple" + } + } + ]`, + output: datastore.ParsedExplain{}, + }, + { + name: "index used", + input: `[ + { + "Plan": { + "Node Type": "Seq Scan", + "Relation Name": "relation_tuple", + "Index Name": "idx_relation_tuple_namespace" + } + }, + { + "Plan": { + "Node Type": "Index Scan", + "Relation Name": "relation_tuple" + } + } + ]`, + output: datastore.ParsedExplain{ + IndexesUsed: []string{"idx_relation_tuple_namespace"}, + }, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + parsed, err := (&pgDatastore{}).ParseExplain(tc.input) + + if tc.expectedError != "" { + require.Error(t, err) + require.Contains(t, err.Error(), tc.expectedError) + return + } + + require.NoError(t, err) + require.Equal(t, tc.output, parsed) + }) + } +} diff --git a/internal/datastore/postgres/gc.go b/internal/datastore/postgres/gc.go index 5b478cb0e1..a860e1666f 100644 --- a/internal/datastore/postgres/gc.go +++ b/internal/datastore/postgres/gc.go @@ -9,6 +9,7 @@ import ( sq 
"github.com/Masterminds/squirrel" "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" ) @@ -17,7 +18,7 @@ var ( // we are using "tableoid" to globally identify the row through the "ctid" in partitioned environments // as it's not guaranteed 2 rows in different partitions have different "ctid" values - // See https://www.postgresql.org/docs/current/ddl-system-columns.html#DDL-SYSTEM-COLUMNS-TABLEOID + // See https://www.postgresql.org/docs/current/ddl-system-columns.html#DDL-SYSTEM-COLUMNS-TABLEOID gcPKCols = []string{"tableoid", "ctid"} ) @@ -61,7 +62,7 @@ func (pgd *pgDatastore) Now(ctx context.Context) (time.Time, error) { func (pgd *pgDatastore) TxIDBefore(ctx context.Context, before time.Time) (datastore.Revision, error) { // Find the highest transaction ID before the GC window. - sql, args, err := getRevision.Where(sq.Lt{colTimestamp: before}).ToSql() + sql, args, err := getRevision.Where(sq.Lt{schema.ColTimestamp: before}).ToSql() if err != nil { return datastore.NoRevision, err } @@ -88,9 +89,9 @@ func (pgd *pgDatastore) DeleteExpiredRels(ctx context.Context) (int64, error) { return pgd.batchDelete( ctx, - tableTuple, + schema.TableTuple, gcPKCols, - sq.Lt{colExpiration: now.Add(-1 * pgd.gcWindow)}, + sq.Lt{schema.ColExpiration: now.Add(-1 * pgd.gcWindow)}, ) } @@ -103,12 +104,12 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis // Delete any relationship rows that were already dead when this transaction started removed.Relationships, err = pgd.batchDelete( ctx, - tableTuple, + schema.TableTuple, gcPKCols, - sq.Lt{colDeletedXid: minTxAlive}, + sq.Lt{schema.ColDeletedXid: minTxAlive}, ) if err != nil { - return removed, fmt.Errorf("failed to GC relationships table: %w", err) + return removed, fmt.Errorf("failed to GC relationships table: %w", err) } // Delete all transaction rows with ID < the 
transaction ID. @@ -117,23 +118,23 @@ func (pgd *pgDatastore) DeleteBeforeTx(ctx context.Context, txID datastore.Revis // one transaction present. removed.Transactions, err = pgd.batchDelete( ctx, - tableTransaction, + schema.TableTransaction, gcPKCols, - sq.Lt{colXID: minTxAlive}, + sq.Lt{schema.ColXID: minTxAlive}, ) if err != nil { - return removed, fmt.Errorf("failed to GC transactions table: %w", err) + return removed, fmt.Errorf("failed to GC transactions table: %w", err) } // Delete any namespace rows with deleted_transaction <= the transaction ID. removed.Namespaces, err = pgd.batchDelete( ctx, - tableNamespace, + schema.TableNamespace, gcPKCols, - sq.Lt{colDeletedXid: minTxAlive}, + sq.Lt{schema.ColDeletedXid: minTxAlive}, ) if err != nil { - return removed, fmt.Errorf("failed to GC namespaces table: %w", err) + return removed, fmt.Errorf("failed to GC namespaces table: %w", err) } return removed, err diff --git a/internal/datastore/postgres/migrations/index.go b/internal/datastore/postgres/migrations/index.go index 98df39753e..53371b0649 100644 --- a/internal/datastore/postgres/migrations/index.go +++ b/internal/datastore/postgres/migrations/index.go @@ -2,56 +2,19 @@ package migrations import ( "context" - "fmt" "github.com/jackc/pgx/v5" - "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" ) -const createIndexTemplate = ` -CREATE INDEX CONCURRENTLY - %s - ON - %s` - -const dropIndexTemplate = ` - DROP INDEX CONCURRENTLY IF EXISTS - %s; -` - -const timeoutMessage = "This typically indicates that your database global statement_timeout needs to be increased and/or spicedb migrate command needs --migration-timeout increased (1h by default)" - -// createIndexConcurrently creates an index concurrently, dropping the existing index if it exists to ensure -// that indexes are not left in a partially constructed 
state. -// See: https://www.shayon.dev/post/2024/225/stop-relying-on-if-not-exists-for-concurrent-index-creation-in-postgresql/ -func createIndexConcurrently(ctx context.Context, conn *pgx.Conn, indexName, creationClause string) error { - dropIndexSQL := fmt.Sprintf(dropIndexTemplate, indexName) - if _, err := conn.Exec(ctx, dropIndexSQL); err != nil { - if common.IsQueryCanceledError(err) { - return fmt.Errorf( - "timed out while trying to drop index %s before recreating it: %w. %s", - indexName, - err, - timeoutMessage, - ) - } - - return fmt.Errorf("failed to drop index %s before creating it: %w", indexName, err) - } - - createIndexSQL := fmt.Sprintf(createIndexTemplate, indexName, creationClause) - if _, err := conn.Exec(ctx, createIndexSQL); err != nil { - if common.IsQueryCanceledError(err) { - return fmt.Errorf( - "timed out while trying to create index %s: %w. %s", - indexName, - err, - timeoutMessage, - ) - } - - return fmt.Errorf("failed to create index %s: %w", indexName, err) +func registerIndexMigration(index common.IndexDefinition, currentStep string, previousStep string) { + if err := DatabaseMigrations.Register(currentStep, previousStep, + func(ctx context.Context, conn *pgx.Conn) error { + return schema.CreateIndexConcurrently(ctx, conn, index) + }, + noTxMigration); err != nil { + panic("failed to register migration: " + err.Error()) } - return nil } diff --git a/internal/datastore/postgres/migrations/zz_migration.0020_add_watch_api_index.go b/internal/datastore/postgres/migrations/zz_migration.0020_add_watch_api_index.go index a2e99544d9..96a0d3ecf6 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0020_add_watch_api_index.go +++ b/internal/datastore/postgres/migrations/zz_migration.0020_add_watch_api_index.go @@ -1,20 +1,11 @@ package migrations -import ( - "context" - - "github.com/jackc/pgx/v5" -) - -const watchAPIIndexToRelationTupleTable = ` - relation_tuple (created_xid);` +import 
"github.com/authzed/spicedb/internal/datastore/postgres/schema" func init() { - if err := DatabaseMigrations.Register("add-watch-api-index-to-relation-tuple-table", "add-metadata-to-transaction-table", - func(ctx context.Context, conn *pgx.Conn) error { - return createIndexConcurrently(ctx, conn, "ix_watch_index", watchAPIIndexToRelationTupleTable) - }, - noTxMigration); err != nil { - panic("failed to register migration: " + err.Error()) - } + registerIndexMigration( + schema.IndexWatchAPI, + "add-watch-api-index-to-relation-tuple-table", + "add-metadata-to-transaction-table", + ) } diff --git a/internal/datastore/postgres/migrations/zz_migration.0022_add_expiration_index.go b/internal/datastore/postgres/migrations/zz_migration.0022_add_expiration_index.go index b1e3f06b2c..8b2f39623f 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0022_add_expiration_index.go +++ b/internal/datastore/postgres/migrations/zz_migration.0022_add_expiration_index.go @@ -1,23 +1,13 @@ package migrations import ( - "context" - - "github.com/jackc/pgx/v5" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" ) -// Used for cleaning up expired relationships. 
-const expiredRelationshipsIndex = ` - relation_tuple (expiration) - WHERE expiration IS NOT NULL; -` - func init() { - if err := DatabaseMigrations.Register("add-expiration-cleanup-index", "add-expiration-support", - func(ctx context.Context, conn *pgx.Conn) error { - return createIndexConcurrently(ctx, conn, "ix_relation_tuple_expired", expiredRelationshipsIndex) - }, - noTxMigration); err != nil { - panic("failed to register migration: " + err.Error()) - } + registerIndexMigration( + schema.IndexExpiringRelationships, + "add-expiration-cleanup-index", + "add-expiration-support", + ) } diff --git a/internal/datastore/postgres/migrations/zz_migration.0023_add_index_for_transaction_gc.go b/internal/datastore/postgres/migrations/zz_migration.0023_add_index_for_transaction_gc.go index 03d979cc83..840c768347 100644 --- a/internal/datastore/postgres/migrations/zz_migration.0023_add_index_for_transaction_gc.go +++ b/internal/datastore/postgres/migrations/zz_migration.0023_add_index_for_transaction_gc.go @@ -1,34 +1,10 @@ package migrations -import ( - "context" - - "github.com/jackc/pgx/v5" -) - -// indexForRelationTupleTransaction adds a missing index to relation_tuple_transaction table -// to support garbage collection. 
This is in support of the query for selecting the most recent -// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1` -// -// EXPLAIN before the index: -// Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1) -// -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1) -// -// Filter: ("timestamp" < (now() - '04:00:00'::interval)) -// Rows Removed by Filter: 6638121 -// -// Planning Time: 0.098 ms -// Execution Time: 5706.192 ms -// (6 rows) -const indexForRelationTupleTransaction = ` - relation_tuple_transaction (xid DESC, timestamp);` +import "github.com/authzed/spicedb/internal/datastore/postgres/schema" func init() { - if err := DatabaseMigrations.Register("add-index-for-transaction-gc", "add-expiration-cleanup-index", - func(ctx context.Context, conn *pgx.Conn) error { - return createIndexConcurrently(ctx, conn, "ix_relation_tuple_transaction_xid_desc_timestamp", indexForRelationTupleTransaction) - }, - noTxMigration); err != nil { - panic("failed to register migration: " + err.Error()) - } + registerIndexMigration(schema.IndexSortedRelationTupleTransaction, + "add-index-for-transaction-gc", + "add-expiration-cleanup-index", + ) } diff --git a/internal/datastore/postgres/migrations/zz_migration.0024_add_new_forward_relationship_index.go b/internal/datastore/postgres/migrations/zz_migration.0024_add_new_forward_relationship_index.go new file mode 100644 index 0000000000..c2b89c178a --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0024_add_new_forward_relationship_index.go @@ -0,0 +1,10 @@ +package migrations + +import "github.com/authzed/spicedb/internal/datastore/postgres/schema" + +func init() { + registerIndexMigration(schema.IndexForwardRelationships, + "add-new-forward-relationship-index", + "add-index-for-transaction-gc", + ) +} 
diff --git a/internal/datastore/postgres/migrations/zz_migration.0025_add_new_reverse_relationship_index.go b/internal/datastore/postgres/migrations/zz_migration.0025_add_new_reverse_relationship_index.go new file mode 100644 index 0000000000..064fddd1b5 --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0025_add_new_reverse_relationship_index.go @@ -0,0 +1,10 @@ +package migrations + +import "github.com/authzed/spicedb/internal/datastore/postgres/schema" + +func init() { + registerIndexMigration(schema.IndexBackwardRelationships, + "add-new-reverse-relationship-index", + "add-new-forward-relationship-index", + ) +} diff --git a/internal/datastore/postgres/migrations/zz_migration.0026_drop_old_indexes.go b/internal/datastore/postgres/migrations/zz_migration.0026_drop_old_indexes.go new file mode 100644 index 0000000000..ee825f4a56 --- /dev/null +++ b/internal/datastore/postgres/migrations/zz_migration.0026_drop_old_indexes.go @@ -0,0 +1,46 @@ +package migrations + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" +) + +const dropOldTransactionIndex = ` + DROP INDEX CONCURRENTLY IF EXISTS ix_relation_tuple_transaction_by_timestamp; +` + +const dropOldRelationTupleIndex1 = ` + DROP INDEX CONCURRENTLY IF EXISTS ix_relation_tuple_by_subject; +` + +const dropOldRelationTupleIndex2 = ` + DROP INDEX CONCURRENTLY IF EXISTS ix_relation_tuple_by_subject_relation; +` + +const dropOldRelationTupleIndex3 = ` + DROP INDEX CONCURRENTLY IF EXISTS ix_relation_tuple_alive_by_resource_rel_subject_covering; +` + +func init() { + if err := DatabaseMigrations.Register("drop-old-indexes", "add-new-reverse-relationship-index", + func(ctx context.Context, conn *pgx.Conn) error { + if _, err := conn.Exec(ctx, dropOldTransactionIndex); err != nil { + return fmt.Errorf("failed to drop old transaction index: %w", err) + } + if _, err := conn.Exec(ctx, dropOldRelationTupleIndex1); err != nil { + return fmt.Errorf("failed to drop old relation tuple index 1: 
%w", err) + } + if _, err := conn.Exec(ctx, dropOldRelationTupleIndex2); err != nil { + return fmt.Errorf("failed to drop old relation tuple index 2: %w", err) + } + if _, err := conn.Exec(ctx, dropOldRelationTupleIndex3); err != nil { + return fmt.Errorf("failed to drop old relation tuple index 3: %w", err) + } + return nil + }, + noTxMigration); err != nil { + panic("failed to register migration: " + err.Error()) + } +} diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 8f5d66b068..8a59b1cba2 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -28,6 +28,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" "github.com/authzed/spicedb/internal/datastore/postgres/migrations" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/internal/datastore/revisions" log "github.com/authzed/spicedb/internal/logging" "github.com/authzed/spicedb/pkg/datastore" @@ -40,36 +41,7 @@ func init() { } const ( - Engine = "postgres" - tableNamespace = "namespace_config" - tableTransaction = "relation_tuple_transaction" - tableTuple = "relation_tuple" - tableCaveat = "caveat" - tableRelationshipCounter = "relationship_counter" - - colXID = "xid" - colTimestamp = "timestamp" - colMetadata = "metadata" - colNamespace = "namespace" - colConfig = "serialized_config" - colCreatedXid = "created_xid" - colDeletedXid = "deleted_xid" - colSnapshot = "snapshot" - colObjectID = "object_id" - colRelation = "relation" - colUsersetNamespace = "userset_namespace" - colUsersetObjectID = "userset_object_id" - colUsersetRelation = "userset_relation" - colCaveatName = "name" - colCaveatDefinition = "definition" - colCaveatContextName = "caveat_name" - colCaveatContext = "caveat_context" - colExpiration = "expiration" - - colCounterName = "name" - colCounterFilter = 
"serialized_filter" - colCounterCurrentCount = "current_count" - colCounterSnapshot = "updated_revision_snapshot" + Engine = "postgres" errUnableToInstantiate = "unable to instantiate datastore" @@ -99,12 +71,12 @@ var ( psql = sq.StatementBuilder.PlaceholderFormat(sq.Dollar) getRevision = psql. - Select(colXID, colSnapshot). - From(tableTransaction). - OrderByClause(fmt.Sprintf("%s DESC", colXID)). + Select(schema.ColXID, schema.ColSnapshot). + From(schema.TableTransaction). + OrderByClause(fmt.Sprintf("%s DESC", schema.ColXID)). Limit(1) - createTxn = psql.Insert(tableTransaction).Columns(colMetadata) + createTxn = psql.Insert(schema.TableTransaction).Columns(schema.ColMetadata) getNow = psql.Select("NOW()") @@ -297,43 +269,25 @@ func newPostgresDatastore( } revisionQuery := fmt.Sprintf( querySelectRevision, - colXID, - tableTransaction, - colTimestamp, + schema.ColXID, + schema.TableTransaction, + schema.ColTimestamp, quantizationPeriodNanos, - colSnapshot, + schema.ColSnapshot, ) validTransactionQuery := fmt.Sprintf( queryValidTransaction, - colXID, - tableTransaction, - colTimestamp, + schema.ColXID, + schema.TableTransaction, + schema.ColTimestamp, config.gcWindow.Seconds(), - colSnapshot, + schema.ColSnapshot, ) maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())* config.maxRevisionStalenessPercent) * time.Nanosecond - schema := common.NewSchemaInformationWithOptions( - common.WithRelationshipTableName(tableTuple), - common.WithColNamespace(colNamespace), - common.WithColObjectID(colObjectID), - common.WithColRelation(colRelation), - common.WithColUsersetNamespace(colUsersetNamespace), - common.WithColUsersetObjectID(colUsersetObjectID), - common.WithColUsersetRelation(colUsersetRelation), - common.WithColCaveatName(colCaveatContextName), - common.WithColCaveatContext(colCaveatContext), - common.WithColExpiration(colExpiration), - common.WithPaginationFilterType(common.TupleComparison), - 
common.WithPlaceholderFormat(sq.Dollar), - common.WithNowFunction("NOW"), - common.WithColumnOptimization(config.columnOptimizationOption), - common.WithExpirationDisabled(config.expirationDisabled), - ) - datastore := &pgDatastore{ CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions( maxRevisionStaleness, @@ -359,7 +313,7 @@ func newPostgresDatastore( isPrimary: isPrimary, inStrictReadMode: config.readStrictMode, filterMaximumIDCount: config.filterMaximumIDCount, - schema: *schema, + schema: *schema.Schema(config.columnOptimizationOption, config.expirationDisabled), } if isPrimary && config.readStrictMode { @@ -437,7 +391,7 @@ func (pgd *pgDatastore) SnapshotReader(revRaw datastore.Revision) datastore.Read } executor := common.QueryRelationshipsExecutor{ - Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(queryFuncs), + Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(queryFuncs, pgd), } return &pgReader{ @@ -480,7 +434,7 @@ func (pgd *pgDatastore) ReadWriteTx( queryFuncs := pgxcommon.QuerierFuncsFor(pgd.readPool) executor := common.QueryRelationshipsExecutor{ - Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(queryFuncs), + Executor: pgxcommon.NewPGXQueryRelationshipsExecutor(queryFuncs, pgd), } rwt := &pgReadWriteTXN{ @@ -738,12 +692,12 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) { func buildLivingObjectFilterForRevision(revision postgresRevision) queryFilterer { createdBeforeTXN := sq.Expr(fmt.Sprintf( snapshotAlive, - colCreatedXid, + schema.ColCreatedXid, ), revision.snapshot, true) deletedAfterTXN := sq.Expr(fmt.Sprintf( snapshotAlive, - colDeletedXid, + schema.ColDeletedXid, ), revision.snapshot, false) return func(original sq.SelectBuilder) sq.SelectBuilder { @@ -752,7 +706,7 @@ func buildLivingObjectFilterForRevision(revision postgresRevision) queryFilterer } func currentlyLivingObjects(original sq.SelectBuilder) sq.SelectBuilder { - return original.Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + return 
original.Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) } // DefaultQueryExecMode parses a Postgres URI and determines if a default_query_exec_mode diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go index e6561b6cd1..7eb430443b 100644 --- a/internal/datastore/postgres/postgres_shared_test.go +++ b/internal/datastore/postgres/postgres_shared_test.go @@ -24,6 +24,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" pgversion "github.com/authzed/spicedb/internal/datastore/postgres/version" "github.com/authzed/spicedb/internal/datastore/proxy" "github.com/authzed/spicedb/internal/testfixtures" @@ -42,7 +43,7 @@ const ( veryLargeGCInterval = 90000 * time.Second ) -// Implement the TestableDatastore interface +// Implement the interface for testing datastores func (pgd *pgDatastore) ExampleRetryableError() error { return &pgconn.PgError{ Code: pgSerializationFailure, @@ -587,7 +588,7 @@ func TransactionTimestampsTest(t *testing.T, ds datastore.Datastore) { require.NoError(err) var ts time.Time - sql, args, err := psql.Select("timestamp").From(tableTransaction).Where(sq.Eq{"xid": txXID}).ToSql() + sql, args, err := psql.Select("timestamp").From(schema.TableTransaction).Where(sq.Eq{"xid": txXID}).ToSql() require.NoError(err) err = pgd.readPool.QueryRow(ctx, sql, args...).Scan(&ts) require.NoError(err) @@ -833,7 +834,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { if len(tc.relativeTimes) > 0 { psql := sq.StatementBuilder.PlaceholderFormat(sq.Dollar) - insertTxn := psql.Insert(tableTransaction).Columns(colTimestamp) + insertTxn := psql.Insert(schema.TableTransaction).Columns(schema.ColTimestamp) for _, offset := range tc.relativeTimes { sql, args, err := insertTxn.Values(dbNow.Add(offset)).ToSql() @@ -846,11 
+847,11 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { queryRevision := fmt.Sprintf( querySelectRevision, - colXID, - tableTransaction, - colTimestamp, + schema.ColXID, + schema.TableTransaction, + schema.ColTimestamp, tc.quantization.Nanoseconds(), - colSnapshot, + schema.ColSnapshot, ) var revision xid8 @@ -860,8 +861,8 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) { require.NoError(err) queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;" - numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true") - numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false") + numLowerQuery := fmt.Sprintf(queryFmt, schema.ColXID, schema.TableTransaction, "true") + numHigherQuery := fmt.Sprintf(queryFmt, schema.ColXID, schema.TableTransaction, "false") var numLower, numHigher uint64 require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot) @@ -1082,9 +1083,9 @@ func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { err = pgx.BeginTxFunc(ctx, pds.writePool, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error { _, err := tx.Exec(ctx, fmt.Sprintf( `INSERT INTO %s ("%s", "%s") VALUES ('%d', '%d:%d:')`, - tableTransaction, - colXID, - colSnapshot, + schema.TableTransaction, + schema.ColXID, + schema.ColSnapshot, nexttx, nexttx, nexttx, @@ -1095,16 +1096,16 @@ func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { _, err = tx.Exec(ctx, fmt.Sprintf( `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '123', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, - tableTuple, - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colCreatedXid, + schema.TableTuple, + schema.ColNamespace, + 
schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColCreatedXid, nexttx, )) if err != nil { @@ -1113,7 +1114,7 @@ func OverlappingRevisionWatchTest(t *testing.T, ds datastore.Datastore) { _, err = tx.Exec(ctx, fmt.Sprintf( `INSERT INTO %s ("xid", "snapshot") VALUES ('%d', '%d:%d:')`, - tableTransaction, + schema.TableTransaction, nexttx+1, nexttx, nexttx, @@ -1124,16 +1125,16 @@ _, err = tx.Exec(ctx, fmt.Sprintf( `INSERT INTO %s ("%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s", "%s") VALUES ('somenamespace', '456', 'viewer', 'user', '456', '...', '', null, '%d'::xid8)`, - tableTuple, - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colCreatedXid, + schema.TableTuple, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColCreatedXid, nexttx+1, )) @@ -1524,7 +1525,7 @@ func RepairTransactionsTest(t *testing.T, ds datastore.Datastore) { createLaterTxn := fmt.Sprintf( "INSERT INTO %s (\"xid\") VALUES (12345::text::xid8)", - tableTransaction, + schema.TableTransaction, ) _, err = pds.writePool.Exec(context.Background(), createLaterTxn) @@ -1534,7 +1535,7 @@ err = pds.repairTransactionIDs(context.Background(), false) require.NoError(t, err) - // Ensure the current transaction ID is greater than the max specified in the transactions table. + // Ensure the current transaction ID is greater than the max specified in the transactions table. 
currentMaximumID := 0 err = pds.writePool.QueryRow(context.Background(), queryCurrentTransactionID).Scan(¤tMaximumID) require.NoError(t, err) diff --git a/internal/datastore/postgres/reader.go b/internal/datastore/postgres/reader.go index ed0ad792d1..2895897c98 100644 --- a/internal/datastore/postgres/reader.go +++ b/internal/datastore/postgres/reader.go @@ -10,6 +10,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" @@ -26,15 +27,15 @@ type pgReader struct { type queryFilterer func(original sq.SelectBuilder) sq.SelectBuilder var ( - countRels = psql.Select("COUNT(*)").From(tableTuple) + countRels = psql.Select("COUNT(*)").From(schema.TableTuple) readNamespace = psql. - Select(colConfig, colCreatedXid). - From(tableNamespace) + Select(schema.ColConfig, schema.ColCreatedXid). + From(schema.TableNamespace) readCounters = psql. - Select(colCounterName, colCounterFilter, colCounterCurrentCount, colCounterSnapshot). - From(tableRelationshipCounter) + Select(schema.ColCounterName, schema.ColCounterFilter, schema.ColCounterCurrentCount, schema.ColCounterSnapshot). 
+ From(schema.TableRelationshipCounter) ) const ( @@ -98,7 +99,7 @@ func (r *pgReader) LookupCounters(ctx context.Context) ([]datastore.Relationship func (r *pgReader) lookupCounters(ctx context.Context, optionalName string) ([]datastore.RelationshipCounter, error) { query := readCounters if optionalName != noFilterOnCounterName { - query = query.Where(sq.Eq{colCounterName: optionalName}) + query = query.Where(sq.Eq{schema.ColCounterName: optionalName}) } sql, args, err := r.aliveFilter(query).ToSql() @@ -184,6 +185,8 @@ func (r *pgReader) ReverseQueryRelationships( options.WithLimit(queryOpts.LimitForReverse), options.WithAfter(queryOpts.AfterForReverse), options.WithSort(queryOpts.SortForReverse), + options.WithQueryShape(queryOpts.QueryShapeForReverse), + options.WithSQLExplainCallback(queryOpts.SQLExplainCallbackForReverse), ) } @@ -204,7 +207,7 @@ func (r *pgReader) loadNamespace(ctx context.Context, namespace string, tx pgxco defer span.End() defs, err := loadAllNamespaces(ctx, tx, func(original sq.SelectBuilder) sq.SelectBuilder { - return filterer(original).Where(sq.Eq{colNamespace: namespace}) + return filterer(original).Where(sq.Eq{schema.ColNamespace: namespace}) }) if err != nil { return nil, postgresRevision{}, err @@ -233,7 +236,7 @@ func (r *pgReader) LookupNamespacesWithNames(ctx context.Context, nsNames []stri clause := sq.Or{} for _, nsName := range nsNames { - clause = append(clause, sq.Eq{colNamespace: nsName}) + clause = append(clause, sq.Eq{schema.ColNamespace: nsName}) } nsDefsWithRevisions, err := loadAllNamespaces(ctx, r.query, func(original sq.SelectBuilder) sq.SelectBuilder { diff --git a/internal/datastore/postgres/readwrite.go b/internal/datastore/postgres/readwrite.go index e135b158ad..f5aa75b586 100644 --- a/internal/datastore/postgres/readwrite.go +++ b/internal/datastore/postgres/readwrite.go @@ -20,6 +20,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgxcommon 
"github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/tuple" @@ -35,48 +36,48 @@ const ( ) var ( - writeNamespace = psql.Insert(tableNamespace).Columns( - colNamespace, - colConfig, + writeNamespace = psql.Insert(schema.TableNamespace).Columns( + schema.ColNamespace, + schema.ColConfig, ) - deleteNamespace = psql.Update(tableNamespace).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteNamespace = psql.Update(schema.TableNamespace).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) - deleteNamespaceTuples = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteNamespaceTuples = psql.Update(schema.TableTuple).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) - writeTuple = psql.Insert(tableTuple).Columns( - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, + writeTuple = psql.Insert(schema.TableTuple).Columns( + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, ) - deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteTuple = psql.Update(schema.TableTuple).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) selectForDelete = psql.Select( - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCreatedXid, - ).From(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) - - writeRelationshipCounter = psql.Insert(tableRelationshipCounter).Columns( - colCounterName, - colCounterFilter, - colCounterCurrentCount, - colCounterSnapshot, + 
schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCreatedXid, + ).From(schema.TableTuple).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) + + writeRelationshipCounter = psql.Insert(schema.TableRelationshipCounter).Columns( + schema.ColCounterName, + schema.ColCounterFilter, + schema.ColCounterCurrentCount, + schema.ColCounterSnapshot, ) - updateRelationshipCounter = psql.Update(tableRelationshipCounter).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + updateRelationshipCounter = psql.Update(schema.TableRelationshipCounter).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) - deleteRelationshipCounter = psql.Update(tableRelationshipCounter).Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + deleteRelationshipCounter = psql.Update(schema.TableRelationshipCounter).Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) ) type pgReadWriteTXN struct { @@ -101,7 +102,7 @@ func appendForInsertion(builder sq.InsertBuilder, tpl tuple.Relationship) sq.Ins tpl.Subject.ObjectID, tpl.Subject.Relation, caveatName, - caveatContext, // PGX driver serializes map[string]any to JSONB type columns, + caveatContext, // PGX driver serializes map[string]any to JSONB type columns, tpl.OptionalExpiration, } @@ -227,12 +228,12 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t // that the operations over existing relationships no-op. 
if len(touchMutationsByNonCaveat) > 0 { touchInserts = touchInserts.Suffix(fmt.Sprintf("ON CONFLICT DO NOTHING RETURNING %s, %s, %s, %s, %s, %s", - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, )) sql, args, err := touchInserts.ToSql() @@ -320,16 +321,16 @@ func (rwt *pgReadWriteTXN) WriteRelationships(ctx context.Context, mutations []t builder := deleteTuple. Where(deleteClauses). Suffix(fmt.Sprintf("RETURNING %s, %s, %s, %s, %s, %s", - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, )) sql, args, err := builder. - Set(colDeletedXid, rwt.newXID). + Set(schema.ColDeletedXid, rwt.newXID). 
ToSql() if err != nil { return handleWriteError(err) @@ -439,30 +440,30 @@ func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, fil query := selectForDelete if filter.ResourceType != "" { - query = query.Where(sq.Eq{colNamespace: filter.ResourceType}) + query = query.Where(sq.Eq{schema.ColNamespace: filter.ResourceType}) } if filter.OptionalResourceId != "" { - query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) + query = query.Where(sq.Eq{schema.ColObjectID: filter.OptionalResourceId}) } if filter.OptionalRelation != "" { - query = query.Where(sq.Eq{colRelation: filter.OptionalRelation}) + query = query.Where(sq.Eq{schema.ColRelation: filter.OptionalRelation}) } if filter.OptionalResourceIdPrefix != "" { if strings.Contains(filter.OptionalResourceIdPrefix, "%") { return false, fmt.Errorf("unable to delete relationships with a prefix containing the %% character") } - query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"}) + query = query.Where(sq.Like{schema.ColObjectID: filter.OptionalResourceIdPrefix + "%"}) } // Add clauses for the SubjectFilter if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { - query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) + query = query.Where(sq.Eq{schema.ColUsersetNamespace: subjectFilter.SubjectType}) if subjectFilter.OptionalSubjectId != "" { - query = query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) + query = query.Where(sq.Eq{schema.ColUsersetObjectID: subjectFilter.OptionalSubjectId}) } if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil { - query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) + query = query.Where(sq.Eq{schema.ColUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) } } @@ -479,16 +480,16 @@ func (rwt *pgReadWriteTXN) deleteRelationshipsWithLimit(ctx context.Context, 
fil cteSQL := fmt.Sprintf( "WITH found_tuples AS (%s)\nUPDATE %s SET %s = $%d WHERE (%s, %s, %s, %s, %s, %s, %s) IN (select * from found_tuples)", selectSQL, - tableTuple, - colDeletedXid, + schema.TableTuple, + schema.ColDeletedXid, len(args), - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCreatedXid, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCreatedXid, ) result, err := rwt.tx.Exec(ctx, cteSQL, args...) @@ -503,34 +504,34 @@ func (rwt *pgReadWriteTXN) deleteRelationships(ctx context.Context, filter *v1.R // Add clauses for the ResourceFilter query := deleteTuple if filter.ResourceType != "" { - query = query.Where(sq.Eq{colNamespace: filter.ResourceType}) + query = query.Where(sq.Eq{schema.ColNamespace: filter.ResourceType}) } if filter.OptionalResourceId != "" { - query = query.Where(sq.Eq{colObjectID: filter.OptionalResourceId}) + query = query.Where(sq.Eq{schema.ColObjectID: filter.OptionalResourceId}) } if filter.OptionalRelation != "" { - query = query.Where(sq.Eq{colRelation: filter.OptionalRelation}) + query = query.Where(sq.Eq{schema.ColRelation: filter.OptionalRelation}) } if filter.OptionalResourceIdPrefix != "" { if strings.Contains(filter.OptionalResourceIdPrefix, "%") { return fmt.Errorf("unable to delete relationships with a prefix containing the %% character") } - query = query.Where(sq.Like{colObjectID: filter.OptionalResourceIdPrefix + "%"}) + query = query.Where(sq.Like{schema.ColObjectID: filter.OptionalResourceIdPrefix + "%"}) } // Add clauses for the SubjectFilter if subjectFilter := filter.OptionalSubjectFilter; subjectFilter != nil { - query = query.Where(sq.Eq{colUsersetNamespace: subjectFilter.SubjectType}) + query = query.Where(sq.Eq{schema.ColUsersetNamespace: subjectFilter.SubjectType}) if subjectFilter.OptionalSubjectId != "" { - query = 
query.Where(sq.Eq{colUsersetObjectID: subjectFilter.OptionalSubjectId}) + query = query.Where(sq.Eq{schema.ColUsersetObjectID: subjectFilter.OptionalSubjectId}) } if relationFilter := subjectFilter.OptionalRelation; relationFilter != nil { - query = query.Where(sq.Eq{colUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) + query = query.Where(sq.Eq{schema.ColUsersetRelation: stringz.DefaultEmpty(relationFilter.Relation, datastore.Ellipsis)}) } } - sql, args, err := query.Set(colDeletedXid, rwt.newXID).ToSql() + sql, args, err := query.Set(schema.ColDeletedXid, rwt.newXID).ToSql() if err != nil { return fmt.Errorf(errUnableToDeleteRelationships, err) } @@ -552,7 +553,7 @@ func (rwt *pgReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*c return fmt.Errorf(errUnableToWriteConfig, err) } - deletedNamespaceClause = append(deletedNamespaceClause, sq.Eq{colNamespace: newNamespace.Name}) + deletedNamespaceClause = append(deletedNamespaceClause, sq.Eq{schema.ColNamespace: newNamespace.Name}) valuesToWrite := []interface{}{newNamespace.Name, serialized} @@ -560,7 +561,7 @@ func (rwt *pgReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*c } delSQL, delArgs, err := deleteNamespace. - Set(colDeletedXid, rwt.newXID). + Set(schema.ColDeletedXid, rwt.newXID). Where(deletedNamespaceClause). 
ToSql() if err != nil { @@ -586,7 +587,7 @@ func (rwt *pgReadWriteTXN) WriteNamespaces(ctx context.Context, newConfigs ...*c func (rwt *pgReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...string) error { aliveFilter := func(original sq.SelectBuilder) sq.SelectBuilder { - return original.Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + return original.Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) } nsClauses := make([]sq.Sqlizer, 0, len(nsNames)) @@ -599,8 +600,8 @@ func (rwt *pgReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...stri return err case err == nil: - nsClauses = append(nsClauses, sq.Eq{colNamespace: nsName}) - tplClauses = append(tplClauses, sq.Eq{colNamespace: nsName}) + nsClauses = append(nsClauses, sq.Eq{schema.ColNamespace: nsName}) + tplClauses = append(tplClauses, sq.Eq{schema.ColNamespace: nsName}) default: return fmt.Errorf(errUnableToDeleteConfig, err) @@ -608,7 +609,7 @@ func (rwt *pgReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...stri } delSQL, delArgs, err := deleteNamespace. - Set(colDeletedXid, rwt.newXID). + Set(schema.ColDeletedXid, rwt.newXID). Where(sq.Or(nsClauses)). ToSql() if err != nil { @@ -621,7 +622,7 @@ func (rwt *pgReadWriteTXN) DeleteNamespaces(ctx context.Context, nsNames ...stri } deleteTupleSQL, deleteTupleArgs, err := deleteNamespaceTuples. - Set(colDeletedXid, rwt.newXID). + Set(schema.ColDeletedXid, rwt.newXID). Where(sq.Or(tplClauses)). ToSql() if err != nil { @@ -683,10 +684,10 @@ func (rwt *pgReadWriteTXN) UnregisterCounter(ctx context.Context, name string) e return datastore.NewCounterNotRegisteredErr(name) } - deleteQuery := deleteRelationshipCounter.Where(sq.Eq{colCounterName: name}) + deleteQuery := deleteRelationshipCounter.Where(sq.Eq{schema.ColCounterName: name}) delSQL, delArgs, err := deleteQuery. - Set(colDeletedXid, rwt.newXID). + Set(schema.ColDeletedXid, rwt.newXID). 
ToSql() if err != nil { return fmt.Errorf(errUnableToDeleteConfig, err) @@ -715,11 +716,11 @@ func (rwt *pgReadWriteTXN) StoreCounterValue(ctx context.Context, name string, v // Update the counter. updateQuery := updateRelationshipCounter. - Set(colCounterCurrentCount, value). - Set(colCounterSnapshot, computedAtRevisionSnapshot) + Set(schema.ColCounterCurrentCount, value). + Set(schema.ColCounterSnapshot, computedAtRevisionSnapshot) sql, args, err := updateQuery. - Where(sq.Eq{colCounterName: name}). + Where(sq.Eq{schema.ColCounterName: name}). ToSql() if err != nil { return fmt.Errorf(errUnableToWriteRelationshipsCounter, err) @@ -733,29 +734,29 @@ func (rwt *pgReadWriteTXN) StoreCounterValue(ctx context.Context, name string, v } var copyCols = []string{ - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, } func (rwt *pgReadWriteTXN) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { - return pgxcommon.BulkLoad(ctx, rwt.tx, tableTuple, copyCols, iter) + return pgxcommon.BulkLoad(ctx, rwt.tx, schema.TableTuple, copyCols, iter) } func exactRelationshipClause(r tuple.Relationship) sq.Eq { return sq.Eq{ - colNamespace: r.Resource.ObjectType, - colObjectID: r.Resource.ObjectID, - colRelation: r.Resource.Relation, - colUsersetNamespace: r.Subject.ObjectType, - colUsersetObjectID: r.Subject.ObjectID, - colUsersetRelation: r.Subject.Relation, + schema.ColNamespace: r.Resource.ObjectType, + schema.ColObjectID: r.Resource.ObjectID, + schema.ColRelation: r.Resource.Relation, + schema.ColUsersetNamespace: r.Subject.ObjectType, + schema.ColUsersetObjectID: r.Subject.ObjectID, + 
schema.ColUsersetRelation: r.Subject.Relation, } } @@ -770,18 +771,18 @@ func exactRelationshipDifferentCaveatAndExpirationClause(r tuple.Relationship) s expiration := r.OptionalExpiration return sq.And{ sq.Eq{ - colNamespace: r.Resource.ObjectType, - colObjectID: r.Resource.ObjectID, - colRelation: r.Resource.Relation, - colUsersetNamespace: r.Subject.ObjectType, - colUsersetObjectID: r.Subject.ObjectID, - colUsersetRelation: r.Subject.Relation, + schema.ColNamespace: r.Resource.ObjectType, + schema.ColObjectID: r.Resource.ObjectID, + schema.ColRelation: r.Resource.Relation, + schema.ColUsersetNamespace: r.Subject.ObjectType, + schema.ColUsersetObjectID: r.Subject.ObjectID, + schema.ColUsersetRelation: r.Subject.Relation, }, sq.Or{ - sq.Expr(fmt.Sprintf(`%s IS DISTINCT FROM ?`, colCaveatContextName), caveatName), - sq.Expr(fmt.Sprintf(`%s IS DISTINCT FROM ?`, colExpiration), expiration), + sq.Expr(fmt.Sprintf(`%s IS DISTINCT FROM ?`, schema.ColCaveatContextName), caveatName), + sq.Expr(fmt.Sprintf(`%s IS DISTINCT FROM ?`, schema.ColExpiration), expiration), sq.NotEq{ - colCaveatContext: caveatContext, + schema.ColCaveatContext: caveatContext, }, }, } diff --git a/internal/datastore/postgres/revisions.go b/internal/datastore/postgres/revisions.go index 0e524aabef..3ce882d4ca 100644 --- a/internal/datastore/postgres/revisions.go +++ b/internal/datastore/postgres/revisions.go @@ -13,6 +13,7 @@ import ( "github.com/jackc/pgx/v5" "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" implv1 "github.com/authzed/spicedb/pkg/proto/impl/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -274,7 +275,7 @@ func createNewTransaction(ctx context.Context, tx pgx.Tx, metadata map[string]an metadata = emptyMetadata } - sql, args, err := createTxn.Values(metadata).Suffix("RETURNING " + colXID + ", " + colSnapshot).ToSql() + sql, args, err := 
createTxn.Values(metadata).Suffix("RETURNING " + schema.ColXID + ", " + schema.ColSnapshot).ToSql() if err != nil { return } diff --git a/internal/datastore/postgres/schema/indexes.go b/internal/datastore/postgres/schema/indexes.go new file mode 100644 index 0000000000..e9deb66686 --- /dev/null +++ b/internal/datastore/postgres/schema/indexes.go @@ -0,0 +1,77 @@ +package schema + +import ( + "github.com/authzed/spicedb/internal/datastore/common" + "github.com/authzed/spicedb/pkg/datastore/queryshape" +) + +// IndexForwardRelationships is an index for forward relationships. It is used for +// the CheckPermission, Expand and LookupSubjects APIs, as well as reading relationships, +// and the forward checks for schema diffs. +var IndexForwardRelationships = common.IndexDefinition{ + Name: `ix_relationship_covering_index_by_resource`, + ColumnsSQL: `relation_tuple (namespace, relation, object_id, userset_namespace, userset_relation, userset_object_id) INCLUDE (expiration, created_xid, deleted_xid)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexBackwardRelationships is an index for backward relationships. It is used for +// the LookupResources API, as well as reading relationships, and the backward checks for schema diffs. +var IndexBackwardRelationships = common.IndexDefinition{ + Name: `ix_relationship_covering_index_by_subject`, + ColumnsSQL: `relation_tuple (userset_namespace, userset_relation, userset_object_id, namespace, relation, object_id) INCLUDE (expiration, created_xid, deleted_xid)`, + Shapes: []queryshape.Shape{ + queryshape.CheckPermissionSelectDirectSubjects, + queryshape.CheckPermissionSelectIndirectSubjects, + }, +} + +// IndexWatchAPI is an index for the Watch API. It is used for the Watch API, and provides +// filtering of alive relationships. 
+var IndexWatchAPI = common.IndexDefinition{ + Name: `ix_watch_api_index`, + ColumnsSQL: `relation_tuple (created_xid)`, +} + +// IndexExpiringRelationships is an index for (possibly) expiring relationships. It is used +// for the GC process which checks for expired relationships. +var IndexExpiringRelationships = common.IndexDefinition{ + Name: `ix_relation_tuple_expired`, + ColumnsSQL: `relation_tuple (expiration) WHERE expiration IS NOT NULL`, +} + +// IndexSortedRelationTupleTransaction adds an index to relation_tuple_transaction table +// to support garbage collection. This is in support of the query for selecting the most recent +// transaction: `SELECT xid, snapshot FROM relation_tuple_transaction WHERE timestamp < $1 ORDER BY xid DESC LIMIT 1` +// +// EXPLAIN before the index: +// Limit (cost=0.56..1.78 rows=1 width=558) (actual time=5706.155..5706.156 rows=1 loops=1) +// -> Index Scan Backward using pk_rttx on relation_tuple_transaction (cost=0.56..30428800.04 rows=25023202 width=558) (actual time=5706.154..5706.155 rows=1 loops=1) +// +// Filter: ("timestamp" < (now() - '04:00:00'::interval)) +// Rows Removed by Filter: 6638121 +// +// Planning Time: 0.098 ms +// Execution Time: 5706.192 ms +var IndexSortedRelationTupleTransaction = common.IndexDefinition{ + Name: `ix_relation_tuple_transaction_xid_desc_timestamp`, + ColumnsSQL: `relation_tuple_transaction (xid DESC, timestamp)`, +} + +// IndexGCDeadRelationships is an index for the GC process to quickly find dead relationships +// to be garbage collected. 
+var IndexGCDeadRelationships = common.IndexDefinition{ + Name: `ix_gc_index`, + ColumnsSQL: `relation_tuple (deleted_xid DESC) WHERE deleted_xid < '9223372036854775807'::xid8`, +} + +var pgIndexes = []common.IndexDefinition{ + IndexForwardRelationships, + IndexBackwardRelationships, + IndexWatchAPI, + IndexExpiringRelationships, + IndexSortedRelationTupleTransaction, + IndexGCDeadRelationships, +} diff --git a/internal/datastore/postgres/schema/indexutil.go b/internal/datastore/postgres/schema/indexutil.go new file mode 100644 index 0000000000..d71577f334 --- /dev/null +++ b/internal/datastore/postgres/schema/indexutil.go @@ -0,0 +1,58 @@ +package schema + +import ( + "context" + "fmt" + + "github.com/jackc/pgx/v5" + + "github.com/authzed/spicedb/internal/datastore/common" + pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" +) + +const createIndexTemplate = ` +CREATE INDEX CONCURRENTLY + %s + ON + %s` + +const dropIndexTemplate = ` + DROP INDEX CONCURRENTLY IF EXISTS + %s; +` + +const timeoutMessage = "This typically indicates that your database global statement_timeout needs to be increased and/or spicedb migrate command needs --migration-timeout increased (1h by default)" + +// CreateIndexConcurrently creates an index concurrently, dropping the existing index if it exists to ensure +// that indexes are not left in a partially constructed state. +// See: https://www.shayon.dev/post/2024/225/stop-relying-on-if-not-exists-for-concurrent-index-creation-in-postgresql/ +func CreateIndexConcurrently(ctx context.Context, conn *pgx.Conn, index common.IndexDefinition) error { + dropIndexSQL := fmt.Sprintf(dropIndexTemplate, index.Name) + if _, err := conn.Exec(ctx, dropIndexSQL); err != nil { + if pgxcommon.IsQueryCanceledError(err) { + return fmt.Errorf( + "timed out while trying to drop index %s before recreating it: %w. 
%s", + index.Name, + err, + timeoutMessage, + ) + } + + return fmt.Errorf("failed to drop index %s before creating it: %w", index.Name, err) + } + + createIndexSQL := fmt.Sprintf(createIndexTemplate, index.Name, index.ColumnsSQL) + if _, err := conn.Exec(ctx, createIndexSQL); err != nil { + if pgxcommon.IsQueryCanceledError(err) { + return fmt.Errorf( + "timed out while trying to create index %s: %w. %s", + index.Name, + err, + timeoutMessage, + ) + } + + return fmt.Errorf("failed to create index %s: %w", index.Name, err) + } + return nil +} diff --git a/internal/datastore/postgres/schema/schema.go b/internal/datastore/postgres/schema/schema.go new file mode 100644 index 0000000000..b210f1d893 --- /dev/null +++ b/internal/datastore/postgres/schema/schema.go @@ -0,0 +1,60 @@ +package schema + +import ( + sq "github.com/Masterminds/squirrel" + + "github.com/authzed/spicedb/internal/datastore/common" +) + +const ( + TableNamespace = "namespace_config" + TableTransaction = "relation_tuple_transaction" + TableTuple = "relation_tuple" + TableCaveat = "caveat" + TableRelationshipCounter = "relationship_counter" + + ColXID = "xid" + ColTimestamp = "timestamp" + ColMetadata = "metadata" + ColNamespace = "namespace" + ColConfig = "serialized_config" + ColCreatedXid = "created_xid" + ColDeletedXid = "deleted_xid" + ColSnapshot = "snapshot" + ColObjectID = "object_id" + ColRelation = "relation" + ColUsersetNamespace = "userset_namespace" + ColUsersetObjectID = "userset_object_id" + ColUsersetRelation = "userset_relation" + ColCaveatName = "name" + ColCaveatDefinition = "definition" + ColCaveatContextName = "caveat_name" + ColCaveatContext = "caveat_context" + ColExpiration = "expiration" + + ColCounterName = "name" + ColCounterFilter = "serialized_filter" + ColCounterCurrentCount = "current_count" + ColCounterSnapshot = "updated_revision_snapshot" +) + +func Schema(colOptimizationOpt common.ColumnOptimizationOption, expirationDisabled bool) *common.SchemaInformation { + return 
common.NewSchemaInformationWithOptions( + common.WithRelationshipTableName(TableTuple), + common.WithColNamespace(ColNamespace), + common.WithColObjectID(ColObjectID), + common.WithColRelation(ColRelation), + common.WithColUsersetNamespace(ColUsersetNamespace), + common.WithColUsersetObjectID(ColUsersetObjectID), + common.WithColUsersetRelation(ColUsersetRelation), + common.WithColCaveatName(ColCaveatContextName), + common.WithColCaveatContext(ColCaveatContext), + common.WithColExpiration(ColExpiration), + common.WithPaginationFilterType(common.TupleComparison), + common.WithPlaceholderFormat(sq.Dollar), + common.WithNowFunction("NOW"), + common.WithColumnOptimization(colOptimizationOpt), + common.WithExpirationDisabled(expirationDisabled), + common.SetIndexes(pgIndexes), + ) +} diff --git a/internal/datastore/postgres/stats.go b/internal/datastore/postgres/stats.go index 0e0bea63f6..e15fb2d836 100644 --- a/internal/datastore/postgres/stats.go +++ b/internal/datastore/postgres/stats.go @@ -8,6 +8,7 @@ import ( "github.com/jackc/pgx/v5" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" ) @@ -25,7 +26,7 @@ var ( queryEstimatedRowCount = psql. Select(colReltuples). From(tablePGClass). 
- Where(sq.Eq{colRelname: tableTuple}) + Where(sq.Eq{colRelname: schema.TableTuple}) ) func (pgd *pgDatastore) datastoreUniqueID(ctx context.Context) (string, error) { @@ -52,7 +53,7 @@ func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) } aliveFilter := func(original sq.SelectBuilder) sq.SelectBuilder { - return original.Where(sq.Eq{colDeletedXid: liveDeletedTxnID}) + return original.Where(sq.Eq{schema.ColDeletedXid: liveDeletedTxnID}) } var uniqueID string @@ -60,7 +61,7 @@ func (pgd *pgDatastore) Statistics(ctx context.Context) (datastore.Stats, error) var relCount float64 if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgd.readTxOptions, func(tx pgx.Tx) error { if pgd.analyzeBeforeStatistics { - if _, err := tx.Exec(ctx, "ANALYZE "+tableTuple); err != nil { + if _, err := tx.Exec(ctx, "ANALYZE "+schema.TableTuple); err != nil { return fmt.Errorf("unable to analyze tuple table: %w", err) } } diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go index 45b3bb64dd..090aebffe4 100644 --- a/internal/datastore/postgres/watch.go +++ b/internal/datastore/postgres/watch.go @@ -13,6 +13,7 @@ import ( "github.com/authzed/spicedb/internal/datastore/common" pgxcommon "github.com/authzed/spicedb/internal/datastore/postgres/common" + "github.com/authzed/spicedb/internal/datastore/postgres/schema" "github.com/authzed/spicedb/pkg/datastore" core "github.com/authzed/spicedb/pkg/proto/core/v1" "github.com/authzed/spicedb/pkg/spiceerrors" @@ -26,39 +27,39 @@ const ( var ( // This query must cast an xid8 to xid, which is a safe operation as long as the // xid8 is one of the last ~2 billion transaction IDs generated. We should be garbage - // collecting these transactions long before we get to that point. + // collecting these transactions long before we get to that point. 
newRevisionsQuery = fmt.Sprintf(` SELECT %[1]s, %[2]s, %[3]s, %[4]s FROM %[5]s WHERE %[1]s >= pg_snapshot_xmax($1) OR ( %[1]s >= pg_snapshot_xmin($1) AND NOT pg_visible_in_snapshot(%[1]s, $1) - ) ORDER BY pg_xact_commit_timestamp(%[1]s::xid), %[1]s;`, colXID, colSnapshot, colMetadata, colTimestamp, tableTransaction) + ) ORDER BY pg_xact_commit_timestamp(%[1]s::xid), %[1]s;`, schema.ColXID, schema.ColSnapshot, schema.ColMetadata, schema.ColTimestamp, schema.TableTransaction) queryChangedTuples = psql.Select( - colNamespace, - colObjectID, - colRelation, - colUsersetNamespace, - colUsersetObjectID, - colUsersetRelation, - colCaveatContextName, - colCaveatContext, - colExpiration, - colCreatedXid, - colDeletedXid, - ).From(tableTuple) + schema.ColNamespace, + schema.ColObjectID, + schema.ColRelation, + schema.ColUsersetNamespace, + schema.ColUsersetObjectID, + schema.ColUsersetRelation, + schema.ColCaveatContextName, + schema.ColCaveatContext, + schema.ColExpiration, + schema.ColCreatedXid, + schema.ColDeletedXid, + ).From(schema.TableTuple) queryChangedNamespaces = psql.Select( - colConfig, - colCreatedXid, - colDeletedXid, - ).From(tableNamespace) + schema.ColConfig, + schema.ColCreatedXid, + schema.ColDeletedXid, + ).From(schema.TableNamespace) queryChangedCaveats = psql.Select( - colCaveatName, - colCaveatDefinition, - colCreatedXid, - colDeletedXid, - ).From(tableCaveat) + schema.ColCaveatName, + schema.ColCaveatDefinition, + schema.ColCreatedXid, + schema.ColDeletedXid, + ).From(schema.TableCaveat) ) func (pgd *pgDatastore) Watch( @@ -329,12 +330,12 @@ func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRev func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error { sql, args, err := queryChangedTuples.Where(sq.Or{ sq.And{ - sq.LtOrEq{colCreatedXid: xmax}, - 
sq.GtOrEq{colCreatedXid: xmin}, + sq.LtOrEq{schema.ColCreatedXid: xmax}, + sq.GtOrEq{schema.ColCreatedXid: xmin}, }, sq.And{ - sq.LtOrEq{colDeletedXid: xmax}, - sq.GtOrEq{colDeletedXid: xmin}, + sq.LtOrEq{schema.ColDeletedXid: xmax}, + sq.GtOrEq{schema.ColDeletedXid: xmin}, }, }).ToSql() if err != nil { @@ -425,12 +426,12 @@ func (pgd *pgDatastore) loadRelationshipChanges(ctx context.Context, xmin uint64 func (pgd *pgDatastore) loadNamespaceChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error { sql, args, err := queryChangedNamespaces.Where(sq.Or{ sq.And{ - sq.LtOrEq{colCreatedXid: xmax}, - sq.GtOrEq{colCreatedXid: xmin}, + sq.LtOrEq{schema.ColCreatedXid: xmax}, + sq.GtOrEq{schema.ColCreatedXid: xmin}, }, sq.And{ - sq.LtOrEq{colDeletedXid: xmax}, - sq.GtOrEq{colDeletedXid: xmin}, + sq.LtOrEq{schema.ColDeletedXid: xmax}, + sq.GtOrEq{schema.ColDeletedXid: xmin}, }, }).ToSql() if err != nil { @@ -482,12 +483,12 @@ func (pgd *pgDatastore) loadNamespaceChanges(ctx context.Context, xmin uint64, x func (pgd *pgDatastore) loadCaveatChanges(ctx context.Context, xmin uint64, xmax uint64, txidToRevision map[uint64]postgresRevision, filter map[uint64]int, tracked *common.Changes[postgresRevision, uint64]) error { sql, args, err := queryChangedCaveats.Where(sq.Or{ sq.And{ - sq.LtOrEq{colCreatedXid: xmax}, - sq.GtOrEq{colCreatedXid: xmin}, + sq.LtOrEq{schema.ColCreatedXid: xmax}, + sq.GtOrEq{schema.ColCreatedXid: xmin}, }, sq.And{ - sq.LtOrEq{colDeletedXid: xmax}, - sq.GtOrEq{colDeletedXid: xmin}, + sq.LtOrEq{schema.ColDeletedXid: xmax}, + sq.GtOrEq{schema.ColDeletedXid: xmin}, }, }).ToSql() if err != nil { diff --git a/internal/datastore/proxy/indexcheck.go b/internal/datastore/proxy/indexcheck.go new file mode 100644 index 0000000000..c40a504f5c --- /dev/null +++ b/internal/datastore/proxy/indexcheck.go @@ -0,0 +1,215 @@ +package proxy + +import ( + 
"context" + "fmt" + + v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" + + "github.com/authzed/spicedb/pkg/datastore" + "github.com/authzed/spicedb/pkg/datastore/options" + "github.com/authzed/spicedb/pkg/datastore/queryshape" + "github.com/authzed/spicedb/pkg/genutil/mapz" + core "github.com/authzed/spicedb/pkg/proto/core/v1" + "github.com/authzed/spicedb/pkg/tuple" +) + +// NewIndexCheckingDatastoreProxy returns a datastore proxy that runs EXPLAIN ANALYZE on all +// relationships queries and ensures that the index(es) used within match those defined in the +// schema for the datastore. +func NewIndexCheckingDatastoreProxy(d datastore.SQLDatastore) datastore.Datastore { + return &indexcheckingProxy{delegate: d} +} + +// WrapWithIndexCheckingDatastoreProxyIfApplicable wraps the provided datastore with an +// index-checking proxy if the datastore is an SQLDatastore. +func WrapWithIndexCheckingDatastoreProxyIfApplicable(ds datastore.Datastore) datastore.Datastore { + if unwrapped, ok := ds.(datastore.UnwrappableDatastore); ok { + if sqlds, ok := unwrapped.Unwrap().(datastore.SQLDatastore); ok { + return NewIndexCheckingDatastoreProxy(sqlds) + } + } + + return ds +} + +type indexcheckingProxy struct{ delegate datastore.SQLDatastore } + +func (p *indexcheckingProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { + delegateReader := p.delegate.SnapshotReader(rev) + return &indexcheckingReader{p.delegate, delegateReader} +} + +func (p *indexcheckingProxy) ReadWriteTx( + ctx context.Context, + f datastore.TxUserFunc, + opts ...options.RWTOptionsOption, +) (datastore.Revision, error) { + return p.delegate.ReadWriteTx(ctx, func(ctx context.Context, delegateRWT datastore.ReadWriteTransaction) error { + return f(ctx, &indexcheckingRWT{&indexcheckingReader{p.delegate, delegateRWT}, delegateRWT}) + }, opts...) 
+} + +func (p *indexcheckingProxy) OptimizedRevision(ctx context.Context) (datastore.Revision, error) { + return p.delegate.OptimizedRevision(ctx) +} + +func (p *indexcheckingProxy) CheckRevision(ctx context.Context, revision datastore.Revision) error { + return p.delegate.CheckRevision(ctx, revision) +} + +func (p *indexcheckingProxy) HeadRevision(ctx context.Context) (datastore.Revision, error) { + return p.delegate.HeadRevision(ctx) +} + +func (p *indexcheckingProxy) RevisionFromString(serialized string) (datastore.Revision, error) { + return p.delegate.RevisionFromString(serialized) +} + +func (p *indexcheckingProxy) Watch(ctx context.Context, afterRevision datastore.Revision, options datastore.WatchOptions) (<-chan *datastore.RevisionChanges, <-chan error) { + return p.delegate.Watch(ctx, afterRevision, options) +} + +func (p *indexcheckingProxy) Features(ctx context.Context) (*datastore.Features, error) { + return p.delegate.Features(ctx) +} + +func (p *indexcheckingProxy) OfflineFeatures() (*datastore.Features, error) { + return p.delegate.OfflineFeatures() +} + +func (p *indexcheckingProxy) Statistics(ctx context.Context) (datastore.Stats, error) { + return p.delegate.Statistics(ctx) +} + +func (p *indexcheckingProxy) Unwrap() datastore.Datastore { + return p.delegate +} + +func (p *indexcheckingProxy) ReadyState(ctx context.Context) (datastore.ReadyState, error) { + return p.delegate.ReadyState(ctx) +} + +func (p *indexcheckingProxy) Close() error { return p.delegate.Close() } + +type indexcheckingReader struct { + parent datastore.SQLDatastore + delegate datastore.Reader +} + +func (r *indexcheckingReader) CountRelationships(ctx context.Context, name string) (int, error) { + return r.delegate.CountRelationships(ctx, name) +} + +func (r *indexcheckingReader) LookupCounters(ctx context.Context) ([]datastore.RelationshipCounter, error) { + return r.delegate.LookupCounters(ctx) +} + +func (r *indexcheckingReader) ReadCaveatByName(ctx context.Context, name 
string) (*core.CaveatDefinition, datastore.Revision, error) { + return r.delegate.ReadCaveatByName(ctx, name) +} + +func (r *indexcheckingReader) LookupCaveatsWithNames(ctx context.Context, caveatNames []string) ([]datastore.RevisionedCaveat, error) { + return r.delegate.LookupCaveatsWithNames(ctx, caveatNames) +} + +func (r *indexcheckingReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) { + return r.delegate.ListAllCaveats(ctx) +} + +func (r *indexcheckingReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) { + return r.delegate.ListAllNamespaces(ctx) +} + +func (r *indexcheckingReader) LookupNamespacesWithNames(ctx context.Context, nsNames []string) ([]datastore.RevisionedNamespace, error) { + return r.delegate.LookupNamespacesWithNames(ctx, nsNames) +} + +func (r *indexcheckingReader) ReadNamespaceByName(ctx context.Context, nsName string) (*core.NamespaceDefinition, datastore.Revision, error) { + return r.delegate.ReadNamespaceByName(ctx, nsName) +} + +func (r *indexcheckingReader) mustEnsureIndexes(ctx context.Context, sql string, args []any, shape queryshape.Shape, explain string, expectedIndexes options.SQLIndexInformation) { + // If no indexes are expected, there is nothing to check. + if len(expectedIndexes.ExpectedIndexNames) == 0 { + return + } + + parsed, err := r.parent.ParseExplain(explain) + if err != nil { + panic(fmt.Sprintf("failed to parse explain output: %s", err)) + } + + // If an index is not used (perhaps because the data is too small), the query is still valid. + if len(parsed.IndexesUsed) == 0 { + return + } + + // Otherwise, ensure the index used is one of the expected indexes. + indexesUsed := mapz.NewSet(parsed.IndexesUsed...) + indexesExpected := mapz.NewSet(expectedIndexes.ExpectedIndexNames...) 
+ if indexesExpected.Intersect(indexesUsed).IsEmpty() { + panic(fmt.Sprintf("expected index(es) %v for query shape %v not used: %s", expectedIndexes.ExpectedIndexNames, shape, explain)) + } +} + +func (r *indexcheckingReader) QueryRelationships(ctx context.Context, filter datastore.RelationshipsFilter, opts ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) { + opts = append(opts, options.WithSQLExplainCallback(r.mustEnsureIndexes)) + return r.delegate.QueryRelationships(ctx, filter, opts...) +} + +func (r *indexcheckingReader) ReverseQueryRelationships(ctx context.Context, subjectsFilter datastore.SubjectsFilter, opts ...options.ReverseQueryOptionsOption) (datastore.RelationshipIterator, error) { + opts = append(opts, options.WithSQLExplainCallbackForReverse(r.mustEnsureIndexes)) + return r.delegate.ReverseQueryRelationships(ctx, subjectsFilter, opts...) +} + +type indexcheckingRWT struct { + *indexcheckingReader + delegate datastore.ReadWriteTransaction +} + +func (rwt *indexcheckingRWT) RegisterCounter(ctx context.Context, name string, filter *core.RelationshipFilter) error { + return rwt.delegate.RegisterCounter(ctx, name, filter) +} + +func (rwt *indexcheckingRWT) UnregisterCounter(ctx context.Context, name string) error { + return rwt.delegate.UnregisterCounter(ctx, name) +} + +func (rwt *indexcheckingRWT) StoreCounterValue(ctx context.Context, name string, value int, computedAtRevision datastore.Revision) error { + return rwt.delegate.StoreCounterValue(ctx, name, value, computedAtRevision) +} + +func (rwt *indexcheckingRWT) WriteCaveats(ctx context.Context, caveats []*core.CaveatDefinition) error { + return rwt.delegate.WriteCaveats(ctx, caveats) +} + +func (rwt *indexcheckingRWT) DeleteCaveats(ctx context.Context, names []string) error { + return rwt.delegate.DeleteCaveats(ctx, names) +} + +func (rwt *indexcheckingRWT) WriteRelationships(ctx context.Context, mutations []tuple.RelationshipUpdate) error { + return 
rwt.delegate.WriteRelationships(ctx, mutations) +} + +func (rwt *indexcheckingRWT) WriteNamespaces(ctx context.Context, newConfigs ...*core.NamespaceDefinition) error { + return rwt.delegate.WriteNamespaces(ctx, newConfigs...) +} + +func (rwt *indexcheckingRWT) DeleteNamespaces(ctx context.Context, nsNames ...string) error { + return rwt.delegate.DeleteNamespaces(ctx, nsNames...) +} + +func (rwt *indexcheckingRWT) DeleteRelationships(ctx context.Context, filter *v1.RelationshipFilter, options ...options.DeleteOptionsOption) (bool, error) { + return rwt.delegate.DeleteRelationships(ctx, filter, options...) +} + +func (rwt *indexcheckingRWT) BulkLoad(ctx context.Context, iter datastore.BulkWriteRelationshipSource) (uint64, error) { + return rwt.delegate.BulkLoad(ctx, iter) +} + +var ( + _ datastore.Datastore = (*indexcheckingProxy)(nil) + _ datastore.Reader = (*indexcheckingReader)(nil) + _ datastore.ReadWriteTransaction = (*indexcheckingRWT)(nil) +) diff --git a/internal/datastore/spanner/reader.go b/internal/datastore/spanner/reader.go index 2dc8ee5022..a78f03e271 100644 --- a/internal/datastore/spanner/reader.go +++ b/internal/datastore/spanner/reader.go @@ -167,6 +167,8 @@ func (sr spannerReader) ReverseQueryRelationships( options.WithLimit(queryOpts.LimitForReverse), options.WithAfter(queryOpts.AfterForReverse), options.WithSort(queryOpts.SortForReverse), + options.WithQueryShape(queryOpts.QueryShapeForReverse), + options.WithSQLExplainCallback(queryOpts.SQLExplainCallbackForReverse), ) } diff --git a/internal/graph/check.go b/internal/graph/check.go index 410152fba9..7bbdfacf9a 100644 --- a/internal/graph/check.go +++ b/internal/graph/check.go @@ -20,6 +20,7 @@ import ( "github.com/authzed/spicedb/internal/taskrunner" "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" + "github.com/authzed/spicedb/pkg/datastore/queryshape" "github.com/authzed/spicedb/pkg/genutil/mapz" "github.com/authzed/spicedb/pkg/middleware/nodeid" 
nspkg "github.com/authzed/spicedb/pkg/namespace" @@ -427,6 +428,7 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequest it, err := ds.QueryRelationships(ctx, filter, options.WithSkipCaveats(!directSubjectOrWildcardCanHaveCaveats), options.WithSkipExpiration(!directSubjectOrWildcardCanHaveExpiration), + options.WithQueryShape(queryshape.CheckPermissionSelectDirectSubjects), ) if err != nil { return checkResultError(NewCheckFailureErr(err), emptyMetadata) @@ -479,6 +481,7 @@ func (cc *ConcurrentChecker) checkDirect(ctx context.Context, crc currentRequest it, err := ds.QueryRelationships(ctx, filter, options.WithSkipCaveats(!nonTerminalsCanHaveCaveats), options.WithSkipExpiration(!nonTerminalsCanHaveExpiration), + options.WithQueryShape(queryshape.CheckPermissionSelectIndirectSubjects), ) if err != nil { return checkResultError(NewCheckFailureErr(err), emptyMetadata) diff --git a/internal/services/integrationtesting/consistency_datastore_test.go b/internal/services/integrationtesting/consistency_datastore_test.go index d941390a3a..71b0200915 100644 --- a/internal/services/integrationtesting/consistency_datastore_test.go +++ b/internal/services/integrationtesting/consistency_datastore_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/require" + "github.com/authzed/spicedb/internal/datastore/proxy" "github.com/authzed/spicedb/internal/dispatch/graph" "github.com/authzed/spicedb/internal/services/integrationtesting/consistencytestutil" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" @@ -50,12 +51,13 @@ func TestConsistencyPerDatastore(t *testing.T) { } rde := testdatastore.RunDatastoreEngine(t, engineID) - ds := rde.NewDatastore(t, config.DatastoreConfigInitFunc(t, + baseds := rde.NewDatastore(t, config.DatastoreConfigInitFunc(t, dsconfig.WithWatchBufferLength(0), dsconfig.WithGCWindow(time.Duration(90_000_000_000_000)), dsconfig.WithRevisionQuantization(10), dsconfig.WithMaxRetries(50), 
dsconfig.WithRequestHedgingEnabled(false))) + ds := proxy.WrapWithIndexCheckingDatastoreProxyIfApplicable(baseds) cad := consistencytestutil.BuildDataAndCreateClusterForTesting(t, filePath, ds) dispatcher := graph.NewLocalOnlyDispatcher(10, 100) diff --git a/internal/services/steelthreadtesting/steelthread_test.go b/internal/services/steelthreadtesting/steelthread_test.go index 39733522a1..825c81d64c 100644 --- a/internal/services/steelthreadtesting/steelthread_test.go +++ b/internal/services/steelthreadtesting/steelthread_test.go @@ -18,6 +18,7 @@ import ( v1 "github.com/authzed/authzed-go/proto/authzed/api/v1" "github.com/authzed/spicedb/internal/datastore/dsfortesting" + "github.com/authzed/spicedb/internal/datastore/proxy" "github.com/authzed/spicedb/internal/testserver" testdatastore "github.com/authzed/spicedb/internal/testserver/datastore" "github.com/authzed/spicedb/internal/testserver/datastore/config" @@ -57,6 +58,7 @@ func TestNonMemdbSteelThreads(t *testing.T) { dsconfig.WithMaxRetries(50), dsconfig.WithRequestHedgingEnabled(false))) + ds = proxy.WrapWithIndexCheckingDatastoreProxyIfApplicable(ds) runSteelThreadTest(t, tc, ds) }) } diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 6505db7bd4..232e2ba220 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -683,6 +683,30 @@ type Datastore interface { ReadWriteTx(context.Context, TxUserFunc, ...options.RWTOptionsOption) (Revision, error) } +// ParsedExplain represents the parsed output of an EXPLAIN statement. +type ParsedExplain struct { + // IndexesUsed is the list of indexes used in the query. + IndexesUsed []string +} + +// Explainable is an interface for datastores that support EXPLAIN statements. +type Explainable interface { + // BuildExplainQuery builds an EXPLAIN statement for the given SQL and arguments. + BuildExplainQuery(sql string, args []any) (string, []any, error) + + // ParseExplain parses the output of an EXPLAIN statement. 
+ ParseExplain(explain string) (ParsedExplain, error) + + // PreExplainStatements returns any statements that should be run before the EXPLAIN statement. + PreExplainStatements() []string +} + +// SQLDatastore is an interface for datastores that support SQL-based operations. +type SQLDatastore interface { + Datastore + Explainable +} + // StrictReadDatastore is an interface for datastores that support strict read mode. type StrictReadDatastore interface { Datastore diff --git a/pkg/datastore/options/options.go b/pkg/datastore/options/options.go index 5bf6bbd40e..d312761703 100644 --- a/pkg/datastore/options/options.go +++ b/pkg/datastore/options/options.go @@ -1,8 +1,11 @@ package options import ( + "context" + "google.golang.org/protobuf/types/known/structpb" + "github.com/authzed/spicedb/pkg/datastore/queryshape" "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" ) @@ -41,7 +44,19 @@ func ToRelationship(c Cursor) *tuple.Relationship { return (*tuple.Relationship)(c) } -type Assertion func(sql string) +// SQLCheckAssertion is a function that can be used to assert a condition on the SQL query string. +// Assertions will only be run during testing and only apply in datastores that support SQL. +type SQLCheckAssertion func(sql string) + +// SQLIndexInformation holds the expected index names for a SQL query. +type SQLIndexInformation struct { + // ExpectedIndexNames are the name(s) of the index(es) that are expected to be used by this + // SQL query. + ExpectedIndexNames []string +} + +// SQLExplainCallback is a callback invoked with the explain plan of the SQL query string. +type SQLExplainCallback func(ctx context.Context, sql string, args []any, shape queryshape.Shape, explain string, expectedIndexes SQLIndexInformation) // QueryOptions are the options that can affect the results of a normal forward query. 
type QueryOptions struct { @@ -50,7 +65,18 @@ type QueryOptions struct { After Cursor `debugmap:"visible"` SkipCaveats bool `debugmap:"visible"` SkipExpiration bool `debugmap:"visible"` - SQLAssertion Assertion `debugmap:"visible"` + + // SQLCheckAssertion is a function that can be used to assert a condition on the SQL query string. + // For testing and validation only. + SQLCheckAssertion SQLCheckAssertion `debugmap:"visible"` + + // SQLExplainCallback is a callback invoked with the explain plan of the SQL query string. + // For testing and validation only. + SQLExplainCallback SQLExplainCallback `debugmap:"visible"` + + // QueryShape is the marked shape of the query. + // For testing and validation only. + QueryShape queryshape.Shape `debugmap:"visible"` } // ReverseQueryOptions are the options that can affect the results of a reverse query. @@ -60,6 +86,14 @@ type ReverseQueryOptions struct { LimitForReverse *uint64 `debugmap:"visible"` SortForReverse SortOrder `debugmap:"visible"` AfterForReverse Cursor `debugmap:"visible"` + + // SQLExplainCallbackForReverse is a callback invoked with the explain plan of the SQL query string. + // For testing and validation only. + SQLExplainCallbackForReverse SQLExplainCallback `debugmap:"visible"` + + // QueryShapeForReverse is the marked shape of the reverse query. + // For testing and validation only. + QueryShapeForReverse queryshape.Shape `debugmap:"visible"` } // ResourceRelation combines a resource object type and relation. 
diff --git a/pkg/datastore/options/zz_generated.query_options.go b/pkg/datastore/options/zz_generated.query_options.go index 348c53639f..6deb0088e0 100644 --- a/pkg/datastore/options/zz_generated.query_options.go +++ b/pkg/datastore/options/zz_generated.query_options.go @@ -2,6 +2,7 @@ package options import ( + queryshape "github.com/authzed/spicedb/pkg/datastore/queryshape" defaults "github.com/creasty/defaults" helpers "github.com/ecordell/optgen/helpers" structpb "google.golang.org/protobuf/types/known/structpb" @@ -36,7 +37,9 @@ func (q *QueryOptions) ToOption() QueryOptionsOption { to.After = q.After to.SkipCaveats = q.SkipCaveats to.SkipExpiration = q.SkipExpiration - to.SQLAssertion = q.SQLAssertion + to.SQLCheckAssertion = q.SQLCheckAssertion + to.SQLExplainCallback = q.SQLExplainCallback + to.QueryShape = q.QueryShape } } @@ -48,7 +51,9 @@ func (q QueryOptions) DebugMap() map[string]any { debugMap["After"] = helpers.DebugValue(q.After, false) debugMap["SkipCaveats"] = helpers.DebugValue(q.SkipCaveats, false) debugMap["SkipExpiration"] = helpers.DebugValue(q.SkipExpiration, false) - debugMap["SQLAssertion"] = helpers.DebugValue(q.SQLAssertion, false) + debugMap["SQLCheckAssertion"] = helpers.DebugValue(q.SQLCheckAssertion, false) + debugMap["SQLExplainCallback"] = helpers.DebugValue(q.SQLExplainCallback, false) + debugMap["QueryShape"] = helpers.DebugValue(q.QueryShape, false) return debugMap } @@ -103,10 +108,24 @@ func WithSkipExpiration(skipExpiration bool) QueryOptionsOption { } } -// WithSQLAssertion returns an option that can set SQLAssertion on a QueryOptions -func WithSQLAssertion(sQLAssertion Assertion) QueryOptionsOption { +// WithSQLCheckAssertion returns an option that can set SQLCheckAssertion on a QueryOptions +func WithSQLCheckAssertion(sQLCheckAssertion SQLCheckAssertion) QueryOptionsOption { return func(q *QueryOptions) { - q.SQLAssertion = sQLAssertion + q.SQLCheckAssertion = sQLCheckAssertion + } +} + +// WithSQLExplainCallback returns 
an option that can set SQLExplainCallback on a QueryOptions +func WithSQLExplainCallback(sQLExplainCallback SQLExplainCallback) QueryOptionsOption { + return func(q *QueryOptions) { + q.SQLExplainCallback = sQLExplainCallback + } +} + +// WithQueryShape returns an option that can set QueryShape on a QueryOptions +func WithQueryShape(queryShape queryshape.Shape) QueryOptionsOption { + return func(q *QueryOptions) { + q.QueryShape = queryShape } } @@ -138,6 +157,8 @@ func (r *ReverseQueryOptions) ToOption() ReverseQueryOptionsOption { to.LimitForReverse = r.LimitForReverse to.SortForReverse = r.SortForReverse to.AfterForReverse = r.AfterForReverse + to.SQLExplainCallbackForReverse = r.SQLExplainCallbackForReverse + to.QueryShapeForReverse = r.QueryShapeForReverse } } @@ -148,6 +169,8 @@ func (r ReverseQueryOptions) DebugMap() map[string]any { debugMap["LimitForReverse"] = helpers.DebugValue(r.LimitForReverse, false) debugMap["SortForReverse"] = helpers.DebugValue(r.SortForReverse, false) debugMap["AfterForReverse"] = helpers.DebugValue(r.AfterForReverse, false) + debugMap["SQLExplainCallbackForReverse"] = helpers.DebugValue(r.SQLExplainCallbackForReverse, false) + debugMap["QueryShapeForReverse"] = helpers.DebugValue(r.QueryShapeForReverse, false) return debugMap } @@ -195,6 +218,20 @@ func WithAfterForReverse(afterForReverse Cursor) ReverseQueryOptionsOption { } } +// WithSQLExplainCallbackForReverse returns an option that can set SQLExplainCallbackForReverse on a ReverseQueryOptions +func WithSQLExplainCallbackForReverse(sQLExplainCallbackForReverse SQLExplainCallback) ReverseQueryOptionsOption { + return func(r *ReverseQueryOptions) { + r.SQLExplainCallbackForReverse = sQLExplainCallbackForReverse + } +} + +// WithQueryShapeForReverse returns an option that can set QueryShapeForReverse on a ReverseQueryOptions +func WithQueryShapeForReverse(queryShapeForReverse queryshape.Shape) ReverseQueryOptionsOption { + return func(r *ReverseQueryOptions) { + 
r.QueryShapeForReverse = queryShapeForReverse + } +} + type RWTOptionsOption func(r *RWTOptions) // NewRWTOptionsWithOptions creates a new RWTOptions with the passed in options set diff --git a/pkg/datastore/queryshape/queryshape.go b/pkg/datastore/queryshape/queryshape.go new file mode 100644 index 0000000000..d82f108092 --- /dev/null +++ b/pkg/datastore/queryshape/queryshape.go @@ -0,0 +1,21 @@ +package queryshape + +// Shape represents the different ways a query can be shaped. +type Shape string + +const ( + // Unspecified indicates that the shape is not specified. + Unspecified Shape = "unspecified" + + // Varying indicates that the shape can vary. This is used + // for queries whose shape is not known ahead of time. + Varying Shape = "varying" + + // CheckPermissionSelectDirectSubjects indicates that the query is a permission check + // that selects direct subjects. + CheckPermissionSelectDirectSubjects Shape = "check-permission-select-direct-subjects" + + // CheckPermissionSelectIndirectSubjects indicates that the query is a permission check + // that selects indirect subjects. + CheckPermissionSelectIndirectSubjects Shape = "check-permission-select-indirect-subjects" +)