From 46a4f8ba5e4f5c85f0ff2016a58f74b2498ab081 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?=
Date: Fri, 21 Feb 2025 16:17:15 +0000
Subject: [PATCH 1/3] expand pgSnapshot tests

---
 internal/datastore/postgres/snapshot.go      | 18 +++++++
 internal/datastore/postgres/snapshot_test.go | 50 ++++++++++++++++++++
 2 files changed, 68 insertions(+)

diff --git a/internal/datastore/postgres/snapshot.go b/internal/datastore/postgres/snapshot.go
index 163bd1f63b..7d2f65658d 100644
--- a/internal/datastore/postgres/snapshot.go
+++ b/internal/datastore/postgres/snapshot.go
@@ -144,6 +144,21 @@ const (
 	concurrent
 )
 
+func (cr comparisonResult) String() string {
+	switch cr {
+	case equal:
+		return "="
+	case lt:
+		return "<"
+	case gt:
+		return ">"
+	case concurrent:
+		return "~"
+	default:
+		return "?"
+	}
+}
+
 // compare will return whether we can definitely assert that one snapshot was
 // definitively created after, before, at the same time, or was executed
 // concurrent with another transaction. We assess this based on whether a
@@ -151,6 +166,9 @@
 // of in-progress transactions. E.g. if one snapshot only sees txids 1 and 3 as
 // visible but another transaction sees 1-3 as visible, that transaction is
 // greater.
+// example:
+// 0:4:2 -> (1,3 visible)
+// 0:4:2,3 -> (1 visible)
 func (s pgSnapshot) compare(rhs pgSnapshot) comparisonResult {
 	rhsHasMoreInfo := rhs.anyTXVisible(s.xmax, s.xipList)
 	lhsHasMoreInfo := s.anyTXVisible(rhs.xmax, rhs.xipList)
diff --git a/internal/datastore/postgres/snapshot_test.go b/internal/datastore/postgres/snapshot_test.go
index 5969e8af7f..a1dc9e3533 100644
--- a/internal/datastore/postgres/snapshot_test.go
+++ b/internal/datastore/postgres/snapshot_test.go
@@ -56,6 +56,7 @@ func TestMarkComplete(t *testing.T) {
 		{snap(3, 5, 3), 3, snap(5, 5)},
 		{snap(0, 0), 5, snap(0, 6, 0, 1, 2, 3, 4)},
 		{snap(5, 5), 4, snap(5, 5)},
+		{snap(3, 5, 4), 5, snap(4, 6, 4)},
 	}
 
 	for _, tc := range testCases {
@@ -68,6 +69,55 @@ func TestMarkComplete(t *testing.T) {
 	}
 }
 
+func TestVisible(t *testing.T) {
+	testCases := []struct {
+		snapshot pgSnapshot
+		txID     uint64
+		visible  bool
+	}{
+		{snap(840, 842, 840), 841, true},
+		{snap(840, 842, 840), 840, false},
+		{snap(840, 842, 840), 842, false},
+		{snap(840, 842, 840), 839, true},
+	}
+
+	f := func(b bool) string {
+		if b {
+			return "visible"
+		}
+		return "not visible"
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+
+		t.Run(fmt.Sprintf("%d %s %s", tc.txID, f(tc.visible)+" in", tc.snapshot), func(t *testing.T) {
+			require := require.New(t)
+			result := tc.snapshot.txVisible(tc.txID)
+			require.Equal(tc.visible, result, "expected %s but got %s", f(tc.visible), f(result))
+		})
+	}
+}
+
+func TestCompare(t *testing.T) {
+	testCases := []struct {
+		snapshot    pgSnapshot
+		compareWith pgSnapshot
+		result      comparisonResult
+	}{
+		{snap(0, 4, 2), snap(0, 4, 2, 3), gt},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(fmt.Sprintf("%s %s %s", tc.snapshot, tc.result, tc.compareWith), func(t *testing.T) {
+			require := require.New(t)
+			result := tc.snapshot.compare(tc.compareWith)
+			require.Equal(tc.result, result, "expected %s got %s", tc.result, result)
+		})
+	}
+}
+
 func TestMarkInProgress(t *testing.T) {
 	testCases := []struct {
 		snapshot pgSnapshot

From 5b3d384cbbe9bcc5d488bb1360f565110cecb184 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?=
Date: Wed, 26 Feb 2025 12:22:50 +0000
Subject: [PATCH 2/3] make pg datastore continuously checkpoint using a
 revision heartbeat
alternative implementation to https://github.com/authzed/spicedb/pull/2247
---
 internal/datastore/postgres/options.go    |   8 +
 internal/datastore/postgres/postgres.go   |  59 ++++-
 .../postgres/postgres_shared_test.go      | 207 ++++++++++++------
 internal/datastore/postgres/revisions.go  |  15 ++
 internal/datastore/postgres/watch.go      |  42 +---
 pkg/cmd/datastore/datastore.go            |   2 +
 pkg/cmd/datastore/zz_generated.options.go |   9 +
 pkg/cmd/serve.go                          |   1 +
 pkg/cmd/server/server.go                  |   2 +
 pkg/cmd/server/zz_generated.options.go    |   9 +
 10 files changed, 238 insertions(+), 116 deletions(-)

diff --git a/internal/datastore/postgres/options.go b/internal/datastore/postgres/options.go
index e2b40f08c9..709bc332fc 100644
--- a/internal/datastore/postgres/options.go
+++ b/internal/datastore/postgres/options.go
@@ -33,6 +33,7 @@ type postgresOptions struct {
 	expirationDisabled             bool
 	columnOptimizationOption       common.ColumnOptimizationOption
 	includeQueryParametersInTraces bool
+	revisionHeartbeatEnabled       bool
 
 	migrationPhase    string
 	allowedMigrations []string
@@ -77,6 +78,7 @@ const (
 	defaultExpirationDisabled = false
 	// no follower delay by default, it should only be set if using read replicas
 	defaultFollowerReadDelay = 0
+	defaultRevisionHeartbeat = false
 )
 
 // Option provides the facility to configure how clients within the
@@ -103,6 +105,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
 		includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
 		expirationDisabled:             defaultExpirationDisabled,
 		followerReadDelay:              defaultFollowerReadDelay,
+		revisionHeartbeatEnabled:       defaultRevisionHeartbeat,
 	}
 
 	for _, option := range options {
@@ -420,3 +423,8 @@ func WithColumnOptimization(isEnabled bool) Option {
 func WithExpirationDisabled(isDisabled bool) Option {
 	return func(po *postgresOptions) { po.expirationDisabled = isDisabled }
 }
+
+// WithRevisionHeartbeat enables the revision heartbeat.
+func WithRevisionHeartbeat(isEnabled bool) Option {
+	return func(po *postgresOptions) { po.revisionHeartbeatEnabled = isEnabled }
+}
diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go
index d0781529b3..8b189e6174 100644
--- a/internal/datastore/postgres/postgres.go
+++ b/internal/datastore/postgres/postgres.go
@@ -311,6 +311,15 @@ func newPostgresDatastore(
 		followerReadDelayNanos,
 	)
 
+	revisionHeartbeatQuery := fmt.Sprintf(
+		insertHeartBeatRevision,
+		colXID,
+		tableTransaction,
+		colTimestamp,
+		quantizationPeriodNanos,
+		colSnapshot,
+	)
+
 	validTransactionQuery := fmt.Sprintf(
 		queryValidTransaction,
 		colXID,
@@ -353,12 +362,13 @@ func newPostgresDatastore(
 		watchBufferWriteTimeout: config.watchBufferWriteTimeout,
 		optimizedRevisionQuery:  revisionQuery,
 		validTransactionQuery:   validTransactionQuery,
+		revisionHeartbeatQuery:  revisionHeartbeatQuery,
 		gcWindow:                config.gcWindow,
 		gcInterval:              config.gcInterval,
 		gcTimeout:               config.gcMaxOperationTime,
 		analyzeBeforeStatistics: config.analyzeBeforeStatistics,
 		watchEnabled:            watchEnabled,
-		gcCtx:                   gcCtx,
+		workerCtx:               gcCtx,
 		cancelGc:                cancelGc,
 		readTxOptions:           pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly},
 		maxRetries:              config.maxRetries,
@@ -367,6 +377,7 @@ func newPostgresDatastore(
 		inStrictReadMode:        config.readStrictMode,
 		filterMaximumIDCount:    config.filterMaximumIDCount,
 		schema:                  *schema,
+		quantizationPeriodNanos: quantizationPeriodNanos,
 	}
 
 	if isPrimary && config.readStrictMode {
@@ -381,11 +392,17 @@ func newPostgresDatastore(
 	// Start a goroutine for garbage collection.
 	if isPrimary {
+		datastore.workerGroup, datastore.workerCtx = errgroup.WithContext(datastore.workerCtx)
+		if config.revisionHeartbeatEnabled {
+			datastore.workerGroup.Go(func() error {
+				return datastore.startRevisionHeartbeat(datastore.workerCtx)
+			})
+		}
+
 		if datastore.gcInterval > 0*time.Minute && config.gcEnabled {
-			datastore.gcGroup, datastore.gcCtx = errgroup.WithContext(datastore.gcCtx)
-			datastore.gcGroup.Go(func() error {
+			datastore.workerGroup.Go(func() error {
 				return common.StartGarbageCollector(
-					datastore.gcCtx,
+					datastore.workerCtx,
 					datastore,
 					datastore.gcInterval,
 					datastore.gcWindow,
@@ -410,6 +427,7 @@ type pgDatastore struct {
 	watchBufferWriteTimeout time.Duration
 	optimizedRevisionQuery  string
 	validTransactionQuery   string
+	revisionHeartbeatQuery  string
 	gcWindow                time.Duration
 	gcInterval              time.Duration
 	gcTimeout               time.Duration
@@ -424,11 +442,12 @@ type pgDatastore struct {
 
 	credentialsProvider datastore.CredentialsProvider
 
-	gcGroup              *errgroup.Group
-	gcCtx                context.Context
-	cancelGc             context.CancelFunc
-	gcHasRun             atomic.Bool
-	filterMaximumIDCount uint16
+	workerGroup             *errgroup.Group
+	workerCtx               context.Context
+	cancelGc                context.CancelFunc
+	gcHasRun                atomic.Bool
+	filterMaximumIDCount    uint16
+	quantizationPeriodNanos int64
 }
 
 func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
@@ -651,8 +670,8 @@ func wrapError(err error) error {
 func (pgd *pgDatastore) Close() error {
 	pgd.cancelGc()
 
-	if pgd.gcGroup != nil {
-		err := pgd.gcGroup.Wait()
+	if pgd.workerGroup != nil {
+		err := pgd.workerGroup.Wait()
 		log.Warn().Err(err).Msg("completed shutdown of postgres datastore")
 	}
 
@@ -721,7 +740,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
 			Status: datastore.FeatureUnsupported,
 		},
 		ContinuousCheckpointing: datastore.Feature{
-			Status: datastore.FeatureUnsupported,
+			Status: datastore.FeatureSupported,
 		},
 		WatchEmitsImmediately: datastore.Feature{
 			Status: datastore.FeatureUnsupported,
@@ -742,6 +761,22 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
 	}, nil
 }
 
+func (pgd *pgDatastore) startRevisionHeartbeat(ctx context.Context) error {
+	tick := time.NewTicker(time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos))
+
+	for {
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-tick.C:
+			_, err := pgd.writePool.Exec(ctx, pgd.revisionHeartbeatQuery)
+			if err != nil {
+				log.Warn().Err(err).Msg("failed to write heartbeat revision")
+			}
+		}
+	}
+}
+
 func buildLivingObjectFilterForRevision(revision postgresRevision) queryFilterer {
 	createdBeforeTXN := sq.Expr(fmt.Sprintf(
 		snapshotAlive,
diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go
index e7b19c950e..cf901107a6 100644
--- a/internal/datastore/postgres/postgres_shared_test.go
+++ b/internal/datastore/postgres/postgres_shared_test.go
@@ -132,6 +132,10 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
 			QuantizedRevisionTest(t, b)
 		})
 
+		t.Run("OverlappingRevision", func(t *testing.T) {
+			OverlappingRevisionTest(t, b)
+		})
+
 		t.Run("WatchNotEnabled", func(t *testing.T) {
 			WatchNotEnabledTest(t, b, config.pgVersion)
 		})
@@ -211,14 +215,14 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
 			MigrationPhase(config.migrationPhase),
 		))
 
-		t.Run("TestCheckpointsOnOutOfBandChanges", createDatastoreTest(
+		t.Run("TestContinuousCheckpoint", createDatastoreTest(
 			b,
-			CheckpointsOnOutOfBandChangesTest,
-			RevisionQuantization(0),
-			GCWindow(1*time.Millisecond),
+			ContinuousCheckpointTest,
+			RevisionQuantization(100*time.Millisecond),
 			GCInterval(veryLargeGCInterval),
 			WatchBufferLength(50),
 			MigrationPhase(config.migrationPhase),
+			WithRevisionHeartbeat(true),
 		))
 
 		t.Run("TestSerializationError", createDatastoreTest(
@@ -325,6 +329,7 @@ type datastoreTestFunc func(t *testing.T, ds datastore.Datastore)
 
 func createDatastoreTest(b testdatastore.RunningEngineForTest, tf datastoreTestFunc, options ...Option) func(*testing.T) {
 	return func(t *testing.T) {
+		t.Helper()
 		ctx := context.Background()
 		ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
 			ds, err := newPostgresDatastore(ctx, uri, primaryInstanceID, options...)
@@ -778,33 +783,47 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 			[]time.Duration{-2 * time.Second}, 1, 0,
 		},
+		{
+			"OldestInWindowIsSelected",
+			1 * time.Second,
+			0,
+			[]time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
+			1, 1,
+		},
+		{
+			"ShouldObservePreviousAndCurrent",
+			1 * time.Second,
+			0,
+			[]time.Duration{-1 * time.Second, 0},
+			2, 0,
+		},
 		{
 			"OnlyFutureRevisions",
 			1 * time.Second,
 			0,
 			[]time.Duration{2 * time.Second},
-			0, 1,
+			1, 0,
 		},
 		{
 			"QuantizedLower",
 			2 * time.Second,
 			0,
 			[]time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0},
-			1, 2,
+			2, 1,
 		},
 		{
 			"QuantizedRecentWithFollowerReadDelay",
 			500 * time.Millisecond,
 			2 * time.Second,
 			[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
-			1, 2,
+			2, 1,
 		},
 		{
 			"QuantizedRecentWithoutFollowerReadDelay",
 			500 * time.Millisecond,
 			0,
 			[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
-			2, 1,
+			3, 0,
 		},
 		{
 			"QuantizationDisabled",
@@ -833,7 +852,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 				ctx,
 				uri,
 				primaryInstanceID,
-				RevisionQuantization(5*time.Second),
+				RevisionQuantization(tc.quantization),
 				GCWindow(24*time.Hour),
 				WatchBufferLength(1),
 				FollowerReadDelay(tc.followerReadDelay),
@@ -865,37 +884,114 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 			}
 		}
 
-			queryRevision := fmt.Sprintf(
-				querySelectRevision,
-				colXID,
-				tableTransaction,
-				colTimestamp,
-				tc.quantization.Nanoseconds(),
-				colSnapshot,
-				tc.followerReadDelay.Nanoseconds(),
-			)
-
-			var revision xid8
-			var snapshot pgSnapshot
-			var validFor time.Duration
-			err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor)
+			assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
+		})
+	}
+}
+
+func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
+	testCases := []struct {
+		testName          string
+		quantization      time.Duration
+		followerReadDelay time.Duration
+		revisions         []postgresRevision
+		numLower          uint64
+		numHigher         uint64
+	}{
+		{
+			// The two revisions are concurrent, and since they fall outside the quantization
+			// window (they are far in the past, around the unix epoch), the function should
+			// return a revision snapshot that captures both of them.
+			"ConcurrentRevisions",
+			5 * time.Second,
+			0,
+			[]postgresRevision{
+				{optionalTxID: newXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalNanosTimestamp: uint64(1 * time.Second)},
+				{optionalTxID: newXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalNanosTimestamp: uint64(2 * time.Second)},
+			},
+			2, 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			require := require.New(t)
+			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+			defer cancel()
+
+			var conn *pgx.Conn
+			ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+				var err error
+				conn, err = pgx.Connect(ctx, uri)
+				require.NoError(err)
+
+				RegisterTypes(conn.TypeMap())
+
+				ds, err := newPostgresDatastore(
+					ctx,
+					uri,
+					primaryInstanceID,
+					RevisionQuantization(tc.quantization),
+					GCWindow(24*time.Hour),
+					WatchBufferLength(1),
+					FollowerReadDelay(tc.followerReadDelay),
+				)
+				require.NoError(err)
+
+				return ds
+			})
+			defer ds.Close()
+
+			// set a random time zone to ensure the queries are unaffected by tz
+			_, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1))
 			require.NoError(err)
 
-			queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
-			numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
-			numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")
+			for _, rev := range tc.revisions {
+				stmt := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
+				insertTxn := stmt.Insert(tableTransaction).Columns(colXID, colSnapshot, colTimestamp)
 
-			var numLower, numHigher uint64
-			require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
-			require.NoError(conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)
+				ts := time.Unix(0, int64(rev.optionalNanosTimestamp))
+				sql, args, err := insertTxn.Values(rev.optionalTxID, rev.snapshot, ts).ToSql()
+				require.NoError(err)
 
-			// Subtract one from numLower because of the artificially injected first transaction row
-			require.Equal(tc.numLower, numLower-1)
-			require.Equal(tc.numHigher, numHigher)
+				_, err = conn.Exec(ctx, sql, args...)
+				require.NoError(err)
+			}
+
+			assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
 		})
 	}
 }
 
+func assertRevisionLowerAndHigher(ctx context.Context, t *testing.T, ds datastore.Datastore, conn *pgx.Conn,
+	expectedNumLower, expectedNumHigher uint64,
+) {
+	t.Helper()
+
+	var revision xid8
+	var snapshot pgSnapshot
+	pgDS, ok := ds.(*pgDatastore)
+	require.True(t, ok)
+	rev, _, err := pgDS.optimizedRevisionFunc(ctx)
+	require.NoError(t, err)
+
+	pgRev, ok := rev.(postgresRevision)
+	require.True(t, ok)
+	revision = pgRev.optionalTxID
+	require.NotNil(t, revision)
+	snapshot = pgRev.snapshot
+
+	queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
+	numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
+	numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")
+
+	var numLower, numHigher uint64
+	require.NoError(t, conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
+	require.NoError(t, conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)
+
+	// Subtract one from numLower because of the artificially injected first transaction row
+	require.Equal(t, expectedNumLower, numLower-1, "incorrect number of revisions visible to snapshot, expected %d, got %d", expectedNumLower, numLower-1)
+	require.Equal(t, expectedNumHigher, numHigher, "incorrect number of revisions invisible to snapshot, expected %d, got %d", expectedNumHigher, numHigher)
+}
+
 // ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of
 // revisions that are concurrently applied and then ensures a call to HeadRevision reflects
 // the changes found in *both* revisions.
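
[The assertRevisionLowerAndHigher helper above leans on Postgres' pg_visible_in_snapshot(xid8, pg_snapshot), which reports whether a transaction ID is visible in a given snapshot; counting transaction rows on each side of the selected snapshot is what verifies numLower and numHigher. A minimal standalone sketch of that visibility check, outside this PR's test harness — the connection string and the xid value are illustrative assumptions, not taken from this change:

package main

import (
	"context"
	"fmt"

	"github.com/jackc/pgx/v5"
)

func main() {
	ctx := context.Background()
	// Hypothetical DSN; any Postgres 13+ instance works for this check.
	conn, err := pgx.Connect(ctx, "postgres://localhost:5432/spicedb")
	if err != nil {
		panic(err)
	}
	defer conn.Close(ctx)

	// Capture the current snapshot as text, e.g. "840:842:840" (xmin:xmax:xip_list).
	var snap string
	if err := conn.QueryRow(ctx, "SELECT pg_current_snapshot()::text").Scan(&snap); err != nil {
		panic(err)
	}

	// Ask whether an arbitrary xid is visible in that snapshot; in-progress
	// and not-yet-started xids report false. 841 mirrors the snap(840, 842, 840)
	// cases in TestVisible, where 840 (in progress) and 842 (xmax) are not visible.
	var visible bool
	if err := conn.QueryRow(ctx,
		"SELECT pg_visible_in_snapshot('841'::xid8, $1::pg_snapshot)", snap).Scan(&visible); err != nil {
		panic(err)
	}
	fmt.Printf("xid 841 visible in %s: %v\n", snap, visible)
}
]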
@@ -1847,7 +1943,7 @@ func RevisionTimestampAndTransactionIDTest(t *testing.T, ds datastore.Datastore)
 	}
 }
 
-func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
+func ContinuousCheckpointTest(t *testing.T, ds datastore.Datastore) {
 	require := require.New(t)
 
 	ctx, cancel := context.WithCancel(context.Background())
@@ -1858,59 +1954,38 @@ func CheckpointsOnOutOfBandChangesTest(t *testing.T, ds datastore.Datastore) {
 
 	// Run the watch API.
 	changes, errchan := ds.Watch(ctx, lowestRevision, datastore.WatchOptions{
-		Content: datastore.WatchRelationships | datastore.WatchSchema | datastore.WatchCheckpoints,
-		// loop quickly over the changes to make sure we don't re-issue checkpoints
-		CheckpointInterval: 10 * time.Nanosecond,
+		Content:            datastore.WatchCheckpoints,
+		CheckpointInterval: 100 * time.Millisecond,
 	})
 	require.Zero(len(errchan))
 
-	// Make the current snapshot move
-	pds := ds.(*pgDatastore)
-	tx, err := pds.writePool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable})
-	require.NoError(err)
-	_, err = tx.Exec(ctx, "LOCK pg_class;")
-	require.NoError(err)
-	require.NoError(tx.Commit(ctx))
-
-	newRevision, err := ds.HeadRevision(ctx)
-	require.NoError(err)
-	require.True(newRevision.GreaterThan(lowestRevision))
-
-	awaitManyCheckpoints := time.NewTimer(1 * time.Second)
-	checkpointCount := make(map[string]int)
+	var checkpointCount int
 	for {
 		changeWait := time.NewTimer(waitForChangesTimeout)
 		select {
 		case change, ok := <-changes:
 			if !ok {
-				for _, count := range checkpointCount {
-					require.Equal(1, count, "found duplicated checkpoint revision event")
-				}
+				require.GreaterOrEqual(checkpointCount, 10, "expected at least 10 checkpoints")
 				return
 			}
 
 			if change.IsCheckpoint {
-				if change.Revision.Equal(newRevision) || change.Revision.GreaterThan(newRevision) {
-					checkpointCount[change.Revision.String()] = checkpointCount[change.Revision.String()] + 1
+				if change.Revision.GreaterThan(lowestRevision) {
+					checkpointCount++
+					lowestRevision = change.Revision
+				}
+
+				if checkpointCount >= 10 {
+					return
 				}
 			}
 
 			time.Sleep(10 * time.Millisecond)
 		case <-changeWait.C:
 			require.Fail("timed out waiting for checkpoint for out of band change")
-			// we want to make sure checkpoints are not reissued when moving out-of-band, so with a short poll interval
-			// we wait a bit before checking if we received checkpoints,
-		case <-awaitManyCheckpoints.C:
-			if len(checkpointCount) > 0 {
-				for _, count := range checkpointCount {
-					require.Equal(1, count, "found duplicated checkpoint revision event")
-				}
-
-				return
-			}
 		}
 	}
 }
 
-const waitForChangesTimeout = 30 * time.Second
+const waitForChangesTimeout = 10 * time.Second
diff --git a/internal/datastore/postgres/revisions.go b/internal/datastore/postgres/revisions.go
index 850d7aa52b..3ef550f927 100644
--- a/internal/datastore/postgres/revisions.go
+++ b/internal/datastore/postgres/revisions.go
@@ -23,6 +23,21 @@ const (
 	errCheckRevision  = "unable to check revision: %w"
 	errRevisionFormat = "invalid revision format: %w"
 
+	// %[1] Name of xid column
+	// %[2] Relationship tuple transaction table
+	// %[3] Name of timestamp column
+	// %[4] Quantization period (in nanoseconds)
+	// %[5] Name of snapshot column
+	insertHeartBeatRevision = `
+	INSERT INTO %[2]s (%[1]s, %[5]s)
+	SELECT pg_current_xact_id(), pg_current_snapshot()
+	WHERE NOT EXISTS (
+		SELECT 1
+		FROM %[2]s rtt
+		WHERE rtt.%[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc'
+		LIMIT 1
+	);`
+
 	// querySelectRevision will round the database's timestamp down to the nearest
 	// quantization period, and then find the first transaction (and its active xmin)
 	// after that. If there are no transactions newer than the quantization period,
diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go
index 94bf2a8f12..0f6556ba2d 100644
--- a/internal/datastore/postgres/watch.go
+++ b/internal/datastore/postgres/watch.go
@@ -125,7 +125,7 @@ func (pgd *pgDatastore) Watch(
 	currentTxn := afterRevision
 	requestedCheckpoints := options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints
 	for {
-		newTxns, optionalHeadRevision, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints)
+		newTxns, err := pgd.getNewRevisions(ctx, currentTxn)
 		if err != nil {
 			if errors.Is(ctx.Err(), context.Canceled) {
 				errs <- datastore.NewWatchCanceledErr()
@@ -187,31 +187,6 @@ func (pgd *pgDatastore) Watch(
 				}
 			}
 		} else {
-			// PG head revision could move outside of changes (e.g. VACUUM ANALYZE), and we still need to
-			// send a checkpoint to the client, as could have determined also via Head that changes exist and
-			// called Watch API
-			//
-			// we need to compute the head revision in the same transaction where we determine any new spicedb,
-			// transactions, otherwise there could be a race that means we issue a checkpoint before we issue
-			// the corresponding changes.
-			if requestedCheckpoints {
-				if optionalHeadRevision == nil {
-					errs <- spiceerrors.MustBugf("expected to have an optional head revision")
-					return
-				}
-
-				if optionalHeadRevision.GreaterThan(currentTxn) {
-					if !sendChange(datastore.RevisionChanges{
-						Revision:     *optionalHeadRevision,
-						IsCheckpoint: true,
-					}) {
-						return
-					}
-
-					currentTxn = *optionalHeadRevision
-				}
-			}
-
 			select {
 			case <-time.NewTimer(watchSleep).C:
 				break
@@ -226,18 +201,9 @@ func (pgd *pgDatastore) Watch(
 	return updates, errs
 }
 
-func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, returnHeadRevision bool) ([]postgresRevision, *postgresRevision, error) {
+func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision) ([]postgresRevision, error) {
 	var ids []postgresRevision
-	var optionalHeadRevision *postgresRevision
-	var err error
 	if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error {
-		if returnHeadRevision {
-			optionalHeadRevision, err = pgd.getHeadRevision(ctx, tx)
-			if err != nil {
-				return fmt.Errorf("unable to get head revision: %w", err)
-			}
-		}
-
 		rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot)
 		if err != nil {
 			return fmt.Errorf("unable to load new revisions: %w", err)
@@ -270,10 +236,10 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
 		}
 		return nil
 	}); err != nil {
-		return nil, optionalHeadRevision, fmt.Errorf("transaction error: %w", err)
+		return nil, fmt.Errorf("transaction error: %w", err)
 	}
 
-	return ids, optionalHeadRevision, nil
+	return ids, nil
 }
 
 func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
diff --git a/pkg/cmd/datastore/datastore.go b/pkg/cmd/datastore/datastore.go
index e7357e65d9..f8feb30c4c 100644
--- a/pkg/cmd/datastore/datastore.go
+++ b/pkg/cmd/datastore/datastore.go
@@ -174,6 +174,7 @@ type Config struct {
 	// Experimental
 	ExperimentalColumnOptimization           bool `debugmap:"visible"`
 	EnableExperimentalRelationshipExpiration bool `debugmap:"visible"`
+	EnableExperimentalRevisionHeartbeat      bool `debugmap:"visible"`
 }
 
 //go:generate go run github.com/ecordell/optgen -sensitive-field-name-matches uri,secure -output zz_generated.relintegritykey.options.go . RelIntegrityKey
@@ -627,6 +628,7 @@ func newPostgresPrimaryDatastore(ctx context.Context, opts Config) (datastore.Da
 		postgres.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
 		postgres.MigrationPhase(opts.MigrationPhase),
 		postgres.AllowedMigrations(opts.AllowedMigrations),
+		postgres.WithRevisionHeartbeat(opts.EnableExperimentalRevisionHeartbeat),
 	}
 
 	commonOptions, err := commonPostgresDatastoreOptions(opts)
diff --git a/pkg/cmd/datastore/zz_generated.options.go b/pkg/cmd/datastore/zz_generated.options.go
index 656419a7bc..4d50e9c749 100644
--- a/pkg/cmd/datastore/zz_generated.options.go
+++ b/pkg/cmd/datastore/zz_generated.options.go
@@ -81,6 +81,7 @@ func (c *Config) ToOption() ConfigOption {
 		to.AllowedMigrations = c.AllowedMigrations
 		to.ExperimentalColumnOptimization = c.ExperimentalColumnOptimization
 		to.EnableExperimentalRelationshipExpiration = c.EnableExperimentalRelationshipExpiration
+		to.EnableExperimentalRevisionHeartbeat = c.EnableExperimentalRevisionHeartbeat
 	}
 }
 
@@ -136,6 +137,7 @@ func (c Config) DebugMap() map[string]any {
 	debugMap["AllowedMigrations"] = helpers.DebugValue(c.AllowedMigrations, false)
 	debugMap["ExperimentalColumnOptimization"] = helpers.DebugValue(c.ExperimentalColumnOptimization, false)
 	debugMap["EnableExperimentalRelationshipExpiration"] = helpers.DebugValue(c.EnableExperimentalRelationshipExpiration, false)
+	debugMap["EnableExperimentalRevisionHeartbeat"] = helpers.DebugValue(c.EnableExperimentalRevisionHeartbeat, false)
 	return debugMap
 }
 
@@ -546,3 +548,10 @@ func WithEnableExperimentalRelationshipExpiration(enableExperimentalRelationship
 		c.EnableExperimentalRelationshipExpiration = enableExperimentalRelationshipExpiration
 	}
 }
+
+// WithEnableExperimentalRevisionHeartbeat returns an option that can set EnableExperimentalRevisionHeartbeat on a Config
+func WithEnableExperimentalRevisionHeartbeat(enableExperimentalRevisionHeartbeat bool) ConfigOption {
+	return func(c *Config) {
+		c.EnableExperimentalRevisionHeartbeat = enableExperimentalRevisionHeartbeat
+	}
+}
diff --git a/pkg/cmd/serve.go b/pkg/cmd/serve.go
index 97d5a613cb..d900686e37 100644
--- a/pkg/cmd/serve.go
+++ b/pkg/cmd/serve.go
@@ -169,6 +169,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
 			Msg("The old implementation of LookupResources is no longer available, and a `false` value is no longer valid. Please remove this flag.")
 	}
 
+	experimentalFlags.BoolVar(&config.EnableExperimentalRevisionHeartbeat, "enable-experimental-revision-heartbeat", false, "enables experimental support for postgres revision heartbeat, used to create a synthetic revision on a given interval (postgres only)")
 	experimentalFlags.BoolVar(&config.EnableExperimentalRelationshipExpiration, "enable-experimental-relationship-expiration", false, "enables experimental support for first-class relationship expiration")
 	experimentalFlags.BoolVar(&config.EnableExperimentalWatchableSchemaCache, "enable-experimental-watchable-schema-cache", false, "enables the experimental schema cache which makes use of the Watch API for automatic updates")
 	// TODO: these two could reasonably be put in either the Dispatch group or the Experimental group. Is there a preference?
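
[With the plumbing above, the heartbeat is opt-in end to end: the serve flag sets server.Config.EnableExperimentalRevisionHeartbeat, which flows through the datastore config into postgres.WithRevisionHeartbeat. A sketch of a local invocation for reviewers — the preshared key and connection URI are placeholders:

    spicedb serve \
      --grpc-preshared-key "testkey" \
      --datastore-engine postgres \
      --datastore-conn-uri "postgres://localhost:5432/spicedb" \
      --enable-experimental-revision-heartbeat

Since the heartbeat interval equals the revision quantization period, each quantization window on an otherwise idle datastore gets at most one synthetic revision, which is what lets the Watch API emit continuous checkpoints.]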
diff --git a/pkg/cmd/server/server.go b/pkg/cmd/server/server.go
index 3ded6efb7a..5d0e2bf109 100644
--- a/pkg/cmd/server/server.go
+++ b/pkg/cmd/server/server.go
@@ -121,6 +121,7 @@ type Config struct {
 	MaxBulkExportRelationshipsLimit          uint32 `debugmap:"visible"`
 	EnableExperimentalLookupResources        bool   `debugmap:"visible"`
 	EnableExperimentalRelationshipExpiration bool   `debugmap:"visible"`
+	EnableExperimentalRevisionHeartbeat      bool   `debugmap:"visible"`
 
 	// Additional Services
 	MetricsAPI util.HTTPServerConfig `debugmap:"visible"`
@@ -228,6 +229,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
 		// are at most the number of elements returned from a datastore query
 		datastorecfg.WithFilterMaximumIDCount(c.DispatchChunkSize),
 		datastorecfg.WithEnableExperimentalRelationshipExpiration(c.EnableExperimentalRelationshipExpiration),
+		datastorecfg.WithEnableExperimentalRevisionHeartbeat(c.EnableExperimentalRevisionHeartbeat),
 	)
 	if err != nil {
 		return nil, spiceerrors.NewTerminationErrorBuilder(fmt.Errorf("failed to create datastore: %w", err)).
diff --git a/pkg/cmd/server/zz_generated.options.go b/pkg/cmd/server/zz_generated.options.go
index 8e2dfaddef..ae035d786a 100644
--- a/pkg/cmd/server/zz_generated.options.go
+++ b/pkg/cmd/server/zz_generated.options.go
@@ -89,6 +89,7 @@ func (c *Config) ToOption() ConfigOption {
 		to.MaxBulkExportRelationshipsLimit = c.MaxBulkExportRelationshipsLimit
 		to.EnableExperimentalLookupResources = c.EnableExperimentalLookupResources
 		to.EnableExperimentalRelationshipExpiration = c.EnableExperimentalRelationshipExpiration
+		to.EnableExperimentalRevisionHeartbeat = c.EnableExperimentalRevisionHeartbeat
 		to.MetricsAPI = c.MetricsAPI
 		to.UnaryMiddlewareModification = c.UnaryMiddlewareModification
 		to.StreamingMiddlewareModification = c.StreamingMiddlewareModification
@@ -158,6 +159,7 @@ func (c Config) DebugMap() map[string]any {
 	debugMap["MaxBulkExportRelationshipsLimit"] = helpers.DebugValue(c.MaxBulkExportRelationshipsLimit, false)
 	debugMap["EnableExperimentalLookupResources"] = helpers.DebugValue(c.EnableExperimentalLookupResources, false)
 	debugMap["EnableExperimentalRelationshipExpiration"] = helpers.DebugValue(c.EnableExperimentalRelationshipExpiration, false)
+	debugMap["EnableExperimentalRevisionHeartbeat"] = helpers.DebugValue(c.EnableExperimentalRevisionHeartbeat, false)
 	debugMap["MetricsAPI"] = helpers.DebugValue(c.MetricsAPI, false)
 	debugMap["SilentlyDisableTelemetry"] = helpers.DebugValue(c.SilentlyDisableTelemetry, false)
 	debugMap["TelemetryCAOverridePath"] = helpers.DebugValue(c.TelemetryCAOverridePath, false)
@@ -570,6 +572,13 @@ func WithEnableExperimentalRelationshipExpiration(enableExperimentalRelationship
 	}
 }
 
+// WithEnableExperimentalRevisionHeartbeat returns an option that can set EnableExperimentalRevisionHeartbeat on a Config
+func WithEnableExperimentalRevisionHeartbeat(enableExperimentalRevisionHeartbeat bool) ConfigOption {
+	return func(c *Config) {
+		c.EnableExperimentalRevisionHeartbeat = enableExperimentalRevisionHeartbeat
+	}
+}
+
 // WithMetricsAPI returns an option that can set MetricsAPI on a Config
 func WithMetricsAPI(metricsAPI util.HTTPServerConfig) ConfigOption {
 	return func(c *Config) {

From 478c1f298652d55949bb9c4016f51d0796a9f450 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?V=C3=ADctor=20Rold=C3=A1n=20Betancort?=
Date: Wed, 26 Feb 2025 13:33:39 +0000
Subject: [PATCH 3/3] add heartbeat leader election so that multiple instances
 do not try to insert into the transactions table and increase contention

---
 internal/datastore/postgres/locks.go    |  3 +++
 internal/datastore/postgres/postgres.go | 34 ++++++++++++++++++++++++-
 2 files changed, 36 insertions(+), 1 deletion(-)

diff --git a/internal/datastore/postgres/locks.go b/internal/datastore/postgres/locks.go
index 497385cf49..d757076c4b 100644
--- a/internal/datastore/postgres/locks.go
+++ b/internal/datastore/postgres/locks.go
@@ -11,6 +11,9 @@ type lockID uint32
 const (
 	// gcRunLock is the lock ID for the garbage collection run.
 	gcRunLock lockID = 1
+
+	// revisionHeartbeatLock is the lock ID for the leader that will generate the heartbeat revisions.
+	revisionHeartbeatLock lockID = 2
 )
 
 func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go
index 8b189e6174..150bc90096 100644
--- a/internal/datastore/postgres/postgres.go
+++ b/internal/datastore/postgres/postgres.go
@@ -5,6 +5,7 @@ import (
 	dbsql "database/sql"
 	"errors"
 	"fmt"
+	"math/rand/v2"
 	"os"
 	"strconv"
 	"strings"
@@ -761,8 +762,39 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
 	}, nil
 }
 
+const defaultMaxHeartbeatLeaderJitterPercent = 10
+
 func (pgd *pgDatastore) startRevisionHeartbeat(ctx context.Context) error {
-	tick := time.NewTicker(time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos))
+	heartbeatDuration := time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos)
+	log.Info().Stringer("interval", heartbeatDuration).Msg("starting revision heartbeat")
+	tick := time.NewTicker(heartbeatDuration)
+
+	// Leader election. Continue trying to acquire in case the current leader died.
+	for {
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+
+		ok, err := pgd.tryAcquireLock(ctx, revisionHeartbeatLock)
+		if err != nil {
+			log.Warn().Err(err).Msg("failed to acquire revision heartbeat lock")
+		}
+
+		if ok {
+			break
+		}
+
+		jitter := time.Duration(float64(heartbeatDuration) * rand.Float64() * defaultMaxHeartbeatLeaderJitterPercent / 100) // nolint:gosec
+		time.Sleep(jitter)
+	}
+
+	defer func() {
+		if err := pgd.releaseLock(ctx, revisionHeartbeatLock); err != nil {
+			log.Warn().Err(err).Msg("failed to release revision heartbeat lock")
+		}
+	}()
+
+	log.Info().Stringer("interval", heartbeatDuration).Msg("got elected revision heartbeat leader, starting")
 
 	for {
 		select {