diff --git a/Makefile b/Makefile index a111b1d..768fb08 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ NAME = miga -VERSION ?= v0.8.0 +VERSION ?= v0.9.0 PG_CONTAINER_NAME = miga-pg MYSQL_CONTAINER_NAME = miga-mysql CLICKHOUSE_CONTAINER_NAME = miga-clickhouse diff --git a/commands/all/command.go b/commands/all/command.go index 804bac5..3cc1a96 100644 --- a/commands/all/command.go +++ b/commands/all/command.go @@ -16,7 +16,7 @@ func Command() *cli.Command { Name: "all", Usage: "All command combine migration and seed command", Subcommands: cli.CommandsByName([]*cli.Command{ - &cli.Command{ + { Name: "up", Usage: "Up db to latest migration version and to latest seed.", Action: func(ctx *cli.Context) error { diff --git a/config/config.go b/config/config.go index 6dfd2c4..12f5e2d 100644 --- a/config/config.go +++ b/config/config.go @@ -131,6 +131,12 @@ func fillDBConfig(cfg *driver.Config) { if viper.IsSet("clickhouse.dsn") { cfg.Dialect = "clickhouse" cfg.Dsn = viper.GetString("clickhouse.dsn") + if viper.IsSet("clickhouse.cluster") { + cfg.ClickhouseClusterName = viper.GetString("clickhouse.cluster") + cfg.ClickhouseEngine = viper.GetString("clickhouse.engine") + cfg.ClickhouseSchema = viper.GetString("clickhouse.schema") + cfg.ClickhouseSharded = viper.GetBool("clickhouse.sharded") + } return } diff --git a/driver/driver.go b/driver/driver.go index ce3a012..ff9a211 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -33,6 +33,11 @@ type ( Dir string VersionTableName string Enabled bool + + ClickhouseSchema string + ClickhouseClusterName string + ClickhouseEngine string + ClickhouseSharded bool } Interface interface { @@ -57,6 +62,10 @@ func New(cfg *Config) (Interface, error) { cfg.Dsn, cfg.VersionTableName, cfg.Dir, + cfg.ClickhouseSchema, + cfg.ClickhouseClusterName, + cfg.ClickhouseEngine, + cfg.ClickhouseSharded, ) case Migrate: return migrate.New( diff --git a/driver/goose/driver.go b/driver/goose/driver.go index f1e5e9c..239dc68 100644 --- a/driver/goose/driver.go +++ b/driver/goose/driver.go @@ -2,6 +2,7 @@ package goose import ( "database/sql" + "fmt" "miga/utils" @@ -9,11 +10,21 @@ import ( ) type Goose struct { - db *sql.DB - dir string + db *sql.DB + dir string + dialect string + versionTableName string + clickhouseSchema string + clickhouseClusterName string + clickhouseEngine string + clickhouseSharded bool } -func New(dialect, dsn, tableName, dir string) (*Goose, error) { +func New( + dialect, dsn, tableName, dir string, + clickhouseSchema, clickhouseClusterName, clickhouseEngine string, + clickhouseSharded bool, +) (*Goose, error) { err := orig.SetDialect(dialect) if err != nil { return nil, err @@ -27,7 +38,20 @@ func New(dialect, dsn, tableName, dir string) (*Goose, error) { return nil, err } - return &Goose{db: db, dir: dir}, nil + return &Goose{ + db: db, + dir: dir, + dialect: dialect, + versionTableName: tableName, + clickhouseSchema: clickhouseSchema, + clickhouseClusterName: clickhouseClusterName, + clickhouseEngine: clickhouseEngine, + clickhouseSharded: clickhouseSharded, + }, nil +} + +func (g Goose) isClickhouse() bool { + return g.dialect == "clickhouse" } func (g Goose) Close() error { @@ -39,33 +63,109 @@ func (g Goose) Create(name, ext string) error { } func (g Goose) Down() error { + g.clickhouseHackEnsureTable() return orig.Run("down", g.db, g.dir) } func (g Goose) DownTo(version string) error { + g.clickhouseHackEnsureTable() return orig.Run("down-to", g.db, g.dir, version) } func (g Goose) Redo() error { + g.clickhouseHackEnsureTable() return orig.Run("redo", g.db, g.dir) } func (g Goose) Reset() error { + g.clickhouseHackEnsureTable() return orig.Run("reset", g.db, g.dir) } func (g Goose) Status() error { + g.clickhouseHackEnsureTable() return orig.Run("status", g.db, g.dir) } func (g Goose) Up() error { + g.clickhouseHackEnsureTable() return orig.Run("up", g.db, g.dir) } func (g Goose) UpTo(version string) error { + g.clickhouseHackEnsureTable() return orig.Run("up-to", g.db, g.dir, version) } func (g Goose) Version() error { + g.clickhouseHackEnsureTable() return orig.Run("version", g.db, g.dir) } + +func (g Goose) clickhouseHackEnsureTable() { + if !g.isClickhouse() || len(g.clickhouseClusterName) == 0 { + return + } + + var ( + queries = []string{} + schemaTable = fmt.Sprintf("%s.%s", g.clickhouseSchema, g.versionTableName) + ) + + if g.clickhouseSharded { + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s_shard ON CLUSTER '%s' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime default now() + ) Engine = %s + ORDER BY tstamp + `, schemaTable, g.clickhouseClusterName, g.clickhouseEngine)) + + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' AS %s_shard + ENGINE = Distributed('%s', %s, %s, rand()) + `, schemaTable, g.clickhouseClusterName, schemaTable, g.clickhouseClusterName, g.clickhouseSchema, g.versionTableName)) + } else { + queries = append(queries, fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' ( + version_id Int64, + is_applied UInt8, + date Date default now(), + tstamp DateTime default now() + ) Engine = %s + ORDER BY tstamp + `, schemaTable, g.clickhouseClusterName, g.clickhouseEngine)) + } + + for _, q := range queries { + if _, err := g.db.Exec(q); err != nil { + panic("Failed applly clickhouse dirty hack: " + err.Error()) + } + } + + var total int + err := g.db. + QueryRow(fmt.Sprintf("select count(*) from %s", schemaTable)). + Scan(&total) + if err != nil && err != sql.ErrNoRows { + panic("Failed get last applied version: " + err.Error()) + } else if total > 0 { + return + } + + tx, err := g.db.Begin() + if err != nil { + panic("Failed begin tx: " + err.Error()) + } + defer tx.Rollback() // nolint: errcheck + + if _, err := tx.Exec(fmt.Sprintf("INSERT INTO %s (version_id, is_applied) VALUES (?, ?)", schemaTable), 0, 1); err != nil { + panic("Failed insert initial version: " + err.Error()) + } + + if err := tx.Commit(); err != nil { + panic("Failed commit initial version: " + err.Error()) + } +}