diff --git a/Dockerfile b/Dockerfile
index 46d02b8cb9..59bd710801 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -7,9 +7,9 @@ RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/g
 FROM golang:1.23.6-alpine3.20 AS health-probe-builder
 WORKDIR /go/src/app
 RUN apk update && apk add --no-cache git
-RUN git clone https://github.com/authzed/grpc-health-probe.git
+RUN git clone https://github.com/grpc-ecosystem/grpc-health-probe.git
 WORKDIR /go/src/app/grpc-health-probe
-RUN git checkout master
+RUN git checkout 86f0bc2dea67fda67e9a060689bc8bb7a19ebe44
 RUN CGO_ENABLED=0 go install -a -tags netgo -ldflags=-w
 
 FROM cgr.dev/chainguard/static:latest
diff --git a/Dockerfile.release b/Dockerfile.release
index 7c8014d2ff..0fade15e15 100644
--- a/Dockerfile.release
+++ b/Dockerfile.release
@@ -4,9 +4,9 @@ ARG BASE=cgr.dev/chainguard/static:latest
 FROM golang:1.23.6-alpine3.20 AS health-probe-builder
 WORKDIR /go/src/app
 RUN apk update && apk add --no-cache git
-RUN git clone https://github.com/authzed/grpc-health-probe.git
+RUN git clone https://github.com/grpc-ecosystem/grpc-health-probe.git
 WORKDIR /go/src/app/grpc-health-probe
-RUN git checkout master
+RUN git checkout 86f0bc2dea67fda67e9a060689bc8bb7a19ebe44
 RUN CGO_ENABLED=0 go install -a -tags netgo -ldflags=-w
 
 FROM $BASE
diff --git a/internal/datastore/postgres/postgres.go b/internal/datastore/postgres/postgres.go
index d0781529b3..33ff331652 100644
--- a/internal/datastore/postgres/postgres.go
+++ b/internal/datastore/postgres/postgres.go
@@ -721,7 +721,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
 			Status: datastore.FeatureUnsupported,
 		},
 		ContinuousCheckpointing: datastore.Feature{
-			Status: datastore.FeatureUnsupported,
+			Status: datastore.FeatureSupported,
 		},
 		WatchEmitsImmediately: datastore.Feature{
 			Status: datastore.FeatureUnsupported,
diff --git a/internal/datastore/postgres/postgres_shared_test.go b/internal/datastore/postgres/postgres_shared_test.go
index e7b19c950e..17e1a6cfe8 100644
--- a/internal/datastore/postgres/postgres_shared_test.go
+++ b/internal/datastore/postgres/postgres_shared_test.go
@@ -132,6 +132,10 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
 			QuantizedRevisionTest(t, b)
 		})
 
+		t.Run("OverlappingRevision", func(t *testing.T) {
+			OverlappingRevisionTest(t, b)
+		})
+
 		t.Run("WatchNotEnabled", func(t *testing.T) {
 			WatchNotEnabledTest(t, b, config.pgVersion)
 		})
@@ -778,33 +782,47 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 			[]time.Duration{-2 * time.Second},
 			1, 0,
 		},
+		{
+			"OldestInWindowIsSelected",
+			1 * time.Second,
+			0,
+			[]time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
+			1, 1,
+		},
+		{
+			"ShouldObservePreviousAndCurrent",
+			1 * time.Second,
+			0,
+			[]time.Duration{-1 * time.Second, 0},
+			2, 0,
+		},
 		{
 			"OnlyFutureRevisions",
 			1 * time.Second,
 			0,
 			[]time.Duration{2 * time.Second},
-			0, 1,
+			1, 0,
 		},
 		{
 			"QuantizedLower",
 			2 * time.Second,
 			0,
 			[]time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0},
-			1, 2,
+			2, 1,
 		},
 		{
 			"QuantizedRecentWithFollowerReadDelay",
 			500 * time.Millisecond,
 			2 * time.Second,
 			[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
-			1, 2,
+			2, 1,
 		},
 		{
 			"QuantizedRecentWithoutFollowerReadDelay",
 			500 * time.Millisecond,
 			0,
 			[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
-			2, 1,
+			3, 0,
 		},
 		{
 			"QuantizationDisabled",
@@ -833,7 +851,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 				ctx,
 				uri,
 				primaryInstanceID,
-				RevisionQuantization(5*time.Second),
+				RevisionQuantization(tc.quantization),
 				GCWindow(24*time.Hour),
 				WatchBufferLength(1),
 				FollowerReadDelay(tc.followerReadDelay),
@@ -865,37 +883,114 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
 				}
 			}
 
-			queryRevision := fmt.Sprintf(
-				querySelectRevision,
-				colXID,
-				tableTransaction,
-				colTimestamp,
-				tc.quantization.Nanoseconds(),
-				colSnapshot,
-				tc.followerReadDelay.Nanoseconds(),
-			)
-
-			var revision xid8
-			var snapshot pgSnapshot
-			var validFor time.Duration
-			err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor)
+			assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
+		})
+	}
+}
+
+func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
+	testCases := []struct {
+		testName          string
+		quantization      time.Duration
+		followerReadDelay time.Duration
+		revisions         []postgresRevision
+		numLower          uint64
+		numHigher         uint64
+	}{
+		{ // The two revisions are concurrent, and given that they are past the quantization window
+			// (way in the past, around the unix epoch), the function should return a revision snapshot
+			// that captures both of them.
+			"ConcurrentRevisions",
+			5 * time.Second,
+			0,
+			[]postgresRevision{
+				{optionalTxID: newXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalNanosTimestamp: uint64((1 * time.Second).Nanoseconds())},
+				{optionalTxID: newXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalNanosTimestamp: uint64((2 * time.Second).Nanoseconds())},
+			},
+			2, 0,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.testName, func(t *testing.T) {
+			require := require.New(t)
+			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+			defer cancel()
+
+			var conn *pgx.Conn
+			ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
+				var err error
+				conn, err = pgx.Connect(ctx, uri)
+				require.NoError(err)
+
+				RegisterTypes(conn.TypeMap())
+
+				ds, err := newPostgresDatastore(
+					ctx,
+					uri,
+					primaryInstanceID,
+					RevisionQuantization(tc.quantization),
+					GCWindow(24*time.Hour),
+					WatchBufferLength(1),
+					FollowerReadDelay(tc.followerReadDelay),
+				)
+				require.NoError(err)
+
+				return ds
+			})
+			defer ds.Close()
+
+			// set a random time zone to ensure the queries are unaffected by tz
+			_, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1))
 			require.NoError(err)
 
-			queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
-			numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
-			numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")
+			for _, rev := range tc.revisions {
+				stmt := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
+				insertTxn := stmt.Insert(tableTransaction).Columns(colXID, colSnapshot, colTimestamp)
+
+				ts := time.Unix(0, int64(rev.optionalNanosTimestamp))
+				sql, args, err := insertTxn.Values(rev.optionalTxID, rev.snapshot, ts).ToSql()
+				require.NoError(err)
 
-			var numLower, numHigher uint64
-			require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
-			require.NoError(conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)
+				_, err = conn.Exec(ctx, sql, args...)
+				require.NoError(err)
+			}
 
-			// Subtract one from numLower because of the artificially injected first transaction row
-			require.Equal(tc.numLower, numLower-1)
-			require.Equal(tc.numHigher, numHigher)
+			assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
 		})
 	}
 }
 
+func assertRevisionLowerAndHigher(ctx context.Context, t *testing.T, ds datastore.Datastore, conn *pgx.Conn,
+	expectedNumLower, expectedNumHigher uint64,
+) {
+	t.Helper()
+
+	var revision xid8
+	var snapshot pgSnapshot
+	pgDS, ok := ds.(*pgDatastore)
+	require.True(t, ok)
+	rev, _, err := pgDS.optimizedRevisionFunc(ctx)
+	require.NoError(t, err)
+
+	pgRev, ok := rev.(postgresRevision)
+	require.True(t, ok)
+	revision = pgRev.optionalTxID
+	require.NotNil(t, revision)
+	snapshot = pgRev.snapshot
+
+	queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
+	numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
+	numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")
+
+	var numLower, numHigher uint64
+	require.NoError(t, conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
+	require.NoError(t, conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)
+
+	// Subtract one from numLower because of the artificially injected first transaction row
+	require.Equal(t, expectedNumLower, numLower-1, "incorrect number of revisions visible to snapshot: expected %d, got %d", expectedNumLower, numLower-1)
+	require.Equal(t, expectedNumHigher, numHigher, "incorrect number of revisions invisible to snapshot: expected %d, got %d", expectedNumHigher, numHigher)
+}
+
 // ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of
 // revisions that are concurrently applied and then ensures a call to HeadRevision reflects
 // the changes found in *both* revisions.
diff --git a/internal/datastore/postgres/revisions.go b/internal/datastore/postgres/revisions.go
index 850d7aa52b..b81fc41417 100644
--- a/internal/datastore/postgres/revisions.go
+++ b/internal/datastore/postgres/revisions.go
@@ -5,6 +5,7 @@ import (
 	"encoding/base64"
 	"errors"
 	"fmt"
+	"math"
 	"strconv"
 	"strings"
 	"time"
@@ -26,8 +27,14 @@ const (
 	// querySelectRevision will round the database's timestamp down to the nearest
 	// quantization period, and then find the first transaction (and its active xmin)
 	// after that. If there are no transactions newer than the quantization period,
-	// it just picks the latest transaction. It will also return the amount of
-	// nanoseconds until the next optimized revision would be selected server-side,
+	// it will return all of the transaction IDs and snapshots available in the last quantization window,
+	// so that the application can derive a snapshot that includes all transactions in that window, thus
+	// guaranteeing that all revisions from the previous window are observed.
+	//
+	// It avoids determining the high-watermark snapshot using pg_current_snapshot(), as that snapshot may
+	// move out of band (VACUUM, other workloads in the same database), which causes problems for
+	// revision-quantization caching.
+	//
+	// It will also return the number of nanoseconds until the next optimized revision would be selected server-side,
 	// for use with caching.
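+	//
+	// As an illustrative example (hypothetical values, not drawn from the codebase): with a 5s quantization
+	// window and no follower read delay, a NOW() of epoch 1,000,000,003.2s floors to a window start of
+	// 1,000,000,000s; the first transaction at or after that timestamp is selected, and the returned
+	// validity is 5s - 3.2s = 1.8s, i.e. the time remaining until the next window begins.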
 	//
 	// %[1] Name of xid column
@@ -37,13 +44,16 @@
 	// %[5] Name of snapshot column
 	// %[6] Follower read delay (in nanoseconds)
 	querySelectRevision = `
-	WITH selected AS (SELECT (
-		(SELECT %[1]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc' ORDER BY %[3]s ASC LIMIT 1)
-	) as xid)
-	SELECT selected.xid,
-	COALESCE((SELECT %[5]s FROM %[2]s WHERE %[1]s = selected.xid), (SELECT pg_current_snapshot())),
-	%[4]d - CAST(EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 as bigint) %% %[4]d
-	FROM selected;`
+	WITH optimized AS
+		(SELECT %[1]s, %[5]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc' ORDER BY %[3]s ASC LIMIT 1),
+	allRevisionsFromLastWindow AS (SELECT %[1]s, %[5]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM (SELECT max(%[3]s) FROM %[2]s)) * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc'),
+	validity AS (SELECT (%[4]d - CAST(EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 as bigint) %% %[4]d) AS ts)
+
+	SELECT %[1]s, %[5]s, validity.ts FROM optimized, validity
+	UNION ALL
+	SELECT %[1]s, %[5]s, validity.ts FROM allRevisionsFromLastWindow, validity
+	WHERE NOT EXISTS (SELECT 1 FROM optimized);
+	`
 
 	// queryValidTransaction will return a single row with three values:
 	// 1) the transaction ID of the minimum valid (i.e. within the GC window) transaction
@@ -69,24 +79,66 @@
 	)
 	SELECT minvalid.%[1]s, minvalid.%[5]s, pg_current_snapshot() FROM minvalid;`
 
-	queryCurrentSnapshot      = `SELECT pg_current_snapshot();`
+	queryCurrentSnapshot    = `SELECT pg_current_snapshot();`
+	queryGenerateCheckpoint = `SELECT pg_current_xact_id(), pg_current_snapshot(), NOW() AT TIME ZONE 'utc';`
 
 	queryCurrentTransactionID = `SELECT pg_current_xact_id()::text::integer;`
 
 	queryLatestXID = `SELECT max(xid)::text::integer FROM relation_tuple_transaction;`
 )
 
 func (pgd *pgDatastore) optimizedRevisionFunc(ctx context.Context) (datastore.Revision, time.Duration, error) {
-	var revision xid8
-	var snapshot pgSnapshot
+	rows, err := pgd.readPool.Query(ctx, pgd.optimizedRevisionQuery)
+	if err != nil {
+		return datastore.NoRevision, 0, fmt.Errorf("failed to compute optimized revision: %w", err)
+	}
+	defer rows.Close()
+
+	var resultingPgRev []postgresRevision
 	var validForNanos time.Duration
-	if err := pgd.readPool.QueryRow(ctx, pgd.optimizedRevisionQuery).
-		Scan(&revision, &snapshot, &validForNanos); err != nil {
-		return datastore.NoRevision, 0, fmt.Errorf(errRevision, err)
+	for rows.Next() {
+		var xid xid8
+		var snapshot pgSnapshot
+		if err := rows.Scan(&xid, &snapshot, &validForNanos); err != nil {
+			return datastore.NoRevision, 0, fmt.Errorf("unable to decode candidate optimized revision: %w", err)
+		}
+
+		resultingPgRev = append(resultingPgRev, postgresRevision{snapshot: snapshot, optionalTxID: xid})
 	}
+	if rows.Err() != nil {
+		return datastore.NoRevision, 0, fmt.Errorf("unable to compute optimized revision: %w", rows.Err())
+	}
+
+	if len(resultingPgRev) == 0 {
+		return datastore.NoRevision, 0, spiceerrors.MustBugf("unexpected optimized revision query returned zero rows")
+	} else if len(resultingPgRev) == 1 {
+		resultingPgRev[0].snapshot = resultingPgRev[0].snapshot.markComplete(resultingPgRev[0].optionalTxID.Uint64)
+		return resultingPgRev[0], validForNanos, nil
+	} else if len(resultingPgRev) > 1 {
+		// If there are multiple rows, it means the query didn't find an eligible quantized revision
+		// in the window defined by the current time. In this case the query returns all revisions available
+		// in the last quantization window, defined by the largest timestamp in the transactions table.
+		// We use all of those transactions to forge a synthetic pgSnapshot that observes all transaction IDs
+		// in that window.
+		syntheticRev := postgresRevision{snapshot: pgSnapshot{xmin: uint64(math.MaxUint64), xmax: 0}}
+		for _, rev := range resultingPgRev {
+			if rev.snapshot.xmin < syntheticRev.snapshot.xmin {
+				syntheticRev.snapshot.xmin = rev.snapshot.xmin
+			}
+			if rev.snapshot.xmax > syntheticRev.snapshot.xmax {
+				syntheticRev.snapshot.xmax = rev.snapshot.xmax
+			}
+		}
+
+		for _, rev := range resultingPgRev {
+			syntheticRev.snapshot = syntheticRev.snapshot.markComplete(rev.optionalTxID.Uint64)
+		}
 
-	snapshot = snapshot.markComplete(revision.Uint64)
+		return syntheticRev, validForNanos, nil
+	}
 
-	return postgresRevision{snapshot: snapshot, optionalTxID: revision}, validForNanos, nil
+	return datastore.NoRevision, 0, spiceerrors.MustBugf("unexpected optimized revision query result")
 }
 
 func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
@@ -117,6 +169,30 @@ func (pgd *pgDatastore) getHeadRevision(ctx context.Context, querier common.Quer
 	return &postgresRevision{snapshot: snapshot}, nil
 }
 
+// generateCheckpoint creates a new transaction for the purpose of acting as a checkpoint high watermark.
+// This calls pg_current_xact_id(), which consumes one transaction ID of the 64-bit space.
+// For perspective, it would take 584.5 years to consume the space if we were able to generate 1 XID per nanosecond.
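+// As an illustrative back-of-the-envelope check (a hypothetical rate, not a measured one): a watcher polling
+// on a 100ms checkpoint interval against an otherwise idle database consumes at most 10 XIDs per second,
+// which would take on the order of tens of billions of years to exhaust the 2^64 space.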
+func (pgd *pgDatastore) generateCheckpoint(ctx context.Context, querier common.Querier) (*postgresRevision, error) {
+	var txID xid8
+	var snapshot pgSnapshot
+	var timestamp time.Time
+
+	if err := querier.QueryRow(ctx, queryGenerateCheckpoint).Scan(&txID, &snapshot, &timestamp); err != nil {
+		if errors.Is(err, pgx.ErrNoRows) {
+			return nil, nil
+		}
+
+		return nil, fmt.Errorf(errRevision, err)
+	}
+
+	safeTimestamp, err := safecast.ToUint64(timestamp.UnixNano())
+	if err != nil {
+		return nil, fmt.Errorf(errRevision, err)
+	}
+
+	return &postgresRevision{snapshot: snapshot, optionalTxID: txID, optionalNanosTimestamp: safeTimestamp}, nil
+}
+
 func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error {
 	revision, ok := revisionRaw.(postgresRevision)
 	if !ok {
diff --git a/internal/datastore/postgres/snapshot.go b/internal/datastore/postgres/snapshot.go
index 163bd1f63b..7d2f65658d 100644
--- a/internal/datastore/postgres/snapshot.go
+++ b/internal/datastore/postgres/snapshot.go
@@ -144,6 +144,21 @@ const (
 	concurrent
 )
 
+func (cr comparisonResult) String() string {
+	switch cr {
+	case equal:
+		return "="
+	case lt:
+		return "<"
+	case gt:
+		return ">"
+	case concurrent:
+		return "~"
+	default:
+		return "?"
+	}
+}
+
 // compare will return whether we can definitely assert that one snapshot was
 // definitively created after, before, at the same time, or was executed
 // concurrent with another transaction. We assess this based on whether a
@@ -151,6 +166,9 @@ const (
 // of in-progress transactions. E.g. if one snapshot only sees txids 1 and 3 as
 // visible but another transaction sees 1-3 as visible, that transaction is
 // greater.
+// For example (xmin:xmax:xipList notation):
+// 0:4:2   -> (1,3 visible)
+// 0:4:2,3 -> (1 visible)
 func (s pgSnapshot) compare(rhs pgSnapshot) comparisonResult {
 	rhsHasMoreInfo := rhs.anyTXVisible(s.xmax, s.xipList)
 	lhsHasMoreInfo := s.anyTXVisible(rhs.xmax, rhs.xipList)
diff --git a/internal/datastore/postgres/snapshot_test.go b/internal/datastore/postgres/snapshot_test.go
index 5969e8af7f..0322533722 100644
--- a/internal/datastore/postgres/snapshot_test.go
+++ b/internal/datastore/postgres/snapshot_test.go
@@ -56,6 +56,7 @@ func TestMarkComplete(t *testing.T) {
 		{snap(3, 5, 3), 3, snap(5, 5)},
 		{snap(0, 0), 5, snap(0, 6, 0, 1, 2, 3, 4)},
 		{snap(5, 5), 4, snap(5, 5)},
+		{snap(3, 5, 4), 5, snap(4, 6, 4)},
 	}
 
 	for _, tc := range testCases {
@@ -68,6 +69,56 @@ func TestMarkComplete(t *testing.T) {
 	}
 }
 
+func TestVisible(t *testing.T) {
+	testCases := []struct {
+		snapshot pgSnapshot
+		txID     uint64
+		visible  bool
+	}{
+		{snap(840, 842, 840), 841, true},
+		{snap(840, 842, 840), 840, false},
+		{snap(840, 842, 840), 842, false},
+		{snap(840, 842, 840), 839, true},
+	}
+
+	f := func(b bool) string {
+		if b {
+			return "visible"
+		}
+		return "not visible"
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+
+		t.Run(fmt.Sprintf("%d %s in %s", tc.txID, f(tc.visible), tc.snapshot), func(t *testing.T) {
+			require := require.New(t)
+			result := tc.snapshot.txVisible(tc.txID)
+			require.Equal(tc.visible, result, "expected %s but got %s", f(tc.visible), f(result))
+		})
+	}
+}
+
+func TestCompare(t *testing.T) {
+	testCases := []struct {
+		snapshot    pgSnapshot
+		compareWith pgSnapshot
+		result      comparisonResult
+	}{
+		{snap(0, 4, 2), snap(0, 4, 2, 3), gt},
+		{snap(1, 4, 2), snap(1, 4, 3), concurrent},
+	}
+
+	for _, tc := range testCases {
+		tc := tc
+		t.Run(fmt.Sprintf("%s %s %s", tc.snapshot, tc.result, tc.compareWith), func(t *testing.T) {
+			require := require.New(t)
+			result := tc.snapshot.compare(tc.compareWith)
+			require.Equal(tc.result, result, "expected %s got %s", tc.result, result)
+		})
+	}
+}
+
 func TestMarkInProgress(t *testing.T) {
 	testCases := []struct {
 		snapshot pgSnapshot
diff --git a/internal/datastore/postgres/watch.go b/internal/datastore/postgres/watch.go
index 94bf2a8f12..994012c8fc 100644
--- a/internal/datastore/postgres/watch.go
+++ b/internal/datastore/postgres/watch.go
@@ -125,7 +125,7 @@ func (pgd *pgDatastore) Watch(
 	currentTxn := afterRevision
 	requestedCheckpoints := options.Content&datastore.WatchCheckpoints == datastore.WatchCheckpoints
 	for {
-		newTxns, optionalHeadRevision, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints)
+		newTxns, snapshotForHighWatermark, err := pgd.getNewRevisions(ctx, currentTxn, requestedCheckpoints)
 		if err != nil {
 			if errors.Is(ctx.Err(), context.Canceled) {
 				errs <- datastore.NewWatchCanceledErr()
@@ -187,29 +187,55 @@ func (pgd *pgDatastore) Watch(
 				}
 			}
 		} else {
-			// PG head revision could move outside of changes (e.g. VACUUM ANALYZE), and we still need to
-			// send a checkpoint to the client, as could have determined also via Head that changes exist and
-			// called Watch API
+			// A new transaction could have been generated outside of application changes (e.g. VACUUM ANALYZE,
+			// spicedb postgres shared with other applications). We want to make sure the Watch API reflects this
+			// by sending a checkpoint to the client.
 			//
-			// we need to compute the head revision in the same transaction where we determine any new spicedb,
-			// transactions, otherwise there could be a race that means we issue a checkpoint before we issue
-			// the corresponding changes.
+			// Using pg_current_snapshot() gives us the next candidate snapshot, but we can't send it as a
+			// checkpoint, since the next transaction that is executed will be evaluated at that snapshot,
+			// which would in turn lead to an incorrect sequence of events via the Watch API (i.e. a checkpoint
+			// at revision X, followed by a change at revision X).
+			//
+			// We could determine the last committed transaction using pg_last_committed_xact(), but we wouldn't
+			// be able to determine the snapshot at which it was evaluated.
+			//
+			// We solve it by creating an actual new transaction using pg_current_xact_id(). The tradeoff is that
+			// it consumes one ID of the 64-bit XID8 transaction space. Given that this only happens when there
+			// are no new transactions, and that this loop executes on a predefined interval, it would take
+			// _very long_ (billions of years) to exhaust it. See method pgDatastore.generateCheckpoint().
+			//
+			// The computation of the new checkpoint needs to happen in the same transaction where we query
+			// whether there are any new spicedb transactions, otherwise there could be a race that means we
+			// issue a checkpoint before we issue the corresponding changes.
 			if requestedCheckpoints {
-				if optionalHeadRevision == nil {
+				if snapshotForHighWatermark == nil {
 					errs <- spiceerrors.MustBugf("expected to have an optional head revision")
 					return
 				}
 
-				if optionalHeadRevision.GreaterThan(currentTxn) {
+				snapshotForHighWatermark.snapshot = snapshotForHighWatermark.snapshot.markComplete(snapshotForHighWatermark.optionalTxID.Uint64)
+				// We expect the new checkpoint to be greater than or equal to the last observed transaction:
+				// - equal: if the caller invoked Watch with afterRevision = HeadRevision, there would be no
+				//   associated transaction, so once generateCheckpoint is invoked, that snapshot will be part
+				//   of a transaction, making currentTxn == snapshotForHighWatermark
+				// - greater: if the loop is executed again, a new transaction will be generated, and currentTxn
+				//   will no longer be pointing at HeadRevision, but instead at the last committed transaction observed.
+				if snapshotForHighWatermark.LessThan(currentTxn) {
+					errs <- spiceerrors.MustBugf("expected the generated checkpoint to be greater than the current revision")
+					return
+				}
+
+				// Do not emit a checkpoint at HeadRevision, as the expectation is that the Watch API starts
+				// emitting events after the revision provided.
+				if snapshotForHighWatermark.GreaterThan(currentTxn) {
 					if !sendChange(datastore.RevisionChanges{
-						Revision:     *optionalHeadRevision,
+						Revision:     *snapshotForHighWatermark,
 						IsCheckpoint: true,
 					}) {
 						return
 					}
-
-					currentTxn = *optionalHeadRevision
 				}
+
+				currentTxn = *snapshotForHighWatermark
 			}
 
 			select {
@@ -226,18 +252,10 @@ func (pgd *pgDatastore) Watch(
 	return updates, errs
 }
 
-func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, returnHeadRevision bool) ([]postgresRevision, *postgresRevision, error) {
+func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRevision, retrieveNewHighWatermark bool) ([]postgresRevision, *postgresRevision, error) {
 	var ids []postgresRevision
-	var optionalHeadRevision *postgresRevision
-	var err error
+	var newCheckpoint *postgresRevision
 	if err := pgx.BeginTxFunc(ctx, pgd.readPool, pgx.TxOptions{IsoLevel: pgx.RepeatableRead}, func(tx pgx.Tx) error {
-		if returnHeadRevision {
-			optionalHeadRevision, err = pgd.getHeadRevision(ctx, tx)
-			if err != nil {
-				return fmt.Errorf("unable to get head revision: %w", err)
-			}
-		}
-
 		rows, err := tx.Query(ctx, newRevisionsQuery, afterTX.snapshot)
 		if err != nil {
 			return fmt.Errorf("unable to load new revisions: %w", err)
@@ -268,12 +286,24 @@ func (pgd *pgDatastore) getNewRevisions(ctx context.Context, afterTX postgresRev
 		if rows.Err() != nil {
 			return fmt.Errorf("unable to load new revisions: %w", err)
 		}
+
+		// There are no new transactions: generate a checkpoint revision if checkpoints were requested.
+		// This runs only when no other new transactions already existed, and it would take hundreds of
+		// billions of years to exhaust the XID space at a rate of one transaction ID per second.
+		if len(ids) == 0 && retrieveNewHighWatermark {
+			newCheckpoint, err = pgd.generateCheckpoint(ctx, tx)
+			if err != nil {
+				return fmt.Errorf("unable to generate checkpoint: %w", err)
+			}
+		}
 		return nil
 	}); err != nil {
-		return nil, optionalHeadRevision, fmt.Errorf("transaction error: %w", err)
+		return nil, newCheckpoint, fmt.Errorf("transaction error: %w", err)
 	}
 
-	return ids, optionalHeadRevision, nil
+	return ids, newCheckpoint, nil
 }
 
 func (pgd *pgDatastore) loadChanges(ctx context.Context, revisions []postgresRevision, options datastore.WatchOptions) ([]datastore.RevisionChanges, error) {
diff --git a/pkg/datastore/test/datastore.go b/pkg/datastore/test/datastore.go
index 034e265a43..ae4a1692dd 100644
--- a/pkg/datastore/test/datastore.go
+++ b/pkg/datastore/test/datastore.go
@@ -197,6 +197,7 @@ func AllWithExceptions(t *testing.T, tester DatastoreTester, except Categories,
 
 	if !except.Watch() && !except.WatchCheckpoints() {
 		t.Run("TestWatchCheckpoints", runner(tester, WatchCheckpointsTest))
+		t.Run("TestWatchContinuousCheckpoints", runner(tester, WatchContinuousCheckpointsTest))
t.Run("TestWatchContinuousCheckpoints", runner(tester, WatchContinuousCheckpointsTest)) } t.Run("TestRelationshipCounters", runner(tester, RelationshipCountersTest)) diff --git a/pkg/datastore/test/watch.go b/pkg/datastore/test/watch.go index e01d74f29d..4321614db0 100644 --- a/pkg/datastore/test/watch.go +++ b/pkg/datastore/test/watch.go @@ -780,6 +780,36 @@ func verifyMixedUpdates( require.False(expectDisconnect, "all changes verified without expected disconnect") } +func WatchContinuousCheckpointsTest(t *testing.T, tester DatastoreTester) { + require := require.New(t) + + ds, err := tester.New(0, veryLargeGCInterval, veryLargeGCWindow, 16) + require.NoError(err) + + ctx := context.Background() + features, err := ds.Features(ctx) + require.NoError(err) + if features.ContinuousCheckpointing.Status == datastore.FeatureUnsupported { + t.Skip("unsupported continuous checkpoints") + } + + setupDatastore(ds, require) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + head, err := ds.HeadRevision(ctx) + require.NoError(err) + + changes, errchan := ds.Watch(ctx, head, datastore.WatchOptions{ + Content: datastore.WatchCheckpoints, + CheckpointInterval: 100 * time.Millisecond, + }) + require.Zero(len(errchan)) + + verifyCheckpointAfter(t, head, changes) +} + func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { require := require.New(t) @@ -805,14 +835,14 @@ func WatchCheckpointsTest(t *testing.T, tester DatastoreTester) { ) require.NoError(err) - verifyCheckpointUpdate(require, afterTouchRevision, changes) + verifyCheckpointUpdate(t, afterTouchRevision, changes) afterTouchRevision, err = ds.ReadWriteTx(ctx, func(ctx context.Context, rwt datastore.ReadWriteTransaction) error { return rwt.WriteNamespaces(ctx, &core.NamespaceDefinition{Name: "doesnotexist"}) }) require.NoError(err) - verifyCheckpointUpdate(require, afterTouchRevision, changes) + verifyCheckpointUpdate(t, afterTouchRevision, changes) } func WatchEmissionStrategyTest(t *testing.T, tester DatastoreTester) { @@ -909,16 +939,18 @@ func WatchEmissionStrategyTest(t *testing.T, tester DatastoreTester) { } func verifyCheckpointUpdate( - require *require.Assertions, + t *testing.T, expectedRevision datastore.Revision, changes <-chan datastore.RevisionChanges, ) { + t.Helper() + var relChangeEmitted, schemaChangeEmitted bool changeWait := time.NewTimer(waitForChangesTimeout) for { select { case change, ok := <-changes: - require.True(ok) + require.True(t, ok) if len(change.ChangedDefinitions) > 0 { schemaChangeEmitted = true } @@ -927,14 +959,37 @@ func verifyCheckpointUpdate( } if change.IsCheckpoint { if change.Revision.Equal(expectedRevision) || change.Revision.GreaterThan(expectedRevision) { - require.True(relChangeEmitted || schemaChangeEmitted, "expected relationship/schema changes before checkpoint") + require.True(t, relChangeEmitted || schemaChangeEmitted, "expected relationship/schema changes before checkpoint") return } // we received a past revision checkpoint, ignore } case <-changeWait.C: - require.Fail("Timed out", "waited for checkpoint") + require.Fail(t, "Timed out", "waited for checkpoint") + } + } +} + +func verifyCheckpointAfter( + t *testing.T, + pastRevision datastore.Revision, + changes <-chan datastore.RevisionChanges, +) { + t.Helper() + + changeWait := time.NewTimer(waitForChangesTimeout) + for { + select { + case change, ok := <-changes: + require.True(t, ok) + require.Zero(t, change.ChangedDefinitions) + require.Zero(t, change.RelationshipChanges) + require.True(t, 
change.IsCheckpoint) + require.True(t, change.Revision.GreaterThan(pastRevision)) + return + case <-changeWait.C: + require.Fail(t, "Timed out", "waited for checkpoint") } } }
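
---

For intuition, the core of this change is the synthetic revision forged in optimizedRevisionFunc when no single quantized revision qualifies. Below is a minimal, self-contained Go sketch of that merge (not part of the patch: the snapshot type, its map-based in-progress set, and the merge helper are invented for illustration and are deliberately simpler than the real pgSnapshot/markComplete, which also advance xmin/xmax):

package main

import "fmt"

// snapshot mirrors the shape of pgSnapshot: xmin is the lowest possibly
// in-progress XID, xmax is one past the highest assigned XID, and xip holds
// the in-progress XIDs within [xmin, xmax).
type snapshot struct {
	xmin, xmax uint64
	xip        map[uint64]bool
}

// merge forges a synthetic snapshot that observes every transaction visible to
// any input, mirroring the loop in optimizedRevisionFunc: take the minimum
// xmin and maximum xmax, union the in-progress sets, then mark each candidate
// revision's own XID complete (the markComplete step in the diff), since each
// candidate is known to be committed.
func merge(revs []snapshot, txids []uint64) snapshot {
	out := snapshot{xmin: ^uint64(0), xmax: 0, xip: map[uint64]bool{}}
	for _, r := range revs {
		if r.xmin < out.xmin {
			out.xmin = r.xmin
		}
		if r.xmax > out.xmax {
			out.xmax = r.xmax
		}
		for x := range r.xip {
			out.xip[x] = true
		}
	}
	for _, txid := range txids {
		delete(out.xip, txid)
	}
	return out
}

func main() {
	// The two concurrent revisions from the ConcurrentRevisions test case:
	// txid 3 sees 2 as in progress, while txid 2 sees 3 as in progress.
	a := snapshot{xmin: 1, xmax: 4, xip: map[uint64]bool{2: true}}
	b := snapshot{xmin: 1, xmax: 4, xip: map[uint64]bool{3: true}}
	merged := merge([]snapshot{a, b}, []uint64{3, 2})
	fmt.Printf("xmin=%d xmax=%d in-progress=%v\n", merged.xmin, merged.xmax, merged.xip)
	// Prints: xmin=1 xmax=4 in-progress=map[] — both 2 and 3 are now visible,
	// which is why the test expects numLower=2, numHigher=0.
}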