Skip to content

Commit

Permalink
Merge pull request #2214 from josephschorr/replicated-query
Browse files Browse the repository at this point in the history
Fixes for replicated query iterator
  • Loading branch information
vroldanbet authored Jan 29, 2025
2 parents b0b3a63 + 2bf945e commit 7466dea
Show file tree
Hide file tree
Showing 4 changed files with 414 additions and 230 deletions.
210 changes: 210 additions & 0 deletions internal/datastore/proxy/checkingreplicated.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
package proxy

import (
"context"
"errors"
"sync"
"sync/atomic"

log "github.com/authzed/spicedb/internal/logging"
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)

// NewCheckingReplicatedDatastore creates a new datastore that writes to the provided primary and reads
// from the provided replicas. The replicas are chosen in a round-robin fashion. If a replica does
// not have the requested revision, the primary is used instead.
//
// NOTE: Be *very* careful when using this function. It is not safe to use this function without
// knowledge of the layout of the underlying datastore and its replicas.
//
// Replicas will be checked for the requested revision before reading from them, which means that the
// read pool for the replicas *must* point to a *stable* instance of the datastore (not a load balancer).
// That means that *each* replica node in the database must be configured as its own replica to SpiceDB,
// with each URI given distinctly.
func NewCheckingReplicatedDatastore(primary datastore.Datastore, replicas ...datastore.ReadOnlyDatastore) (datastore.Datastore, error) {
if len(replicas) == 0 {
log.Debug().Msg("No replicas provided, using primary as read source")
return primary, nil
}

cachingReplicas := make([]datastore.ReadOnlyDatastore, 0, len(replicas))
for _, replica := range replicas {
cachingReplicas = append(cachingReplicas, newCachedCheckRevision(replica))
}

log.Debug().Int("replica-count", len(replicas)).Msg("Using replicas for reads")
return &checkingReplicatedDatastore{
primary,
cachingReplicas,
0,
}, nil
}

func selectReplica[T any](replicas []T, lastReplica *uint64) T {
if len(replicas) == 1 {
return replicas[0]
}

var swapped bool
var next uint64
for !swapped {
last := *lastReplica
next = (*lastReplica + 1) % uint64(len(replicas))
swapped = atomic.CompareAndSwapUint64(lastReplica, last, next)
}

log.Trace().Uint64("replica", next).Msg("choosing replica for read")
return replicas[next]
}

type checkingReplicatedDatastore struct {
datastore.Datastore
replicas []datastore.ReadOnlyDatastore

lastReplica uint64
}

// SnapshotReader creates a read-only handle that reads the datastore at the specified revision.
// Any errors establishing the reader will be returned by subsequent calls.
func (rd *checkingReplicatedDatastore) SnapshotReader(revision datastore.Revision) datastore.Reader {
replica := selectReplica(rd.replicas, &rd.lastReplica)
return &checkingStableReader{
rev: revision,
replica: replica,
primary: rd.Datastore,
}
}

// checkingStableReader is a reader that will check the replica for the requested revision before
// reading from it. If the replica does not have the requested revision, the primary will be used
// instead. Only supported for a stable replica within each pool.
type checkingStableReader struct {
rev datastore.Revision
replica datastore.ReadOnlyDatastore
primary datastore.Datastore

// chosePrimaryForTest is used for testing to determine if the primary was used for the read.
chosePrimaryForTest bool

chosenReader datastore.Reader
choose sync.Once
}

func (rr *checkingStableReader) ReadCaveatByName(ctx context.Context, name string) (caveat *core.CaveatDefinition, lastWritten datastore.Revision, err error) {
if err := rr.determineSource(ctx); err != nil {
return nil, datastore.NoRevision, err
}

return rr.chosenReader.ReadCaveatByName(ctx, name)
}

func (rr *checkingStableReader) ListAllCaveats(ctx context.Context) ([]datastore.RevisionedCaveat, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.ListAllCaveats(ctx)
}

func (rr *checkingStableReader) LookupCaveatsWithNames(ctx context.Context, names []string) ([]datastore.RevisionedCaveat, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.LookupCaveatsWithNames(ctx, names)
}

func (rr *checkingStableReader) QueryRelationships(
ctx context.Context,
filter datastore.RelationshipsFilter,
options ...options.QueryOptionsOption,
) (datastore.RelationshipIterator, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.QueryRelationships(ctx, filter, options...)
}

func (rr *checkingStableReader) ReverseQueryRelationships(
ctx context.Context,
subjectsFilter datastore.SubjectsFilter,
options ...options.ReverseQueryOptionsOption,
) (datastore.RelationshipIterator, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.ReverseQueryRelationships(ctx, subjectsFilter, options...)
}

func (rr *checkingStableReader) ReadNamespaceByName(ctx context.Context, nsName string) (ns *core.NamespaceDefinition, lastWritten datastore.Revision, err error) {
if err := rr.determineSource(ctx); err != nil {
return nil, datastore.NoRevision, err
}

return rr.chosenReader.ReadNamespaceByName(ctx, nsName)
}

func (rr *checkingStableReader) ListAllNamespaces(ctx context.Context) ([]datastore.RevisionedNamespace, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.ListAllNamespaces(ctx)
}

func (rr *checkingStableReader) LookupNamespacesWithNames(ctx context.Context, nsNames []string) ([]datastore.RevisionedNamespace, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.LookupNamespacesWithNames(ctx, nsNames)
}

func (rr *checkingStableReader) CountRelationships(ctx context.Context, filter string) (int, error) {
if err := rr.determineSource(ctx); err != nil {
return 0, err
}

return rr.chosenReader.CountRelationships(ctx, filter)
}

func (rr *checkingStableReader) LookupCounters(ctx context.Context) ([]datastore.RelationshipCounter, error) {
if err := rr.determineSource(ctx); err != nil {
return nil, err
}

return rr.chosenReader.LookupCounters(ctx)
}

// determineSource will choose the replica or primary to read from based on the revision, by checking
// if the replica contains the revision. If the replica does not contain the revision, the primary
// will be used instead.
func (rr *checkingStableReader) determineSource(ctx context.Context) error {
var finalError error
rr.choose.Do(func() {
// If the revision is not known to the replica, use the primary instead.
if err := rr.replica.CheckRevision(ctx, rr.rev); err != nil {
var irr datastore.InvalidRevisionError
if errors.As(err, &irr) {
if irr.Reason() == datastore.CouldNotDetermineRevision {
log.Trace().Str("revision", rr.rev.String()).Err(err).Msg("replica does not contain the requested revision, using primary")
rr.chosenReader = rr.primary.SnapshotReader(rr.rev)
rr.chosePrimaryForTest = true
return
}
}
finalError = err
return
}
log.Trace().Str("revision", rr.rev.String()).Msg("replica contains the requested revision")

rr.chosenReader = rr.replica.SnapshotReader(rr.rev)
rr.chosePrimaryForTest = false
})

return finalError
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,10 @@ import (
"github.com/authzed/spicedb/pkg/datastore/options"
"github.com/authzed/spicedb/pkg/datastore/revisionparsing"
corev1 "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/tuple"
)

func TestReplicatedReaderWithOnlyPrimary(t *testing.T) {
primary := fakeDatastore{true, revisionparsing.MustParseRevisionForTest("2")}

replicated, err := NewStrictReplicatedDatastore(primary)
require.NoError(t, err)

require.Equal(t, primary, replicated)
}

func TestReplicatedReaderFallsbackToPrimaryOnCheckRevisionFailure(t *testing.T) {
func TestCheckingReplicatedReaderFallsbackToPrimaryOnCheckRevisionFailure(t *testing.T) {
primary := fakeDatastore{true, revisionparsing.MustParseRevisionForTest("2")}
replica := fakeDatastore{false, revisionparsing.MustParseRevisionForTest("1")}

Expand All @@ -47,7 +39,7 @@ func TestReplicatedReaderFallsbackToPrimaryOnCheckRevisionFailure(t *testing.T)
require.True(t, reader.(*checkingStableReader).chosePrimaryForTest)
}

func TestReplicatedReaderFallsbackToPrimaryOnRevisionNotAvailableError(t *testing.T) {
func TestCheckingReplicatedReaderFallsbackToPrimaryOnRevisionNotAvailableError(t *testing.T) {
primary := fakeDatastore{true, revisionparsing.MustParseRevisionForTest("2")}
replica := fakeDatastore{false, revisionparsing.MustParseRevisionForTest("1")}

Expand Down Expand Up @@ -198,12 +190,12 @@ func (fakeSnapshotReader) ListAllNamespaces(context.Context) ([]datastore.Revisi
return nil, nil
}

func (fakeSnapshotReader) QueryRelationships(context.Context, datastore.RelationshipsFilter, ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) {
return nil, fmt.Errorf("not implemented")
func (fsr fakeSnapshotReader) QueryRelationships(context.Context, datastore.RelationshipsFilter, ...options.QueryOptionsOption) (datastore.RelationshipIterator, error) {
return fakeIterator(fsr), nil
}

func (fakeSnapshotReader) ReverseQueryRelationships(context.Context, datastore.SubjectsFilter, ...options.ReverseQueryOptionsOption) (datastore.RelationshipIterator, error) {
return nil, fmt.Errorf("not implemented")
func (fsr fakeSnapshotReader) ReverseQueryRelationships(context.Context, datastore.SubjectsFilter, ...options.ReverseQueryOptionsOption) (datastore.RelationshipIterator, error) {
return fakeIterator(fsr), nil
}

func (fakeSnapshotReader) CountRelationships(ctx context.Context, filter string) (int, error) {
Expand All @@ -213,3 +205,29 @@ func (fakeSnapshotReader) CountRelationships(ctx context.Context, filter string)
func (fakeSnapshotReader) LookupCounters(ctx context.Context) ([]datastore.RelationshipCounter, error) {
return nil, fmt.Errorf("not implemented")
}

func fakeIterator(fsr fakeSnapshotReader) datastore.RelationshipIterator {
return func(yield func(tuple.Relationship, error) bool) {
if fsr.isPrimary {
if !yield(tuple.MustParse("resource:123#viewer@user:tom"), nil) {
return
}
if !yield(tuple.MustParse("resource:456#viewer@user:tom"), nil) {
return
}
return
}

if fsr.revision.GreaterThan(revisionparsing.MustParseRevisionForTest("2")) {
yield(tuple.Relationship{}, common.NewRevisionUnavailableError(fmt.Errorf("revision not available")))
return
}

if !yield(tuple.MustParse("resource:123#viewer@user:tom"), nil) {
return
}
if !yield(tuple.MustParse("resource:456#viewer@user:tom"), nil) {
return
}
}
}
Loading

0 comments on commit 7466dea

Please sign in to comment.