From bedd5dbe854e6f6ec8e07abcf79d7a7567d8a2e1 Mon Sep 17 00:00:00 2001 From: Stuart Douglas Date: Thu, 27 Feb 2025 12:35:49 +1100 Subject: [PATCH] wip: recreate DB on schema change --- backend/provisioner/dev_provisioner.go | 88 +++++++++++++++++++++++++- common/schema/database.go | 5 +- 2 files changed, 89 insertions(+), 4 deletions(-) diff --git a/backend/provisioner/dev_provisioner.go b/backend/provisioner/dev_provisioner.go index 8b1b63d1d7..1ef9a93b7a 100644 --- a/backend/provisioner/dev_provisioner.go +++ b/backend/provisioner/dev_provisioner.go @@ -2,6 +2,8 @@ package provisioner import ( "context" + "database/sql" + "errors" "fmt" "time" @@ -37,6 +39,15 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn { dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(res.ResourceID()) logger.Debugf("Provisioning mysql database: %s", dbName) + db, ok := res.(*schema.Database) + if !ok { + return nil, fmt.Errorf("expected database, got %T", res) + } + migrationHash := "" + for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) { + migrationHash = migration.Digest + break + } // We assume that the DB hsas already been started when running in dev mode mysqlDSN, err := dev.SetupMySQL(ctx, mysqlPort) @@ -51,7 +62,7 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn { case <-timeout: return nil, fmt.Errorf("failed to query database: %w", err) case <-retry.C: - event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate) + event, err := establishMySQLDB(ctx, mysqlDSN, dbName, mysqlPort, recreate, migrationHash) if err != nil { logger.Debugf("failed to establish mysql database: %s", err.Error()) continue @@ -67,8 +78,9 @@ func provisionMysql(mysqlPort int, recreate bool) InMemResourceProvisionerFn { } } -func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool) (*schema.DatabaseRuntimeConnections, error) { +func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysqlPort int, recreate bool, migrationHash string) (*schema.DatabaseRuntimeConnections, error) { conn, err := otelsql.Open("mysql", mysqlDSN) + logger := log.FromContext(ctx) if err != nil { return nil, fmt.Errorf("failed to connect to mysql: %w", err) } @@ -81,6 +93,32 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql defer res.Close() exists := res.Next() + + if migrationHash != "" { + _, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR(255) PRIMARY KEY NOT NULL, migration VARCHAR(255) NOT NULL)") + if err != nil { + return nil, fmt.Errorf("failed to create migrations tracking table: %w", err) + } + if exists && !recreate { + // We might still need to recreate the database if the schema has changed + existing := "" + err := conn.QueryRow("SELECT migration FROM migrations WHERE db=?", dbName).Scan(&existing) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to query migrations table: %w", err) + } + logger.Infof("existing migration: %s", existing) + } else { + logger.Infof("existing migration: %s", existing) + logger.Infof("existing current: %s", migrationHash) + if existing != migrationHash { + logger.Infof("Recreating database %q due to schema change", dbName) + recreate = true + } + } + } + } + if exists && recreate { _, err = conn.ExecContext(ctx, "DROP DATABASE "+dbName) if err != nil { @@ -96,6 +134,13 @@ func establishMySQLDB(ctx context.Context, mysqlDSN string, dbName string, mysql dsn := dsn.MySQLDSN(dbName, dsn.Port(mysqlPort)) + if migrationHash != "" { + _, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES (?, ?) ON DUPLICATE KEY UPDATE migration = ?", dbName, migrationHash, migrationHash) + if err != nil { + return nil, fmt.Errorf("failed to insert migration hash: %w", err) + } + } + return &schema.DatabaseRuntimeConnections{ Write: &schema.DSNDatabaseConnector{DSN: dsn}, Read: &schema.DSNDatabaseConnector{DSN: dsn}, @@ -124,6 +169,12 @@ func ProvisionMySQLForTest(ctx context.Context, moduleName string, id string) (s func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisionerFn { return func(ctx context.Context, changeset key.Changeset, deployment key.Deployment, resource schema.Provisioned) (*schema.RuntimeElement, error) { + + db, ok := resource.(*schema.Database) + if !ok { + return nil, fmt.Errorf("expected database, got %T", resource) + } + logger := log.FromContext(ctx) dbName := strcase.ToLowerSnake(deployment.Payload.Module) + "_" + strcase.ToLowerSnake(resource.ResourceID()) @@ -149,6 +200,33 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner defer res.Close() exists := res.Next() + migrationHash := "" + for migration := range slices.FilterVariants[*schema.MetadataSQLMigration](db.Metadata) { + _, err := conn.Exec("CREATE TABLE IF NOT EXISTS migrations (db VARCHAR PRIMARY KEY NOT NULL, migration VARCHAR NOT NULL)") + if err != nil { + return nil, fmt.Errorf("failed to create migrations tracking table: %w", err) + } + migrationHash = migration.Digest + if exists && !recreate { + // We might still need to recreate the database if the schema has changed + existing := "" + err := conn.QueryRow("SELECT migration FROM migrations WHERE db=$1", dbName).Scan(&existing) + if err != nil { + if !errors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to query migrations table: %w", err) + } + logger.Infof("existing migration: %s", existing) + } else { + logger.Infof("existing migration: %s", existing) + logger.Infof("existing current: %s", migrationHash) + if existing != migrationHash { + logger.Infof("Recreating database %q due to schema change", dbName) + recreate = true + } + } + } + } + if exists && recreate { // Terminate any dangling connections. _, err = conn.ExecContext(ctx, ` @@ -171,6 +249,12 @@ func provisionPostgres(postgresPort int, recreate bool) InMemResourceProvisioner } } + if migrationHash != "" { + _, err := conn.Exec("INSERT INTO migrations (db, migration) VALUES ($1, $2)ON CONFLICT (db) DO UPDATE SET migration = EXCLUDED.migration;", dbName, migrationHash) + if err != nil { + return nil, fmt.Errorf("failed to insert migration hash: %w", err) + } + } dsn := dsn.PostgresDSN(dbName, dsn.Port(postgresPort)) return &schema.RuntimeElement{ diff --git a/common/schema/database.go b/common/schema/database.go index 36a9c39234..b04cfc39dc 100644 --- a/common/schema/database.go +++ b/common/schema/database.go @@ -60,8 +60,9 @@ func (d *Database) GetProvisioned() ResourceSet { migration, ok := slices.FindVariant[*MetadataSQLMigration](d.Metadata) if ok { result = append(result, &ProvisionedResource{ - Kind: ResourceTypeSQLMigration, - Config: &Database{Type: d.Type, Metadata: []Metadata{migration}}, + Kind: ResourceTypeSQLMigration, + Config: &Database{Type: d.Type, Metadata: []Metadata{migration}}, + DeploymentSpecific: true, }) }