Skip to content

Commit

Permalink
Merge pull request #2229 from josephschorr/replication-metrics
Browse files Browse the repository at this point in the history
Add metrics for usage of the replication proxies in the datastore
  • Loading branch information
josephschorr authored Feb 3, 2025
2 parents f5c4424 + fb56d35 commit abdc17d
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 0 deletions.
22 changes: 22 additions & 0 deletions internal/datastore/proxy/checkingreplicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,31 @@ import (
"sync"
"sync/atomic"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

var (
checkingReplicatedTotalReaderCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "checking_replicated_reader_total",
Help: "total number of readers created by the checking replica proxy",
})

checkingReplicatedReplicaReaderCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "checking_replicated_replica_reader_total",
Help: "number of readers created by the checking replica proxy that are using the replica",
})
)

// NewCheckingReplicatedDatastore creates a new datastore that writes to the provided primary and reads
// from the provided replicas. The replicas are chosen in a round-robin fashion. If a replica does
// not have the requested revision, the primary is used instead.
Expand Down Expand Up @@ -186,6 +205,8 @@ func (rr *checkingStableReader) LookupCounters(ctx context.Context) ([]datastore
func (rr *checkingStableReader) determineSource(ctx context.Context) error {
var finalError error
rr.choose.Do(func() {
checkingReplicatedTotalReaderCount.Inc()

// If the revision is not known to the replica, use the primary instead.
if err := rr.replica.CheckRevision(ctx, rr.rev); err != nil {
var irr datastore.InvalidRevisionError
Expand All @@ -202,6 +223,7 @@ func (rr *checkingStableReader) determineSource(ctx context.Context) error {
}
log.Trace().Str("revision", rr.rev.String()).Msg("replica contains the requested revision")

checkingReplicatedReplicaReaderCount.Inc()
rr.chosenReader = rr.replica.SnapshotReader(rr.rev)
rr.chosePrimaryForTest = false
})
Expand Down
23 changes: 23 additions & 0 deletions internal/datastore/proxy/strictreplicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ import (
"errors"
"fmt"

"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/authzed/spicedb/internal/datastore/common"
log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
Expand All @@ -14,6 +17,22 @@ import (
"github.com/authzed/spicedb/pkg/tuple"
)

var (
strictReadReplicatedTotalQueryCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "strict_replicated_query_total",
Help: "total number of reads made by the strict read replicated datastore",
})

strictReadReplicatedFallbackQueryCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "spicedb",
Subsystem: "datastore_replica",
Name: "strict_replicated_fallback_query_total",
Help: "number of queries that have fallen back to the primary datastore",
})
)

// NewStrictReplicatedDatastore creates a new datastore that writes to the provided primary and reads
// from the provided replicas. The replicas are chosen in a round-robin fashion. If a replica does
// not have the requested revision, the primary is used instead.
Expand Down Expand Up @@ -121,6 +140,8 @@ func queryRelationships[F any, O any](
options []O,
handler func(datastore.Reader) queryHandler[F, O],
) (datastore.RelationshipIterator, error) {
strictReadReplicatedTotalQueryCount.Inc()

sr := rr.replica.SnapshotReader(rr.rev)
it, err := handler(sr)(ctx, filter, options...)
// Check for a RevisionUnavailableError, which indicates the replica does not contain the requested
Expand All @@ -129,6 +150,7 @@ func queryRelationships[F any, O any](
if err != nil {
if errors.As(err, &common.RevisionUnavailableError{}) {
log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary")
strictReadReplicatedFallbackQueryCount.Inc()
return handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
}
return nil, err
Expand Down Expand Up @@ -165,6 +187,7 @@ func queryRelationships[F any, O any](

if requiresFallback {
log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary")
strictReadReplicatedFallbackQueryCount.Inc()
pit, err := handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
if err != nil {
yield(tuple.Relationship{}, err)
Expand Down

0 comments on commit abdc17d

Please sign in to comment.