Skip to content

Commit

Permalink
wip: recreate DB on schema change
Browse files Browse the repository at this point in the history
  • Loading branch information
stuartwdouglas committed Feb 27, 2025
1 parent 13d5fa3 commit bedd5db
Show file tree
Hide file tree
Showing 2 changed files with 89 additions and 4 deletions.
88 changes: 86 additions & 2 deletions backend/provisioner/dev_provisioner.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package provisioner

import (
"context"
"database/sql"
"errors"
"fmt"
"time"

Expand Down Expand Up @@ -37,6 +39,15 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID())

logger.Debugf("Provisioning mysql database: %s", dbName)
db, ok := res.(*schema.Database)
if !ok {
return nil, fmt.Errorf("expected database, got %T", res)
}
migrationHash := ""
for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) {
migrationHash = migration.Digest
break
}

// We assume that the DB hsas already been started when running in dev mode
mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort)
Expand All @@ -51,7 +62,7 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
case <-timeout:
return nil, fmt.Errorf("failed to query database: %w", err)
case <-retry.C:
event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate)
event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate, migrationHash)
if err != nil {
logger.Debugf("failed to establish mysql database: %s", err.Error())
continue
Expand All @@ -67,8 +78,9 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn {
}
}

func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool) (*schema.DatabaseRuntimeConnections, error) {
func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool, migrationHash string) (*schema.DatabaseRuntimeConnections, error) {
conn, err := otelsql.Open("mysql", mysqlDSN)
logger := log.FromContext(ctx)
if err != nil {
return nil, fmt.Errorf("failed to connect to mysql: %w", err)
}
Expand All @@ -81,6 +93,32 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql
defer res.Close()

exists := res.Next()

if migrationHash != "" {
_, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR(255) PRIMARY KEY NOT NULL, migration VARCHAR(255) NOT NULL)")
if err != nil {
return nil, fmt.Errorf("failed to create migrations tracking table: %w", err)
}
if exists && !recreate {
// We might still need to recreate the database if the schema has changed
existing := ""
err := conn.QueryRow("SELECT migration FROM migrations WHERE db=?", dbName).Scan(&existing)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("failed to query migrations table: %w", err)
}
logger.Infof("existing migration: %s", existing)

Check failure on line 110 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
} else {
logger.Infof("existing migration: %s", existing)

Check failure on line 112 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
logger.Infof("existing current: %s", migrationHash)

Check failure on line 113 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
if existing != migrationHash {
logger.Infof("Recreating database %q due to schema change", dbName)

Check failure on line 115 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
recreate = true
}
}
}
}

if exists && recreate {
_, err = conn.ExecContext(ctx, "DROP DATABASE "+dbName)
if err != nil {
Expand All @@ -96,6 +134,13 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql

dsn := dsn.MySQLDSN(dbName, dsn.Port(mysqlPort))

if migrationHash != "" {
_, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES (?, ?) ON DUPLICATE KEY UPDATE migration = ?", dbName, migrationHash, migrationHash)
if err != nil {
return nil, fmt.Errorf("failed to insert migration hash: %w", err)
}
}

return &schema.DatabaseRuntimeConnections{
Write: &schema.DSNDatabaseConnector{DSN: dsn},
Read: &schema.DSNDatabaseConnector{DSN: dsn},
Expand Down Expand Up @@ -124,6 +169,12 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s

func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn {
return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) {

db, ok := resource.(*schema.Database)
if !ok {
return nil, fmt.Errorf("expected database, got %T", resource)
}

logger := log.FromContext(ctx)

dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID())
Expand All @@ -149,6 +200,33 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner
defer res.Close()

exists := res.Next()
migrationHash := ""
for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) {
_, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR PRIMARY KEY NOT NULL, migration VARCHAR NOT NULL)")
if err != nil {
return nil, fmt.Errorf("failed to create migrations tracking table: %w", err)
}
migrationHash = migration.Digest
if exists && !recreate {
// We might still need to recreate the database if the schema has changed
existing := ""
err := conn.QueryRow("SELECT migration FROM migrations WHERE db=$1", dbName).Scan(&existing)
if err != nil {
if !errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("failed to query migrations table: %w", err)
}
logger.Infof("existing migration: %s", existing)

Check failure on line 218 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
} else {
logger.Infof("existing migration: %s", existing)

Check failure on line 220 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
logger.Infof("existing current: %s", migrationHash)

Check failure on line 221 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
if existing != migrationHash {
logger.Infof("Recreating database %q due to schema change", dbName)

Check failure on line 223 in backend/provisioner/dev_provisioner.go

View workflow job for this annotation

GitHub Actions / Lint

use of `logger.Infof` forbidden because "Infof should only be used for user-facing messages, use //nolint to suppress" (forbidigo)
recreate = true
}
}
}
}

if exists && recreate {
// Terminate any dangling connections.
_, err = conn.ExecContext(ctx, `
Expand All @@ -171,6 +249,12 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner
}
}

if migrationHash != "" {
_, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES ($1, $2)ON CONFLICT (db) DO UPDATE SET migration = EXCLUDED.migration;", dbName, migrationHash)
if err != nil {
return nil, fmt.Errorf("failed to insert migration hash: %w", err)
}
}
dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort))

return &schema.RuntimeElement{
Expand Down
5 changes: 3 additions & 2 deletions common/schema/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,9 @@ func (d *Database) GetProvisioned() ResourceSet {
migration, ok := slices.FindVariant[*MetadataSQLMigration](d.Metadata)
if ok {
result = append(result, &ProvisionedResource{
Kind: ResourceTypeSQLMigration,
Config: &Database{Type: d.Type, Metadata: []Metadata{migration}},
Kind: ResourceTypeSQLMigration,
Config: &Database{Type: d.Type, Metadata: []Metadata{migration}},
DeploymentSpecific: true,
})
}

Expand Down

0 comments on commit bedd5db

Please sign in to comment.