Skip to content

Commit

Permalink
Merge pull request #2236 from josephschorr/strictreplicated-buffer
Browse files Browse the repository at this point in the history
Have strictreplicated proxy buffer relationships
  • Loading branch information
josephschorr authored Feb 6, 2025
2 parents a042fdd + 94d024b commit f678be6
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 54 deletions.
57 changes: 18 additions & 39 deletions internal/datastore/proxy/strictreplicated.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/authzed/spicedb/pkg/datastore"
"github.com/authzed/spicedb/pkg/datastore/options"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
"github.com/authzed/spicedb/pkg/spiceerrors"
"github.com/authzed/spicedb/pkg/tuple"
)

Expand Down Expand Up @@ -156,48 +155,28 @@ func queryRelationships[F any, O any](
return nil, err
}

beforeResultsYielded := true
requiresFallback := false
return func(yield func(tuple.Relationship, error) bool) {
replicaLoop:
for result, err := range it {
if err != nil {
// If the RevisionUnavailableError is returned on the first result, we should fallback
// to the primary.
if errors.As(err, &common.RevisionUnavailableError{}) {
if !beforeResultsYielded {
yield(tuple.Relationship{}, spiceerrors.MustBugf("RevisionUnavailableError should only be returned on the first result"))
return
}
requiresFallback = true
break replicaLoop
}

if !yield(tuple.Relationship{}, err) {
return
}
continue
}

beforeResultsYielded = false
if !yield(result, nil) {
return
}
}

if requiresFallback {
// PG may raise a RevisionUnavailableError if the revision is not available at any time
// during the iteration, which means we cannot simply stream results to the parent
// iterator and still support fallback. Therefore, we conduct a full read of all relationships
// here, and if the iterator is exhausted without error, we return it. If an error is encountered, we
// return the primary as a fallback.
// TODO(jschorr): This is a temporary solution to support fallback. We should consider
// using cursors to support fallback instead.
rels, err := datastore.IteratorToSlice(it)
if err != nil {
if errors.As(err, &common.RevisionUnavailableError{}) {
log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary")
strictReadReplicatedFallbackQueryCount.Inc()
pit, err := handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
if err != nil {
yield(tuple.Relationship{}, err)
return handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...)
}
return nil, err
}

return func(yield func(tuple.Relationship, error) bool) {
for _, rel := range rels {
if !yield(rel, nil) {
return
}
for presult, perr := range pit {
if !yield(presult, perr) {
return
}
}
}
}, nil
}
Expand Down
17 changes: 2 additions & 15 deletions internal/datastore/proxy/strictreplicated_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,21 +97,8 @@ func TestStrictReplicatedQueryNonFallbackError(t *testing.T) {

// Query the replicated, which should return the error.
reader := replicated.SnapshotReader(revisionparsing.MustParseRevisionForTest("3"))
iter, err := reader.QueryRelationships(context.Background(), datastore.RelationshipsFilter{
_, err = reader.QueryRelationships(context.Background(), datastore.RelationshipsFilter{
OptionalResourceType: "resource",
})
require.NoError(t, err)

relsCollected := 0
var errFound error
for _, err := range iter {
if err != nil {
errFound = err
} else {
relsCollected++
}
}

require.Equal(t, 3, relsCollected)
require.ErrorContains(t, errFound, "raising an expected error")
require.ErrorContains(t, err, "raising an expected error")
}

0 comments on commit f678be6

Please sign in to comment.