Skip to content

Commit

Permalink
make pg datastore continuously checkpoint using a revision heartbeat
Browse files Browse the repository at this point in the history
alternative implementation to
#2247
  • Loading branch information
vroldanbet committed Feb 26, 2025
1 parent 26bf35e commit 4d1c55c
Show file tree
Hide file tree
Showing 10 changed files with 221 additions and 78 deletions.
8 changes: 8 additions & 0 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type postgresOptions struct {
expirationDisabled bool
columnOptimizationOption common.ColumnOptimizationOption
includeQueryParametersInTraces bool
revisionHeartbeatEnabled bool

migrationPhase string
allowedMigrations []string
Expand Down Expand Up @@ -77,6 +78,7 @@ const (
defaultExpirationDisabled = false
// no follower delay by default, it should only be set if using read replicas
defaultFollowerReadDelay = 0
defaultRevisionHeartbeat = false
)

// Option provides the facility to configure how clients within the
Expand All @@ -103,6 +105,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
expirationDisabled: defaultExpirationDisabled,
followerReadDelay: defaultFollowerReadDelay,
revisionHeartbeatEnabled: defaultRevisionHeartbeat,
}

for _, option := range options {
Expand Down Expand Up @@ -420,3 +423,8 @@ func WithColumnOptimization(isEnabled bool) Option {
func WithExpirationDisabled(isDisabled bool) Option {
return func(po *postgresOptions) { po.expirationDisabled = isDisabled }
}

// WithRevisionHeartbeat enables the revision heartbeat.
func WithRevisionHeartbeat(isEnabled bool) Option {
return func(po *postgresOptions) { po.revisionHeartbeatEnabled = isEnabled }
}
59 changes: 47 additions & 12 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,6 +311,15 @@ func newPostgresDatastore(
followerReadDelayNanos,
)

revisionHeartbeatQuery := fmt.Sprintf(
insertHeartBeatRevision,
colXID,
tableTransaction,
colTimestamp,
quantizationPeriodNanos,
colSnapshot,
)

validTransactionQuery := fmt.Sprintf(
queryValidTransaction,
colXID,
Expand Down Expand Up @@ -353,12 +362,13 @@ func newPostgresDatastore(
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
revisionHeartbeatQuery: revisionHeartbeatQuery,
gcWindow: config.gcWindow,
gcInterval: config.gcInterval,
gcTimeout: config.gcMaxOperationTime,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
watchEnabled: watchEnabled,
gcCtx: gcCtx,
workerCtx: gcCtx,
cancelGc: cancelGc,
readTxOptions: pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly},
maxRetries: config.maxRetries,
Expand All @@ -367,6 +377,7 @@ func newPostgresDatastore(
inStrictReadMode: config.readStrictMode,
filterMaximumIDCount: config.filterMaximumIDCount,
schema: *schema,
quantizationPeriodNanos: quantizationPeriodNanos,
}

if isPrimary && config.readStrictMode {
Expand All @@ -381,11 +392,17 @@ func newPostgresDatastore(

// Start a goroutine for garbage collection.
if isPrimary {
datastore.workerGroup, datastore.workerCtx = errgroup.WithContext(datastore.workerCtx)
if config.revisionHeartbeatEnabled {
datastore.workerGroup.Go(func() error {
return datastore.startRevisionHeartbeat(datastore.workerCtx)
})
}

if datastore.gcInterval > 0*time.Minute && config.gcEnabled {
datastore.gcGroup, datastore.gcCtx = errgroup.WithContext(datastore.gcCtx)
datastore.gcGroup.Go(func() error {
datastore.workerGroup.Go(func() error {
return common.StartGarbageCollector(
datastore.gcCtx,
datastore.workerCtx,
datastore,
datastore.gcInterval,
datastore.gcWindow,
Expand All @@ -410,6 +427,7 @@ type pgDatastore struct {
watchBufferWriteTimeout time.Duration
optimizedRevisionQuery string
validTransactionQuery string
revisionHeartbeatQuery string
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
Expand All @@ -424,11 +442,12 @@ type pgDatastore struct {

credentialsProvider datastore.CredentialsProvider

gcGroup *errgroup.Group
gcCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
filterMaximumIDCount uint16
workerGroup *errgroup.Group
workerCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
filterMaximumIDCount uint16
quantizationPeriodNanos int64
}

func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
Expand Down Expand Up @@ -651,8 +670,8 @@ func wrapError(err error) error {
func (pgd *pgDatastore) Close() error {
pgd.cancelGc()

if pgd.gcGroup != nil {
err := pgd.gcGroup.Wait()
if pgd.workerGroup != nil {
err := pgd.workerGroup.Wait()
log.Warn().Err(err).Msg("completed shutdown of postgres datastore")
}

Expand Down Expand Up @@ -721,7 +740,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
Status: datastore.FeatureUnsupported,
},
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
Expand All @@ -742,6 +761,22 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
}, nil
}

func (pgd *pgDatastore) startRevisionHeartbeat(ctx context.Context) error {
tick := time.NewTicker(time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos))

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
_, err := pgd.writePool.Exec(ctx, pgd.revisionHeartbeatQuery)
if err != nil {
log.Warn().Err(err).Msg("failed to write heartbeat revision")
}
}
}
}

func buildLivingObjectFilterForRevision(revision postgresRevision) queryFilterer {
createdBeforeTXN := sq.Expr(fmt.Sprintf(
snapshotAlive,
Expand Down
151 changes: 123 additions & 28 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
QuantizedRevisionTest(t, b)
})

t.Run("OverlappingRevision", func(t *testing.T) {
OverlappingRevisionTest(t, b)
})

t.Run("WatchNotEnabled", func(t *testing.T) {
WatchNotEnabledTest(t, b, config.pgVersion)
})
Expand Down Expand Up @@ -778,33 +782,47 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
[]time.Duration{-2 * time.Second},
1, 0,
},
{
"OldestInWindowIsSelected",
1 * time.Second,
0,
[]time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
1, 1,
},
{
"ShouldObservePreviousAndCurrent",
1 * time.Second,
0,
[]time.Duration{-1 * time.Second, 0},
2, 0,
},
{
"OnlyFutureRevisions",
1 * time.Second,
0,
[]time.Duration{2 * time.Second},
0, 1,
1, 0,
},
{
"QuantizedLower",
2 * time.Second,
0,
[]time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0},
1, 2,
2, 1,
},
{
"QuantizedRecentWithFollowerReadDelay",
500 * time.Millisecond,
2 * time.Second,
[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
1, 2,
2, 1,
},
{
"QuantizedRecentWithoutFollowerReadDelay",
500 * time.Millisecond,
0,
[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
2, 1,
3, 0,
},
{
"QuantizationDisabled",
Expand Down Expand Up @@ -833,7 +851,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
ctx,
uri,
primaryInstanceID,
RevisionQuantization(5*time.Second),
RevisionQuantization(tc.quantization),
GCWindow(24*time.Hour),
WatchBufferLength(1),
FollowerReadDelay(tc.followerReadDelay),
Expand Down Expand Up @@ -865,37 +883,114 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
}
}

queryRevision := fmt.Sprintf(
querySelectRevision,
colXID,
tableTransaction,
colTimestamp,
tc.quantization.Nanoseconds(),
colSnapshot,
tc.followerReadDelay.Nanoseconds(),
)

var revision xid8
var snapshot pgSnapshot
var validFor time.Duration
err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor)
assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
})
}
}

func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
testCases := []struct {
testName string
quantization time.Duration
followerReadDelay time.Duration
revisions []postgresRevision
numLower uint64
numHigher uint64
}{
{ // the two revisions are concurrent, and given they are past the quantization window (are way in the past, around unix epoch)
// the function should return a revision snapshot that captures both of them
"ConcurrentRevisions",
5 * time.Second,
0,
[]postgresRevision{
{optionalTxID: newXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalNanosTimestamp: uint64((time.Second * 1) * time.Nanosecond)},
{optionalTxID: newXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalNanosTimestamp: uint64((time.Second * 2) * time.Nanosecond)},
},
2, 0,
},
}

for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
require := require.New(t)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

var conn *pgx.Conn
ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
var err error
conn, err = pgx.Connect(ctx, uri)
require.NoError(err)

RegisterTypes(conn.TypeMap())

ds, err := newPostgresDatastore(
ctx,
uri,
primaryInstanceID,
RevisionQuantization(tc.quantization),
GCWindow(24*time.Hour),
WatchBufferLength(1),
FollowerReadDelay(tc.followerReadDelay),
)
require.NoError(err)

return ds
})
defer ds.Close()

// set a random time zone to ensure the queries are unaffected by tz
_, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1))
require.NoError(err)

queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")
for _, rev := range tc.revisions {
stmt := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
insertTxn := stmt.Insert(tableTransaction).Columns(colXID, colSnapshot, colTimestamp)

ts := time.Unix(0, int64(rev.optionalNanosTimestamp))
sql, args, err := insertTxn.Values(rev.optionalTxID, rev.snapshot, ts).ToSql()
require.NoError(err)

var numLower, numHigher uint64
require.NoError(conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
require.NoError(conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)
_, err = conn.Exec(ctx, sql, args...)
require.NoError(err)
}

// Subtract one from numLower because of the artificially injected first transaction row
require.Equal(tc.numLower, numLower-1)
require.Equal(tc.numHigher, numHigher)
assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
})
}
}

func assertRevisionLowerAndHigher(ctx context.Context, t *testing.T, ds datastore.Datastore, conn *pgx.Conn,
expectedNumLower, expectedNumHigher uint64,
) {
t.Helper()

var revision xid8
var snapshot pgSnapshot
pgDS, ok := ds.(*pgDatastore)
require.True(t, ok)
rev, _, err := pgDS.optimizedRevisionFunc(ctx)
require.NoError(t, err)

pgRev, ok := rev.(postgresRevision)
require.True(t, ok)
revision = pgRev.optionalTxID
require.NotNil(t, revision)
snapshot = pgRev.snapshot

queryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
numLowerQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "true")
numHigherQuery := fmt.Sprintf(queryFmt, colXID, tableTransaction, "false")

var numLower, numHigher uint64
require.NoError(t, conn.QueryRow(ctx, numLowerQuery, snapshot).Scan(&numLower), "%s - %s", revision, snapshot)
require.NoError(t, conn.QueryRow(ctx, numHigherQuery, snapshot).Scan(&numHigher), "%s - %s", revision, snapshot)

// Subtract one from numLower because of the artificially injected first transaction row
require.Equal(t, expectedNumLower, numLower-1, "incorrect number of revisions visible to snapshot, expected %d, got %d", expectedNumLower, numLower-1)
require.Equal(t, expectedNumHigher, numHigher, "incorrect number of revisions invisible to snapshot, expected %d, got %d", expectedNumHigher, numHigher)
}

// ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of
// revisions that are concurrently applied and then ensures a call to HeadRevision reflects
// the changes found in *both* revisions.
Expand Down
17 changes: 17 additions & 0 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ const (
errCheckRevision = "unable to check revision: %w"
errRevisionFormat = "invalid revision format: %w"

// %[1] Name of xid column
// %[2] Relationship tuple transaction table
// %[3] Name of timestamp column
// %[4] Quantization period (in nanoseconds)
// %[5] Name of snapshot column
insertHeartBeatRevision = `
WITH quantization_window AS (
SELECT TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc' AS ts
)
INSERT INTO %[2]s (%[1]s, %[5]s)
SELECT pg_current_xact_id(), pg_current_snapshot()
WHERE NOT EXISTS (
SELECT 1
FROM %[2]s rtt
WHERE rtt.%[3]s >= (SELECT ts FROM quantization_window)
);`

// querySelectRevision will round the database's timestamp down to the nearest
// quantization period, and then find the first transaction (and its active xmin)
// after that. If there are no transactions newer than the quantization period,
Expand Down
Loading

0 comments on commit 4d1c55c

Please sign in to comment.