Skip to content

Commit

Permalink
Create clickhouse version table before UP commands (#18)
Browse files Browse the repository at this point in the history
  • Loading branch information
chapsuk authored Sep 14, 2021
1 parent a49a023 commit 15f2d4d
Show file tree
Hide file tree
Showing 5 changed files with 121 additions and 6 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
NAME = miga
VERSION ?= v0.8.0
VERSION ?= v0.9.0
PG_CONTAINER_NAME = miga-pg
MYSQL_CONTAINER_NAME = miga-mysql
CLICKHOUSE_CONTAINER_NAME = miga-clickhouse
Expand Down
2 changes: 1 addition & 1 deletion commands/all/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func Command() *cli.Command {
Name: "all",
Usage: "All command combine migration and seed command",
Subcommands: cli.CommandsByName([]*cli.Command{
&cli.Command{
{
Name: "up",
Usage: "Up db to latest migration version and to latest seed.",
Action: func(ctx *cli.Context) error {
Expand Down
6 changes: 6 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,12 @@ func fillDBConfig(cfg *driver.Config) {
if viper.IsSet("clickhouse.dsn") {
cfg.Dialect = "clickhouse"
cfg.Dsn = viper.GetString("clickhouse.dsn")
if viper.IsSet("clickhouse.cluster") {
cfg.ClickhouseClusterName = viper.GetString("clickhouse.cluster")
cfg.ClickhouseEngine = viper.GetString("clickhouse.engine")
cfg.ClickhouseSchema = viper.GetString("clickhouse.schema")
cfg.ClickhouseSharded = viper.GetBool("clickhouse.sharded")
}
return
}

Expand Down
9 changes: 9 additions & 0 deletions driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ type (
Dir string
VersionTableName string
Enabled bool

ClickhouseSchema string
ClickhouseClusterName string
ClickhouseEngine string
ClickhouseSharded bool
}

Interface interface {
Expand All @@ -57,6 +62,10 @@ func New(cfg *Config) (Interface, error) {
cfg.Dsn,
cfg.VersionTableName,
cfg.Dir,
cfg.ClickhouseSchema,
cfg.ClickhouseClusterName,
cfg.ClickhouseEngine,
cfg.ClickhouseSharded,
)
case Migrate:
return migrate.New(
Expand Down
108 changes: 104 additions & 4 deletions driver/goose/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,29 @@ package goose

import (
"database/sql"
"fmt"

"miga/utils"

orig "github.com/pressly/goose"
)

type Goose struct {
db *sql.DB
dir string
db *sql.DB
dir string
dialect string
versionTableName string
clickhouseSchema string
clickhouseClusterName string
clickhouseEngine string
clickhouseSharded bool
}

func New(dialect, dsn, tableName, dir string) (*Goose, error) {
func New(
dialect, dsn, tableName, dir string,
clickhouseSchema, clickhouseClusterName, clickhouseEngine string,
clickhouseSharded bool,
) (*Goose, error) {
err := orig.SetDialect(dialect)
if err != nil {
return nil, err
Expand All @@ -27,7 +38,20 @@ func New(dialect, dsn, tableName, dir string) (*Goose, error) {
return nil, err
}

return &Goose{db: db, dir: dir}, nil
return &Goose{
db: db,
dir: dir,
dialect: dialect,
versionTableName: tableName,
clickhouseSchema: clickhouseSchema,
clickhouseClusterName: clickhouseClusterName,
clickhouseEngine: clickhouseEngine,
clickhouseSharded: clickhouseSharded,
}, nil
}

func (g Goose) isClickhouse() bool {
return g.dialect == "clickhouse"
}

func (g Goose) Close() error {
Expand All @@ -39,33 +63,109 @@ func (g Goose) Create(name, ext string) error {
}

func (g Goose) Down() error {
g.clickhouseHackEnsureTable()
return orig.Run("down", g.db, g.dir)
}

func (g Goose) DownTo(version string) error {
g.clickhouseHackEnsureTable()
return orig.Run("down-to", g.db, g.dir, version)
}

func (g Goose) Redo() error {
g.clickhouseHackEnsureTable()
return orig.Run("redo", g.db, g.dir)
}

func (g Goose) Reset() error {
g.clickhouseHackEnsureTable()
return orig.Run("reset", g.db, g.dir)
}

func (g Goose) Status() error {
g.clickhouseHackEnsureTable()
return orig.Run("status", g.db, g.dir)
}

func (g Goose) Up() error {
g.clickhouseHackEnsureTable()
return orig.Run("up", g.db, g.dir)
}

func (g Goose) UpTo(version string) error {
g.clickhouseHackEnsureTable()
return orig.Run("up-to", g.db, g.dir, version)
}

func (g Goose) Version() error {
g.clickhouseHackEnsureTable()
return orig.Run("version", g.db, g.dir)
}

func (g Goose) clickhouseHackEnsureTable() {
if !g.isClickhouse() || len(g.clickhouseClusterName) == 0 {
return
}

var (
queries = []string{}
schemaTable = fmt.Sprintf("%s.%s", g.clickhouseSchema, g.versionTableName)
)

if g.clickhouseSharded {
queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s_shard ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
) Engine = %s
ORDER BY tstamp
`, schemaTable, g.clickhouseClusterName, g.clickhouseEngine))

queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' AS %s_shard
ENGINE = Distributed('%s', %s, %s, rand())
`, schemaTable, g.clickhouseClusterName, schemaTable, g.clickhouseClusterName, g.clickhouseSchema, g.versionTableName))
} else {
queries = append(queries, fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s ON CLUSTER '%s' (
version_id Int64,
is_applied UInt8,
date Date default now(),
tstamp DateTime default now()
) Engine = %s
ORDER BY tstamp
`, schemaTable, g.clickhouseClusterName, g.clickhouseEngine))
}

for _, q := range queries {
if _, err := g.db.Exec(q); err != nil {
panic("Failed applly clickhouse dirty hack: " + err.Error())
}
}

var total int
err := g.db.
QueryRow(fmt.Sprintf("select count(*) from %s", schemaTable)).
Scan(&total)
if err != nil && err != sql.ErrNoRows {
panic("Failed get last applied version: " + err.Error())
} else if total > 0 {
return
}

tx, err := g.db.Begin()
if err != nil {
panic("Failed begin tx: " + err.Error())
}
defer tx.Rollback() // nolint: errcheck

if _, err := tx.Exec(fmt.Sprintf("INSERT INTO %s (version_id, is_applied) VALUES (?, ?)", schemaTable), 0, 1); err != nil {
panic("Failed insert initial version: " + err.Error())
}

if err := tx.Commit(); err != nil {
panic("Failed commit initial version: " + err.Error())
}
}

0 comments on commit 15f2d4d

Please sign in to comment.