Skip to content

Commit

Permalink
Merge branch 'main' into mage-test-coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
kartikaysaxena authored Feb 27, 2025
2 parents f05ec5a + ee35ee1 commit fce741b
Show file tree
Hide file tree
Showing 13 changed files with 341 additions and 116 deletions.
3 changes: 3 additions & 0 deletions internal/datastore/postgres/locks.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ type lockID uint32
const (
// gcRunLock is the lock ID for the garbage collection run.
gcRunLock lockID = 1

// revisionHeartbeatLock is the lock ID for the leader that will generate the heartbeat revisions.
revisionHeartbeatLock lockID = 2
)

func (pgd *pgDatastore) tryAcquireLock(ctx context.Context, lockID lockID) (bool, error) {
Expand Down
8 changes: 8 additions & 0 deletions internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type postgresOptions struct {
expirationDisabled bool
columnOptimizationOption common.ColumnOptimizationOption
includeQueryParametersInTraces bool
revisionHeartbeatEnabled bool

migrationPhase string
allowedMigrations []string
Expand Down Expand Up @@ -77,6 +78,7 @@ const (
defaultExpirationDisabled = false
// no follower delay by default, it should only be set if using read replicas
defaultFollowerReadDelay = 0
defaultRevisionHeartbeat = false
)

// Option provides the facility to configure how clients within the
Expand All @@ -103,6 +105,7 @@ func generateConfig(options []Option) (postgresOptions, error) {
includeQueryParametersInTraces: defaultIncludeQueryParametersInTraces,
expirationDisabled: defaultExpirationDisabled,
followerReadDelay: defaultFollowerReadDelay,
revisionHeartbeatEnabled: defaultRevisionHeartbeat,
}

for _, option := range options {
Expand Down Expand Up @@ -420,3 +423,8 @@ func WithColumnOptimization(isEnabled bool) Option {
func WithExpirationDisabled(isDisabled bool) Option {
return func(po *postgresOptions) { po.expirationDisabled = isDisabled }
}

// WithRevisionHeartbeat enables the revision heartbeat.
func WithRevisionHeartbeat(isEnabled bool) Option {
return func(po *postgresOptions) { po.revisionHeartbeatEnabled = isEnabled }
}
91 changes: 79 additions & 12 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
dbsql "database/sql"
"errors"
"fmt"
"math/rand/v2"
"os"
"strconv"
"strings"
Expand Down Expand Up @@ -311,6 +312,15 @@ func newPostgresDatastore(
followerReadDelayNanos,
)

revisionHeartbeatQuery := fmt.Sprintf(
insertHeartBeatRevision,
colXID,
tableTransaction,
colTimestamp,
quantizationPeriodNanos,
colSnapshot,
)

validTransactionQuery := fmt.Sprintf(
queryValidTransaction,
colXID,
Expand Down Expand Up @@ -353,12 +363,13 @@ func newPostgresDatastore(
watchBufferWriteTimeout: config.watchBufferWriteTimeout,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
revisionHeartbeatQuery: revisionHeartbeatQuery,
gcWindow: config.gcWindow,
gcInterval: config.gcInterval,
gcTimeout: config.gcMaxOperationTime,
analyzeBeforeStatistics: config.analyzeBeforeStatistics,
watchEnabled: watchEnabled,
gcCtx: gcCtx,
workerCtx: gcCtx,
cancelGc: cancelGc,
readTxOptions: pgx.TxOptions{IsoLevel: pgx.RepeatableRead, AccessMode: pgx.ReadOnly},
maxRetries: config.maxRetries,
Expand All @@ -367,6 +378,7 @@ func newPostgresDatastore(
inStrictReadMode: config.readStrictMode,
filterMaximumIDCount: config.filterMaximumIDCount,
schema: *schema,
quantizationPeriodNanos: quantizationPeriodNanos,
}

if isPrimary && config.readStrictMode {
Expand All @@ -381,11 +393,17 @@ func newPostgresDatastore(

// Start a goroutine for garbage collection.
if isPrimary {
datastore.workerGroup, datastore.workerCtx = errgroup.WithContext(datastore.workerCtx)
if config.revisionHeartbeatEnabled {
datastore.workerGroup.Go(func() error {
return datastore.startRevisionHeartbeat(datastore.workerCtx)
})
}

if datastore.gcInterval > 0*time.Minute && config.gcEnabled {
datastore.gcGroup, datastore.gcCtx = errgroup.WithContext(datastore.gcCtx)
datastore.gcGroup.Go(func() error {
datastore.workerGroup.Go(func() error {
return common.StartGarbageCollector(
datastore.gcCtx,
datastore.workerCtx,
datastore,
datastore.gcInterval,
datastore.gcWindow,
Expand All @@ -410,6 +428,7 @@ type pgDatastore struct {
watchBufferWriteTimeout time.Duration
optimizedRevisionQuery string
validTransactionQuery string
revisionHeartbeatQuery string
gcWindow time.Duration
gcInterval time.Duration
gcTimeout time.Duration
Expand All @@ -424,11 +443,12 @@ type pgDatastore struct {

credentialsProvider datastore.CredentialsProvider

gcGroup *errgroup.Group
gcCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
filterMaximumIDCount uint16
workerGroup *errgroup.Group
workerCtx context.Context
cancelGc context.CancelFunc
gcHasRun atomic.Bool
filterMaximumIDCount uint16
quantizationPeriodNanos int64
}

func (pgd *pgDatastore) IsStrictReadModeEnabled() bool {
Expand Down Expand Up @@ -651,8 +671,8 @@ func wrapError(err error) error {
func (pgd *pgDatastore) Close() error {
pgd.cancelGc()

if pgd.gcGroup != nil {
err := pgd.gcGroup.Wait()
if pgd.workerGroup != nil {
err := pgd.workerGroup.Wait()
log.Warn().Err(err).Msg("completed shutdown of postgres datastore")
}

Expand Down Expand Up @@ -721,7 +741,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
Status: datastore.FeatureUnsupported,
},
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureUnsupported,
Status: datastore.FeatureSupported,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
Expand All @@ -742,6 +762,53 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
}, nil
}

const defaultMaxHeartbeatLeaderJitterPercent = 10

func (pgd *pgDatastore) startRevisionHeartbeat(ctx context.Context) error {
heartbeatDuration := time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos)
log.Info().Stringer("interval", heartbeatDuration).Msg("starting revision heartbeat")
tick := time.NewTicker(heartbeatDuration)

// Leader election. Continue trying to acquire in case the current leader died.
for {
if ctx.Err() != nil {
return ctx.Err()
}

ok, err := pgd.tryAcquireLock(ctx, revisionHeartbeatLock)
if err != nil {
log.Warn().Err(err).Msg("failed to acquire revision heartbeat lock")
}

if ok {
break
}

jitter := time.Duration(float64(heartbeatDuration) * rand.Float64() * defaultMaxHeartbeatLeaderJitterPercent / 100) // nolint:gosec
time.Sleep(jitter)
}

defer func() {
if err := pgd.releaseLock(ctx, revisionHeartbeatLock); err != nil {
log.Warn().Err(err).Msg("failed to release revision heartbeat lock")
}
}()

log.Info().Stringer("interval", heartbeatDuration).Msg("got elected revision heartbeat leader, starting")

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-tick.C:
_, err := pgd.writePool.Exec(ctx, pgd.revisionHeartbeatQuery)
if err != nil {
log.Warn().Err(err).Msg("failed to write heartbeat revision")
}
}
}
}

func buildLivingObjectFilterForRevision(revision postgresRevision) queryFilterer {
createdBeforeTXN := sq.Expr(fmt.Sprintf(
snapshotAlive,
Expand Down
Loading

0 comments on commit fce741b

Please sign in to comment.