From 94d024b9247573b5af68462cf0bff19fd83f9d8b Mon Sep 17 00:00:00 2001 From: Joseph Schorr Date: Thu, 6 Feb 2025 12:44:06 -0500 Subject: [PATCH] Have strictreplicated proxy buffer relationships This ensures revision errors raised *during* iteration are also properly handled for fallback While not the best solution, it should work as an interim one until we find a better way, perhaps using cursors --- internal/datastore/proxy/strictreplicated.go | 57 ++++++------------- .../datastore/proxy/strictreplicated_test.go | 17 +----- 2 files changed, 20 insertions(+), 54 deletions(-) diff --git a/internal/datastore/proxy/strictreplicated.go b/internal/datastore/proxy/strictreplicated.go index eae1eb0055..043ac89b1f 100644 --- a/internal/datastore/proxy/strictreplicated.go +++ b/internal/datastore/proxy/strictreplicated.go @@ -13,7 +13,6 @@ import ( "github.com/authzed/spicedb/pkg/datastore" "github.com/authzed/spicedb/pkg/datastore/options" core "github.com/authzed/spicedb/pkg/proto/core/v1" - "github.com/authzed/spicedb/pkg/spiceerrors" "github.com/authzed/spicedb/pkg/tuple" ) @@ -156,48 +155,28 @@ func queryRelationships[F any, O any]( return nil, err } - beforeResultsYielded := true - requiresFallback := false - return func(yield func(tuple.Relationship, error) bool) { - replicaLoop: - for result, err := range it { - if err != nil { - // If the RevisionUnavailableError is returned on the first result, we should fallback - // to the primary. - if errors.As(err, &common.RevisionUnavailableError{}) { - if !beforeResultsYielded { - yield(tuple.Relationship{}, spiceerrors.MustBugf("RevisionUnavailableError should only be returned on the first result")) - return - } - requiresFallback = true - break replicaLoop - } - - if !yield(tuple.Relationship{}, err) { - return - } - continue - } - - beforeResultsYielded = false - if !yield(result, nil) { - return - } - } - - if requiresFallback { + // PG may raise a RevisionUnavailableError if the revision is not available at any time + // during the iteration, which means we cannot simply stream results to the parent + // iterator and still support fallback. Therefore, we conduct a full read of all relationships + // here, and if the iterator is exhausted without error, we return it. If an error is encountered, we + // return the primary as a fallback. + // TODO(jschorr): This is a temporary solution to support fallback. We should consider + // using cursors to support fallback instead. + rels, err := datastore.IteratorToSlice(it) + if err != nil { + if errors.As(err, &common.RevisionUnavailableError{}) { log.Trace().Str("revision", rr.rev.String()).Msg("replica does not contain the requested revision, using primary") strictReadReplicatedFallbackQueryCount.Inc() - pit, err := handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...) - if err != nil { - yield(tuple.Relationship{}, err) + return handler(rr.primary.SnapshotReader(rr.rev))(ctx, filter, options...) + } + return nil, err + } + + return func(yield func(tuple.Relationship, error) bool) { + for _, rel := range rels { + if !yield(rel, nil) { return } - for presult, perr := range pit { - if !yield(presult, perr) { - return - } - } } }, nil } diff --git a/internal/datastore/proxy/strictreplicated_test.go b/internal/datastore/proxy/strictreplicated_test.go index 0994edf864..91902bb8d9 100644 --- a/internal/datastore/proxy/strictreplicated_test.go +++ b/internal/datastore/proxy/strictreplicated_test.go @@ -97,21 +97,8 @@ func TestStrictReplicatedQueryNonFallbackError(t *testing.T) { // Query the replicated, which should return the error. reader := replicated.SnapshotReader(revisionparsing.MustParseRevisionForTest("3")) - iter, err := reader.QueryRelationships(context.Background(), datastore.RelationshipsFilter{ + _, err = reader.QueryRelationships(context.Background(), datastore.RelationshipsFilter{ OptionalResourceType: "resource", }) - require.NoError(t, err) - - relsCollected := 0 - var errFound error - for _, err := range iter { - if err != nil { - errFound = err - } else { - relsCollected++ - } - } - - require.Equal(t, 3, relsCollected) - require.ErrorContains(t, errFound, "raising an expected error") + require.ErrorContains(t, err, "raising an expected error") }