From d945c735d96c3df145c9330df9ca4e05510ec81f Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Mon, 3 Feb 2025 12:19:41 -0500 Subject: [PATCH] Add a metric tracking the selected replica --- internal/datastore/common/url.go | 19 ++++++++ internal/datastore/common/url_test.go | 45 +++++++++++++++++++ internal/datastore/context.go | 4 ++ internal/datastore/crdb/crdb.go | 4 ++ internal/datastore/memdb/memdb.go | 4 ++ internal/datastore/mysql/datastore.go | 4 ++ internal/datastore/postgres/postgres.go | 4 ++ .../datastore/proxy/checkingreplicated.go | 25 +++++++++-- .../proxy/checkingreplicated_test.go | 4 ++ internal/datastore/proxy/observable.go | 4 ++ internal/datastore/proxy/proxy_test/mock.go | 4 ++ .../datastore/proxy/relationshipintegrity.go | 4 ++ .../proxy/schemacaching/watchingcache_test.go | 4 ++ internal/datastore/proxy/singleflight.go | 4 ++ internal/datastore/proxy/strictreplicated.go | 28 +++++++----- internal/datastore/spanner/spanner.go | 4 ++ pkg/datastore/datastore.go | 5 +++ pkg/datastore/datastore_test.go | 4 ++ 18 files changed, 161 insertions(+), 13 deletions(-) create mode 100644 internal/datastore/common/url.go create mode 100644 internal/datastore/common/url_test.go diff --git a/internal/datastore/common/url.go b/internal/datastore/common/url.go new file mode 100644 index 0000000000..be665ed20a --- /dev/null +++ b/internal/datastore/common/url.go @@ -0,0 +1,19 @@ +package common + +import ( + "errors" + "net/url" +) + +// MetricsIDFromURL extracts the metrics ID from a given datastore URL. +func MetricsIDFromURL(dsURL string) (string, error) { + if dsURL == "" { + return "", errors.New("datastore URL is empty") + } + + u, err := url.Parse(dsURL) + if err != nil { + return "", errors.New("could not parse datastore URL") + } + return u.Host + u.Path, nil +} diff --git a/internal/datastore/common/url_test.go b/internal/datastore/common/url_test.go new file mode 100644 index 0000000000..80691294de --- /dev/null +++ b/internal/datastore/common/url_test.go @@ -0,0 +1,45 @@ +package common + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricsIDFromURL(t *testing.T) { + tcs := []struct { + dsURL string + expected string + expectedError string + }{ + { + "", + "", + "datastore URL is empty", + }, + { + "postgres://user:password@localhost:5432/dbname", + "localhost:5432/dbname", + "", + }, + { + "mysql://user:password@localhost:3306/dbname", + "localhost:3306/dbname", + "", + }, + } + + for _, tc := range tcs { + t.Run(tc.dsURL, func(t *testing.T) { + result, err := MetricsIDFromURL(tc.dsURL) + if tc.expectedError != "" { + require.Error(t, err) + require.ErrorContains(t, err, tc.expectedError) + return + } + + require.NoError(t, err) + require.Equal(t, tc.expected, result) + }) + } +} diff --git a/internal/datastore/context.go b/internal/datastore/context.go index ef452a8417..f96e73dfc5 100644 --- a/internal/datastore/context.go +++ b/internal/datastore/context.go @@ -39,6 +39,10 @@ func NewSeparatingContextDatastoreProxy(d datastore.Datastore) datastore.StrictR type ctxProxy struct{ delegate datastore.Datastore } +func (p *ctxProxy) MetricsID() (string, error) { + return p.delegate.MetricsID() +} + func (p *ctxProxy) ReadWriteTx( ctx context.Context, f datastore.TxUserFunc, diff --git a/internal/datastore/crdb/crdb.go b/internal/datastore/crdb/crdb.go index ad3bff7047..f5b59665da 100644 --- a/internal/datastore/crdb/crdb.go +++ b/internal/datastore/crdb/crdb.go @@ -363,6 +363,10 @@ func (cds *crdbDatastore) SnapshotReader(rev datastore.Revision) datastore.Reade } } +func (cds *crdbDatastore) MetricsID() (string, error) { + return common.MetricsIDFromURL(cds.dburl) +} + func (cds *crdbDatastore) ReadWriteTx( ctx context.Context, f datastore.TxUserFunc, diff --git a/internal/datastore/memdb/memdb.go b/internal/datastore/memdb/memdb.go index 796b1b2bc2..182827cf0f 100644 --- a/internal/datastore/memdb/memdb.go +++ b/internal/datastore/memdb/memdb.go @@ -100,6 +100,10 @@ type snapshot struct { db *memdb.MemDB } +func (mdb *memdbDatastore) MetricsID() (string, error) { + return "memdb-" + mdb.uniqueID, nil +} + func (mdb *memdbDatastore) SnapshotReader(dr datastore.Revision) datastore.Reader { mdb.RLock() defer mdb.RUnlock() diff --git a/internal/datastore/mysql/datastore.go b/internal/datastore/mysql/datastore.go index 875c61d661..ddc92b1f16 100644 --- a/internal/datastore/mysql/datastore.go +++ b/internal/datastore/mysql/datastore.go @@ -325,6 +325,10 @@ func newMySQLDatastore(ctx context.Context, uri string, replicaIndex int, option return store, nil } +func (mds *Datastore) MetricsID() (string, error) { + return common.MetricsIDFromURL(mds.url) +} + func (mds *Datastore) SnapshotReader(rev datastore.Revision) datastore.Reader { createTxFunc := func(ctx context.Context) (*sql.Tx, txCleanupFunc, error) { tx, err := mds.db.BeginTx(ctx, mds.readTxOptions) diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go index 8f5d66b068..68d72e80c7 100644 --- a/internal/datastore/postgres/postgres.go +++ b/internal/datastore/postgres/postgres.go @@ -424,6 +424,10 @@ type pgDatastore struct { filterMaximumIDCount uint16 } +func (pgd *pgDatastore) MetricsID() (string, error) { + return common.MetricsIDFromURL(pgd.dburl) +} + func (pgd *pgDatastore) IsStrictReadModeEnabled() bool { return pgd.inStrictReadMode } diff --git a/internal/datastore/proxy/checkingreplicated.go b/internal/datastore/proxy/checkingreplicated.go index 0f40284e95..62c7c50c97 100644 --- a/internal/datastore/proxy/checkingreplicated.go +++ b/internal/datastore/proxy/checkingreplicated.go @@ -23,12 +23,19 @@ var ( Help: "total number of readers created by the checking replica proxy", }) - checkingReplicatedReplicaReaderCount = promauto.NewCounter(prometheus.CounterOpts{ + checkingReplicatedReplicaReaderCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "spicedb", Subsystem: "datastore_replica", Name: "checking_replicated_replica_reader_total", Help: "number of readers created by the checking replica proxy that are using the replica", - }) + }, []string{"replica"}) + + readReplicatedSelectedReplicaCount = promauto.NewCounterVec(prometheus.CounterOpts{ + Namespace: "spicedb", + Subsystem: "datastore_replica", + Name: "selected_replica_total", + Help: "the selected replica in a read replicated datastore", + }, []string{"replica"}) ) // NewCheckingReplicatedDatastore creates a new datastore that writes to the provided primary and reads @@ -89,6 +96,12 @@ type checkingReplicatedDatastore struct { // Any errors establishing the reader will be returned by subsequent calls. func (rd *checkingReplicatedDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader { replica := selectReplica(rd.replicas, &rd.lastReplica) + replicaID, err := replica.MetricsID() + if err != nil { + log.Warn().Err(err).Msg("unable to determine metrics ID for replica") + replicaID = "unknown" + } + readReplicatedSelectedReplicaCount.WithLabelValues(replicaID).Inc() return &checkingStableReader{ rev: revision, replica: replica, @@ -223,7 +236,13 @@ func (rr *checkingStableReader) determineSource(ctx context.Context) error { } log.Trace().Str("revision", rr.rev.String()).Msg("replica contains the requested revision") - checkingReplicatedReplicaReaderCount.Inc() + metricsID, err := rr.replica.MetricsID() + if err != nil { + log.Warn().Err(err).Msg("unable to determine metrics ID for replica") + metricsID = "unknown" + } + + checkingReplicatedReplicaReaderCount.WithLabelValues(metricsID).Inc() rr.chosenReader = rr.replica.SnapshotReader(rr.rev) rr.chosePrimaryForTest = false }) diff --git a/internal/datastore/proxy/checkingreplicated_test.go b/internal/datastore/proxy/checkingreplicated_test.go index fdf5bd528c..9544a032fe 100644 --- a/internal/datastore/proxy/checkingreplicated_test.go +++ b/internal/datastore/proxy/checkingreplicated_test.go @@ -83,6 +83,10 @@ type fakeDatastore struct { revision datastore.Revision } +func (f fakeDatastore) MetricsID() (string, error) { + return "fake", nil +} + func (f fakeDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader { return fakeSnapshotReader{ revision: revision, diff --git a/internal/datastore/proxy/observable.go b/internal/datastore/proxy/observable.go index 3f92beb71e..47e76dc0ca 100644 --- a/internal/datastore/proxy/observable.go +++ b/internal/datastore/proxy/observable.go @@ -68,6 +68,10 @@ func NewObservableDatastoreProxy(d datastore.Datastore) datastore.Datastore { type observableProxy struct{ delegate datastore.Datastore } +func (p *observableProxy) MetricsID() (string, error) { + return p.delegate.MetricsID() +} + func (p *observableProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { delegateReader := p.delegate.SnapshotReader(rev) return &observableReader{delegateReader} diff --git a/internal/datastore/proxy/proxy_test/mock.go b/internal/datastore/proxy/proxy_test/mock.go index dc59c780ff..51acf57384 100644 --- a/internal/datastore/proxy/proxy_test/mock.go +++ b/internal/datastore/proxy/proxy_test/mock.go @@ -22,6 +22,10 @@ func (dm *MockDatastore) SnapshotReader(rev datastore.Revision) datastore.Reader return args.Get(0).(datastore.Reader) } +func (dm *MockDatastore) MetricsID() (string, error) { + return "mock", nil +} + func (dm *MockDatastore) ReadWriteTx( ctx context.Context, f datastore.TxUserFunc, diff --git a/internal/datastore/proxy/relationshipintegrity.go b/internal/datastore/proxy/relationshipintegrity.go index 4b129b14d1..c21f3c7d30 100644 --- a/internal/datastore/proxy/relationshipintegrity.go +++ b/internal/datastore/proxy/relationshipintegrity.go @@ -157,6 +157,10 @@ func computeRelationshipHash(rel tuple.Relationship, key *hmacConfig) ([]byte, e return hasher.Sum(nil)[:hashLength], nil } +func (r *relationshipIntegrityProxy) MetricsID() (string, error) { + return r.ds.MetricsID() +} + func (r *relationshipIntegrityProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { return relationshipIntegrityReader{ parent: r, diff --git a/internal/datastore/proxy/schemacaching/watchingcache_test.go b/internal/datastore/proxy/schemacaching/watchingcache_test.go index 8512e998ef..70330bde75 100644 --- a/internal/datastore/proxy/schemacaching/watchingcache_test.go +++ b/internal/datastore/proxy/schemacaching/watchingcache_test.go @@ -333,6 +333,10 @@ type fakeDatastore struct { lock sync.RWMutex } +func (fds *fakeDatastore) MetricsID() (string, error) { + return "fake", nil +} + func (fds *fakeDatastore) updateNamespace(name string, def *corev1.NamespaceDefinition, revision datastore.Revision) { fds.lock.Lock() defer fds.lock.Unlock() diff --git a/internal/datastore/proxy/singleflight.go b/internal/datastore/proxy/singleflight.go index ee02f32e1d..618e53bbd4 100644 --- a/internal/datastore/proxy/singleflight.go +++ b/internal/datastore/proxy/singleflight.go @@ -24,6 +24,10 @@ type singleflightProxy struct { var _ datastore.Datastore = (*singleflightProxy)(nil) +func (p *singleflightProxy) MetricsID() (string, error) { + return p.delegate.MetricsID() +} + func (p *singleflightProxy) SnapshotReader(rev datastore.Revision) datastore.Reader { return p.delegate.SnapshotReader(rev) } diff --git a/internal/datastore/proxy/strictreplicated.go b/internal/datastore/proxy/strictreplicated.go index eae1eb0055..381f835cdb 100644 --- a/internal/datastore/proxy/strictreplicated.go +++ b/internal/datastore/proxy/strictreplicated.go @@ -25,12 +25,12 @@ var ( Help: "total number of reads made by the strict read replicated datastore", }) - strictReadReplicatedFallbackQueryCount = promauto.NewCounter(prometheus.CounterOpts{ + strictReadReplicatedFallbackQueryCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: "spicedb", Subsystem: "datastore_replica", Name: "strict_replicated_fallback_query_total", Help: "number of queries that have fallen back to the primary datastore", - }) + }, []string{"replica"}) ) // NewStrictReplicatedDatastore creates a new datastore that writes to the provided primary and reads @@ -77,10 +77,17 @@ type strictReplicatedDatastore struct { // Any errors establishing the reader will be returned by subsequent calls. func (rd *strictReplicatedDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader { replica := selectReplica(rd.replicas, &rd.lastReplica) + replicaID, err := replica.MetricsID() + if err != nil { + log.Error().Err(err).Msg("failed to get replica metrics ID") + replicaID = "unknown" + } + return &strictReadReplicatedReader{ - rev: revision, - replica: replica, - primary: rd.Datastore, + rev: revision, + replica: replica, + replicaID: replicaID, + primary: rd.Datastore, } } @@ -92,9 +99,10 @@ func (rd *strictReplicatedDatastore) SnapshotReader(revision datastore.Revision) // read mode enabled, to ensure the query will fail with a RevisionUnavailableError if the revision is // not available. type strictReadReplicatedReader struct { - rev datastore.Revision - replica datastore.ReadOnlyDatastore - primary datastore.Datastore + rev datastore.Revision + replica datastore.ReadOnlyDatastore + replicaID string + primary datastore.Datastore } func (rr *strictReadReplicatedReader) ReadCaveatByName(ctx context.Context, name string) (*core.CaveatDefinition, datastore.Revision, error) { @@ -150,7 +158,7 @@ func queryRelationships[F any, O any]( if err != nil { if errors.As(err, &common.RevisionUnavailableError{}) { log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary") - strictReadReplicatedFallbackQueryCount.Inc() + strictReadReplicatedFallbackQueryCount.WithLabelValues(rr.replicaID).Inc() return handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...) } return nil, err @@ -187,7 +195,7 @@ func queryRelationships[F any, O any]( if requiresFallback { log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary") - strictReadReplicatedFallbackQueryCount.Inc() + strictReadReplicatedFallbackQueryCount.WithLabelValues(rr.replicaID).Inc() pit, err := handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...) if err != nil { yield(tuple.Relationship{}, err) diff --git a/internal/datastore/spanner/spanner.go b/internal/datastore/spanner/spanner.go index ed54c657da..afa82d023d 100644 --- a/internal/datastore/spanner/spanner.go +++ b/internal/datastore/spanner/spanner.go @@ -272,6 +272,10 @@ func (sd *spannerDatastore) SnapshotReader(revisionRaw datastore.Revision) datas return spannerReader{executor, txSource, sd.filterMaximumIDCount, sd.schema} } +func (sd *spannerDatastore) MetricsID() (string, error) { + return sd.database, nil +} + func (sd *spannerDatastore) readTransactionMetadata(ctx context.Context, transactionTag string) (map[string]any, error) { row, err := sd.client.Single().ReadRow(ctx, tableTransactionMetadata, spanner.Key{transactionTag}, []string{colMetadata}) if err != nil { diff --git a/pkg/datastore/datastore.go b/pkg/datastore/datastore.go index 6505db7bd4..943a38fcf3 100644 --- a/pkg/datastore/datastore.go +++ b/pkg/datastore/datastore.go @@ -628,6 +628,11 @@ func (wo WatchOptions) WithCheckpointInterval(interval time.Duration) WatchOptio // ReadOnlyDatastore is an interface for reading relationships from the datastore. type ReadOnlyDatastore interface { + // MetricsID returns an identifier for the datastore for use in metrics. + // This identifier is typically the hostname of the datastore (where applicable) + // and may not be unique; callers should not rely on uniqueness. + MetricsID() (string, error) + // SnapshotReader creates a read-only handle that reads the datastore at the specified revision. // Any errors establishing the reader will be returned by subsequent calls. SnapshotReader(Revision) Reader diff --git a/pkg/datastore/datastore_test.go b/pkg/datastore/datastore_test.go index 0dd7786643..5e7824268f 100644 --- a/pkg/datastore/datastore_test.go +++ b/pkg/datastore/datastore_test.go @@ -586,6 +586,10 @@ func (f fakeDatastore) Unwrap() Datastore { return f.delegate } +func (f fakeDatastore) MetricsID() (string, error) { + return "fake", nil +} + func (f fakeDatastore) SnapshotReader(_ Revision) Reader { return nil }