Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make postgres a continuously checkpointing datastore #2247

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ RUN --mount=type=cache,target=/root/.cache/go-build --mount=type=cache,target=/g
FROM golang:1.23.6-alpine3.20 AS health-probe-builder
WORKDIR /go/src/app
RUN apk update && apk add --no-cache git
RUN git clone https://github.com/authzed/grpc-health-probe.git
RUN git clone https://github.com/grpc-ecosystem/grpc-health-probe.git
WORKDIR /go/src/app/grpc-health-probe
RUN git checkout master
RUN git checkout 86f0bc2dea67fda67e9a060689bc8bb7a19ebe44
RUN CGO_ENABLED=0 go install -a -tags netgo -ldflags=-w

FROM cgr.dev/chainguard/static:latest
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.release
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@ ARG BASE=cgr.dev/chainguard/static:latest
FROM golang:1.23.6-alpine3.20 AS health-probe-builder
WORKDIR /go/src/app
RUN apk update && apk add --no-cache git
RUN git clone https://github.com/authzed/grpc-health-probe.git
RUN git clone https://github.com/grpc-ecosystem/grpc-health-probe.git
WORKDIR /go/src/app/grpc-health-probe
RUN git checkout master
RUN git checkout 86f0bc2dea67fda67e9a060689bc8bb7a19ebe44
RUN CGO_ENABLED=0 go install -a -tags netgo -ldflags=-w

FROM $BASE
Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -721,7 +721,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
Status: datastore.FeatureUnsupported,
},
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
Expand Down
151 changes: 123 additions & 28 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,10 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
QuantizedRevisionTest(t, b)
})

t.Run("OverlappingRevision", func(t *testing.T) {
OverlappingRevisionTest(t, b)
})

t.Run("WatchNotEnabled", func(t *testing.T) {
WatchNotEnabledTest(t, b, config.pgVersion)
})
Expand Down Expand Up @@ -778,33 +782,47 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
[]time.Duration{-2 * time.Second},
1, 0,
},
{
"OldestInWindowIsSelected",
1 * time.Second,
0,
[]time.Duration{1 * time.Millisecond, 2 * time.Millisecond},
1, 1,
},
{
"ShouldObservePreviousAndCurrent",
1 * time.Second,
0,
[]time.Duration{-1 * time.Second, 0},
2, 0,
},
{
"OnlyFutureRevisions",
1 * time.Second,
0,
[]time.Duration{2 * time.Second},
0, 1,
1, 0,
},
{
"QuantizedLower",
2 * time.Second,
0,
[]time.Duration{-4 * time.Second, -1 * time.Nanosecond, 0},
1, 2,
2, 1,
},
{
"QuantizedRecentWithFollowerReadDelay",
500 * time.Millisecond,
2 * time.Second,
[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
1, 2,
2, 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this because -2s lands exactly on a 500ms window? Maybe we can update this test to be clearer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's because there was a bug in the original test: it wasn't calling pgSnapshot.complete() like the actual optimizeRevisionFunc code was doing. So even though it was returning the revision at -2 as the optimized one, it wasn't observing its own transaction, and that led to only observing 1 revision. The test was wrong from the beginning. The other test cases were affected as well.

},
{
"QuantizedRecentWithoutFollowerReadDelay",
500 * time.Millisecond,
0,
[]time.Duration{-4 * time.Second, -2 * time.Second, 0},
2, 1,
3, 0,
},
{
"QuantizationDisabled",
Expand Down Expand Up @@ -833,7 +851,7 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
ctx,
uri,
primaryInstanceID,
RevisionQuantization(5*time.Second),
RevisionQuantization(tc.quantization),
GCWindow(24*time.Hour),
WatchBufferLength(1),
FollowerReadDelay(tc.followerReadDelay),
Expand Down Expand Up @@ -865,37 +883,114 @@ func QuantizedRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
}
}

queryRevision := fmt.Sprintf(
querySelectRevision,
colXID,
tableTransaction,
colTimestamp,
tc.quantization.Nanoseconds(),
colSnapshot,
tc.followerReadDelay.Nanoseconds(),
)

var revision xid8
var snapshot pgSnapshot
var validFor time.Duration
err = conn.QueryRow(ctx, queryRevision).Scan(&revision, &snapshot, &validFor)
assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
})
}
}

// OverlappingRevisionTest verifies that concurrent (overlapping) transactions
// which fall entirely outside the current quantization window are all observed
// by the datastore's optimized revision snapshot.
//
// It injects synthetic transaction rows directly into the transactions table
// (bypassing the datastore's write path) and then asserts visibility counts
// against the snapshot computed by optimizedRevisionFunc.
func OverlappingRevisionTest(t *testing.T, b testdatastore.RunningEngineForTest) {
	testCases := []struct {
		testName          string
		quantization      time.Duration
		followerReadDelay time.Duration
		revisions         []postgresRevision
		numLower          uint64 // expected count of injected txns visible to the snapshot
		numHigher         uint64 // expected count of injected txns NOT visible to the snapshot
	}{
		{
			// The two revisions are concurrent, and given they are past the
			// quantization window (way in the past, around the unix epoch), the
			// optimized-revision function must return a snapshot that captures
			// both of them.
			"ConcurrentRevisions",
			5 * time.Second,
			0,
			[]postgresRevision{
				{optionalTxID: newXid8(3), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{2}}, optionalNanosTimestamp: uint64((time.Second * 1) * time.Nanosecond)},
				{optionalTxID: newXid8(2), snapshot: pgSnapshot{xmin: 1, xmax: 4, xipList: []uint64{3}}, optionalNanosTimestamp: uint64((time.Second * 2) * time.Nanosecond)},
			},
			2, 0,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.testName, func(t *testing.T) {
			require := require.New(t)
			ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
			defer cancel()

			var conn *pgx.Conn
			ds := b.NewDatastore(t, func(engine, uri string) datastore.Datastore {
				var err error
				conn, err = pgx.Connect(ctx, uri)
				require.NoError(err)

				RegisterTypes(conn.TypeMap())

				ds, err := newPostgresDatastore(
					ctx,
					uri,
					primaryInstanceID,
					RevisionQuantization(tc.quantization),
					GCWindow(24*time.Hour),
					WatchBufferLength(1),
					FollowerReadDelay(tc.followerReadDelay),
				)
				require.NoError(err)

				return ds
			})
			defer ds.Close()

			// Set a random (non-UTC) time zone to ensure the queries are
			// unaffected by the session tz.
			_, err := conn.Exec(ctx, fmt.Sprintf("SET TIME ZONE -%d", rand.Intn(8)+1))
			require.NoError(err)

			// Inject the synthetic, overlapping transaction rows directly.
			for _, rev := range tc.revisions {
				stmt := sq.StatementBuilder.PlaceholderFormat(sq.Dollar)
				insertTxn := stmt.Insert(tableTransaction).Columns(colXID, colSnapshot, colTimestamp)

				ts := time.Unix(0, int64(rev.optionalNanosTimestamp))
				sql, args, err := insertTxn.Values(rev.optionalTxID, rev.snapshot, ts).ToSql()
				require.NoError(err)

				_, err = conn.Exec(ctx, sql, args...)
				require.NoError(err)
			}

			assertRevisionLowerAndHigher(ctx, t, ds, conn, tc.numLower, tc.numHigher)
		})
	}
}

// assertRevisionLowerAndHigher computes the datastore's optimized revision and
// verifies, via pg_visible_in_snapshot, how many transaction rows its snapshot
// does (expectedNumLower) and does not (expectedNumHigher) observe.
func assertRevisionLowerAndHigher(ctx context.Context, t *testing.T, ds datastore.Datastore, conn *pgx.Conn,
	expectedNumLower, expectedNumHigher uint64,
) {
	t.Helper()

	pgDS, ok := ds.(*pgDatastore)
	require.True(t, ok)

	rev, _, err := pgDS.optimizedRevisionFunc(ctx)
	require.NoError(t, err)

	pgRev, ok := rev.(postgresRevision)
	require.True(t, ok)

	txid := pgRev.optionalTxID
	// NOTE(review): NotNil on a non-pointer value is always satisfied; consider
	// asserting the xid's validity instead — confirm against xid8's definition.
	require.NotNil(t, txid)
	snap := pgRev.snapshot

	countQueryFmt := "SELECT COUNT(%[1]s) FROM %[2]s WHERE pg_visible_in_snapshot(%[1]s, $1) = %[3]s;"
	visibleQuery := fmt.Sprintf(countQueryFmt, colXID, tableTransaction, "true")
	invisibleQuery := fmt.Sprintf(countQueryFmt, colXID, tableTransaction, "false")

	var visible, invisible uint64
	require.NoError(t, conn.QueryRow(ctx, visibleQuery, snap).Scan(&visible), "%s - %s", txid, snap)
	require.NoError(t, conn.QueryRow(ctx, invisibleQuery, snap).Scan(&invisible), "%s - %s", txid, snap)

	// Subtract one from the visible count because of the artificially injected
	// first transaction row.
	require.Equal(t, expectedNumLower, visible-1, "incorrect number of revisions visible to snapshot, expected %d, got %d", expectedNumLower, visible-1)
	require.Equal(t, expectedNumHigher, invisible, "incorrect number of revisions invisible to snapshot, expected %d, got %d", expectedNumHigher, invisible)
}

// ConcurrentRevisionHeadTest uses goroutines and channels to intentionally set up a pair of
// revisions that are concurrently applied and then ensures a call to HeadRevision reflects
// the changes found in *both* revisions.
Expand Down
110 changes: 93 additions & 17 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/base64"
"errors"
"fmt"
"math"
"strconv"
"strings"
"time"
Expand All @@ -26,8 +27,14 @@ const (
// querySelectRevision will round the database's timestamp down to the nearest
// quantization period, and then find the first transaction (and its active xmin)
// after that. If there are no transactions newer than the quantization period,
// it just picks the latest transaction. It will also return the amount of
// nanoseconds until the next optimized revision would be selected server-side,
// it will return all the transaction IDs and snapshots available in the last quantization window, so that
// the application derives a snapshot that includes all transactions in the last window, and thus guarantee
// all revisions from the previous transaction are observed.
//
// It avoids determining the high-watermark snapshot using pg_current_snapshot(), as it may move out of band
// (VACUUM, other workloads in the same database), which causes problems to cache quantization.
//
// It will also return the amount of nanoseconds until the next optimized revision would be selected server-side,
// for use with caching.
//
// %[1] Name of xid column
Expand All @@ -37,13 +44,16 @@ const (
// %[5] Name of snapshot column
// %[6] Follower read delay (in nanoseconds)
querySelectRevision = `
WITH selected AS (SELECT (
(SELECT %[1]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc' ORDER BY %[3]s ASC LIMIT 1)
) as xid)
SELECT selected.xid,
COALESCE((SELECT %[5]s FROM %[2]s WHERE %[1]s = selected.xid), (SELECT pg_current_snapshot())),
%[4]d - CAST(EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 as bigint) %% %[4]d
FROM selected;`
WITH optimized AS
(SELECT %[1]s, %[5]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc' ORDER BY %[3]s ASC LIMIT 1),
allRevisionsFromLastWindow AS (SELECT %[1]s, %[5]s FROM %[2]s WHERE %[3]s >= TO_TIMESTAMP(FLOOR((EXTRACT(EPOCH FROM (SELECT max(%[3]s) FROM %[2]s)) * 1000000000 - %[6]d)/ %[4]d) * %[4]d / 1000000000) AT TIME ZONE 'utc'),
validity AS (SELECT (%[4]d - CAST(EXTRACT(EPOCH FROM NOW() AT TIME ZONE 'utc') * 1000000000 as bigint) %% %[4]d) AS ts)

SELECT %[1]s, %[5]s, validity.ts FROM optimized, validity
UNION ALL
SELECT %[1]s, %[5]s, validity.ts FROM allRevisionsFromLastWindow, validity
WHERE NOT EXISTS (SELECT 1 FROM optimized);
`

// queryValidTransaction will return a single row with three values:
// 1) the transaction ID of the minimum valid (i.e. within the GC window) transaction
Expand All @@ -69,24 +79,66 @@ const (
)
SELECT minvalid.%[1]s, minvalid.%[5]s, pg_current_snapshot() FROM minvalid;`

queryCurrentSnapshot = `SELECT pg_current_snapshot();`
queryCurrentSnapshot = `SELECT pg_current_snapshot();`
queryGenerateCheckpoint = `SELECT pg_current_xact_id(), pg_current_snapshot(), NOW() AT TIME ZONE 'utc';`

queryCurrentTransactionID = `SELECT pg_current_xact_id()::text::integer;`
queryLatestXID = `SELECT max(xid)::text::integer FROM relation_tuple_transaction;`
)

func (pgd *pgDatastore) optimizedRevisionFunc(ctx context.Context) (datastore.Revision, time.Duration, error) {
var revision xid8
var snapshot pgSnapshot
rows, err := pgd.readPool.Query(ctx, pgd.optimizedRevisionQuery)
if err != nil {
return datastore.NoRevision, 0, fmt.Errorf("faild to compute optimized revision: %w", err)
}
defer func() {
rows.Close()
}()

var resultingPgRev []postgresRevision
var validForNanos time.Duration
if err := pgd.readPool.QueryRow(ctx, pgd.optimizedRevisionQuery).
Scan(&revision, &snapshot, &validForNanos); err != nil {
return datastore.NoRevision, 0, fmt.Errorf(errRevision, err)
for rows.Next() {
var xid xid8
var snapshot pgSnapshot
if err := rows.Scan(&xid, &snapshot, &validForNanos); err != nil {
return datastore.NoRevision, 0, fmt.Errorf("unable to decode candidate optimized revision: %w", err)
}

resultingPgRev = append(resultingPgRev, postgresRevision{snapshot: snapshot, optionalTxID: xid})
}
if rows.Err() != nil {
return datastore.NoRevision, 0, fmt.Errorf("unable to compute optimized revision: %w", err)
}

if len(resultingPgRev) == 0 {
return datastore.NoRevision, 0, spiceerrors.MustBugf("unexpected optimized revision query returnzed zero rows")
} else if len(resultingPgRev) == 1 {
resultingPgRev[0].snapshot = resultingPgRev[0].snapshot.markComplete(resultingPgRev[0].optionalTxID.Uint64)
return resultingPgRev[0], validForNanos, nil
} else if len(resultingPgRev) > 1 {
// if there are multiple rows, it means the query didn't find an eligible quantized revision
// in the window defined by the current time. In this case the query will return all revisions available in
// the last quantization window defined by the biggest timestamp in the transactions table.
// We will use all those transactions to forge a synthetic pgSnapshot that observes all transaction IDs
// in that window.
syntheticRev := postgresRevision{snapshot: pgSnapshot{xmin: uint64(math.MaxUint64), xmax: 0}}
for _, rev := range resultingPgRev {
if rev.snapshot.xmin < syntheticRev.snapshot.xmin {
syntheticRev.snapshot.xmin = rev.snapshot.xmin
}
if rev.snapshot.xmax > syntheticRev.snapshot.xmax {
syntheticRev.snapshot.xmax = rev.snapshot.xmax
}
}

for _, rev := range resultingPgRev {
syntheticRev.snapshot = syntheticRev.snapshot.markComplete(rev.optionalTxID.Uint64)
}

snapshot = snapshot.markComplete(revision.Uint64)
return syntheticRev, validForNanos, nil
}

return postgresRevision{snapshot: snapshot, optionalTxID: revision}, validForNanos, nil
return datastore.NoRevision, 0, spiceerrors.MustBugf("unexpected optimized revision query result")
}

func (pgd *pgDatastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
Expand Down Expand Up @@ -117,6 +169,30 @@ func (pgd *pgDatastore) getHeadRevision(ctx context.Context, querier common.Quer
return &postgresRevision{snapshot: snapshot}, nil
}

// generateCheckpoint creates a new transaction for the purpose of acting as a
// checkpoint high watermark. It calls pg_current_xact_id(), which consumes one
// transaction ID of the 64-bit space. For perspective, it would take 584.5
// years to exhaust that space even at a rate of 1 XID per nanosecond.
//
// Returns (nil, nil) when the query yields no row.
func (pgd *pgDatastore) generateCheckpoint(ctx context.Context, querier common.Querier) (*postgresRevision, error) {
	var (
		txID      xid8
		snapshot  pgSnapshot
		timestamp time.Time
	)

	err := querier.QueryRow(ctx, queryGenerateCheckpoint).Scan(&txID, &snapshot, &timestamp)
	switch {
	case errors.Is(err, pgx.ErrNoRows):
		return nil, nil
	case err != nil:
		return nil, fmt.Errorf(errRevision, err)
	}

	nanos, castErr := safecast.ToUint64(timestamp.UnixNano())
	if castErr != nil {
		return nil, fmt.Errorf(errRevision, castErr)
	}

	return &postgresRevision{snapshot: snapshot, optionalTxID: txID, optionalNanosTimestamp: nanos}, nil
}

func (pgd *pgDatastore) CheckRevision(ctx context.Context, revisionRaw datastore.Revision) error {
revision, ok := revisionRaw.(postgresRevision)
if !ok {
Expand Down
Loading
Loading