Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

follow ups to #2252 #2254

Merged
merged 1 commit into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion internal/datastore/postgres/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ const (
defaultExpirationDisabled = false
// no follower delay by default, it should only be set if using read replicas
defaultFollowerReadDelay = 0
defaultRevisionHeartbeat = false
defaultRevisionHeartbeat = true
)

// Option provides the facility to configure how clients within the
Expand Down
28 changes: 18 additions & 10 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -312,14 +312,17 @@ func newPostgresDatastore(
followerReadDelayNanos,
)

revisionHeartbeatQuery := fmt.Sprintf(
insertHeartBeatRevision,
colXID,
tableTransaction,
colTimestamp,
quantizationPeriodNanos,
colSnapshot,
)
var revisionHeartbeatQuery string
if config.revisionHeartbeatEnabled {
revisionHeartbeatQuery = fmt.Sprintf(
insertHeartBeatRevision,
colXID,
tableTransaction,
colTimestamp,
quantizationPeriodNanos,
colSnapshot,
)
}

validTransactionQuery := fmt.Sprintf(
queryValidTransaction,
Expand Down Expand Up @@ -732,6 +735,11 @@ func (pgd *pgDatastore) Features(ctx context.Context) (*datastore.Features, erro
}

func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
continuousCheckpointing := datastore.FeatureUnsupported
if pgd.revisionHeartbeatQuery != "" {
continuousCheckpointing = datastore.FeatureSupported
}

if pgd.watchEnabled {
return &datastore.Features{
Watch: datastore.Feature{
Expand All @@ -741,7 +749,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
Status: datastore.FeatureUnsupported,
},
ContinuousCheckpointing: datastore.Feature{
Status: datastore.FeatureSupported,
Status: continuousCheckpointing,
},
WatchEmitsImmediately: datastore.Feature{
Status: datastore.FeatureUnsupported,
Expand All @@ -765,7 +773,7 @@ func (pgd *pgDatastore) OfflineFeatures() (*datastore.Features, error) {
const defaultMaxHeartbeatLeaderJitterPercent = 10

func (pgd *pgDatastore) startRevisionHeartbeat(ctx context.Context) error {
heartbeatDuration := time.Nanosecond * time.Duration(pgd.quantizationPeriodNanos)
heartbeatDuration := max(time.Second, time.Nanosecond*time.Duration(pgd.quantizationPeriodNanos))
log.Info().Stringer("interval", heartbeatDuration).Msg("starting revision heartbeat")
tick := time.NewTicker(heartbeatDuration)

Expand Down
5 changes: 5 additions & 0 deletions internal/datastore/postgres/postgres_shared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
MigrationPhase(config.migrationPhase),
WithRevisionHeartbeat(false), // heartbeat revision messes with tests that assert over revisions
)
require.NoError(t, err)
return ds
Expand All @@ -111,6 +112,7 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
MigrationPhase(config.migrationPhase),
WithRevisionHeartbeat(false), // heartbeat revision messes with tests that assert over revisions
)
require.NoError(t, err)
return ds
Expand Down Expand Up @@ -173,6 +175,7 @@ func testPostgresDatastore(t *testing.T, config postgresTestConfig) {
GCInterval(veryLargeGCInterval),
WatchBufferLength(50),
MigrationPhase(config.migrationPhase),
WithRevisionHeartbeat(false),
))

t.Run("OverlappingRevisionWatch", createDatastoreTest(
Expand Down Expand Up @@ -297,6 +300,7 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, config postgresT
GCInterval(veryLargeGCInterval),
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
WithRevisionHeartbeat(false),
)
require.NoError(t, err)
return ds
Expand All @@ -316,6 +320,7 @@ func testPostgresDatastoreWithoutCommitTimestamps(t *testing.T, config postgresT
GCInterval(gcInterval),
WatchBufferLength(watchBufferLength),
DebugAnalyzeBeforeStatistics(),
WithRevisionHeartbeat(false),
)
require.NoError(t, err)
return ds
Expand Down
1 change: 1 addition & 0 deletions internal/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestPostgresDatastoreGC(t *testing.T) {
GCInterval(veryLargeGCInterval),
WatchBufferLength(1),
MigrationPhase(config.migrationPhase),
WithRevisionHeartbeat(false),
))
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/datastore/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ type Config struct {
// Expermimental
ExperimentalColumnOptimization bool `debugmap:"visible"`
EnableExperimentalRelationshipExpiration bool `debugmap:"visible"`
EnableExperimentalRevisionHeartbeat bool `debugmap:"visible"`
EnableRevisionHeartbeat bool `debugmap:"visible"`
}

//go:generate go run github.com/ecordell/optgen -sensitive-field-name-matches uri,secure -output zz_generated.relintegritykey.options.go . RelIntegrityKey
Expand Down Expand Up @@ -628,7 +628,7 @@ func newPostgresPrimaryDatastore(ctx context.Context, opts Config) (datastore.Da
postgres.WatchBufferWriteTimeout(opts.WatchBufferWriteTimeout),
postgres.MigrationPhase(opts.MigrationPhase),
postgres.AllowedMigrations(opts.AllowedMigrations),
postgres.WithRevisionHeartbeat(opts.EnableExperimentalRevisionHeartbeat),
postgres.WithRevisionHeartbeat(opts.EnableRevisionHeartbeat),
}

commonOptions, err := commonPostgresDatastoreOptions(opts)
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/datastore/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion pkg/cmd/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,7 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
apiFlags.Uint32Var(&config.MaxDeleteRelationshipsLimit, "max-delete-relationships-limit", 1000, "maximum number of relationships that can be deleted in a single request")
apiFlags.Uint32Var(&config.MaxLookupResourcesLimit, "max-lookup-resources-limit", 1000, "maximum number of resources that can be looked up in a single request")
apiFlags.Uint32Var(&config.MaxBulkExportRelationshipsLimit, "max-bulk-export-relationships-limit", 10_000, "maximum number of relationships that can be exported in a single request")
apiFlags.BoolVar(&config.EnableRevisionHeartbeat, "enable-revision-heartbeat", true, "enables support for revision heartbeat, used to create a synthetic revision on an interval defined by the quantization window (postgres only)")

datastoreFlags := nfs.FlagSet(BoldBlue("Datastore"))
// Flags for the datastore
Expand Down Expand Up @@ -169,7 +170,6 @@ func RegisterServeFlags(cmd *cobra.Command, config *server.Config) error {
Msg("The old implementation of LookupResources is no longer available, and a `false` value is no longer valid. Please remove this flag.")
}

experimentalFlags.BoolVar(&config.EnableExperimentalRevisionHeartbeat, "enable-experimental-revision-heartbeat", false, "enables experimental support for postgres revision heartbeat, used to create a synthetic revision on a given interval (postgres only)")
experimentalFlags.BoolVar(&config.EnableExperimentalRelationshipExpiration, "enable-experimental-relationship-expiration", false, "enables experimental support for first-class relationship expiration")
experimentalFlags.BoolVar(&config.EnableExperimentalWatchableSchemaCache, "enable-experimental-watchable-schema-cache", false, "enables the experimental schema cache which makes use of the Watch API for automatic updates")
// TODO: these two could reasonably be put in either the Dispatch group or the Experimental group. Is there a preference?
Expand Down
4 changes: 2 additions & 2 deletions pkg/cmd/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ type Config struct {
MaxBulkExportRelationshipsLimit uint32 `debugmap:"visible"`
EnableExperimentalLookupResources bool `debugmap:"visible"`
EnableExperimentalRelationshipExpiration bool `debugmap:"visible"`
EnableExperimentalRevisionHeartbeat bool `debugmap:"visible"`
EnableRevisionHeartbeat bool `debugmap:"visible"`

// Additional Services
MetricsAPI util.HTTPServerConfig `debugmap:"visible"`
Expand Down Expand Up @@ -229,7 +229,7 @@ func (c *Config) Complete(ctx context.Context) (RunnableServer, error) {
// are at most the number of elements returned from a datastore query
datastorecfg.WithFilterMaximumIDCount(c.DispatchChunkSize),
datastorecfg.WithEnableExperimentalRelationshipExpiration(c.EnableExperimentalRelationshipExpiration),
datastorecfg.WithEnableExperimentalRevisionHeartbeat(c.EnableExperimentalRevisionHeartbeat),
datastorecfg.WithEnableRevisionHeartbeat(c.EnableRevisionHeartbeat),
)
if err != nil {
return nil, spiceerrors.NewTerminationErrorBuilder(fmt.Errorf("failed to create datastore: %w", err)).
Expand Down
10 changes: 5 additions & 5 deletions pkg/cmd/server/zz_generated.options.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading