diff --git a/dbs/migrate.go b/dbs/migrate.go index 2e2fd87f..b536286c 100644 --- a/dbs/migrate.go +++ b/dbs/migrate.go @@ -16,12 +16,14 @@ package dbs import ( "bytes" "context" + "database/sql" "encoding/json" "errors" "fmt" "io" "log" "net/url" + "os" "sort" "strconv" "strings" @@ -1126,6 +1128,30 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error { stm = CleanStatement(stm) mid := mrec.MIGRATION_REQUEST_ID retryCount := mrec.RETRY_COUNT + + // check if migration server is empty when migration status is IN_PROGRESS + // we use sql.NullString as migration server info may not be present in DB + // https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267 + var msrv sql.NullString + s := getSQL("check_migration_server") + err = tx.QueryRow(s, mid).Scan(&msrv) + if err != nil { + msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", s, err) + log.Println(msg) + return Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus") + } + migServer := msrv.String + hostname, err := os.Hostname() + if err != nil { + return Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus") + } + if migServer != "" && migServer != hostname { + msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer) + log.Println(msg) + return Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus") + + } + // if our status is FAILED we check for retry count // if retry count is less then threshold we increment retry count and set status to IN PROGRESS // this will allow migration service to pick up failed migration request @@ -1142,11 +1168,12 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error { var args []interface{} args = append(args, status) args = append(args, retryCount) + args = append(args, hostname) args = append(args, mid) - utils.PrintSQL(stm, args, "execute") + utils.PrintSQL(stm, args, "execute update migration status query") } - _, err = tx.Exec(stm, status, retryCount, mid) + _, err = tx.Exec(stm, status, retryCount, hostname, mid) if err != nil { log.Printf("unable to execute %s, error %v", stm, err) return Error(err, UpdateErrorCode, "", "dbs.migrate.updateMigrationStatus") @@ -1266,25 +1293,27 @@ func (a *API) StatusMigration() error { if _, e := getSingleValue(a.Params, "migration_url"); e == nil { conds, args = AddParam("migration_url", "MR.MIGRATION_URL", a.Params, conds, args) } - if _, e := getSingleValue(a.Params, "dataset"); e == nil { - conds, args = AddParam("dataset", "MR.DATASET", a.Params, conds, args) - } if _, e := getSingleValue(a.Params, "block_name"); e == nil { - conds, args = AddParam("block_name", "MR.BLOCK_NAME", a.Params, conds, args) - } - if _, e := getSingleValue(a.Params, "user"); e == nil { - conds, args = AddParam("user", "MR.USER", a.Params, conds, args) + tmpl["Blocks"] = true + conds, args = AddParam("block_name", "MB.MIGRATION_BLOCK_NAME", a.Params, conds, args) } if _, e := getSingleValue(a.Params, "create_by"); e == nil { conds, args = AddParam("create_by", "MR.CREATE_BY", a.Params, conds, args) } // get SQL statement from static area - stm := getSQL("migration_requests") + stm, err := LoadTemplateSQL("migration_requests", tmpl) + if err != nil { + log.Println("unable to load migration_requests template", err) + return Error(err, LoadErrorCode, "", "dbs.migrate.StatusMigration") + } stm = WhereClause(stm, conds) + if oldest == "true" { + stm += "ORDER BY MR.creation_date" + } // use generic query API to fetch the results from DB - err := executeAll(a.Writer, a.Separator, stm, args...) + err = executeAll(a.Writer, a.Separator, stm, args...) if err != nil { return Error(err, QueryErrorCode, "", "dbs.migrate.StatusMigration") } diff --git a/dbs/migration_requests.go b/dbs/migration_requests.go index 57d8c554..87d87f03 100644 --- a/dbs/migration_requests.go +++ b/dbs/migration_requests.go @@ -17,6 +17,7 @@ type MigrationRequest struct { MIGRATION_URL string `json:"migration_url" validate:"required"` MIGRATION_INPUT string `json:"migration_input" validate:"required"` MIGRATION_STATUS int64 `json:"migration_status" validate:"gte=0,lte=10"` + MIGRATION_SERVER string `json:"migration_server"` CREATE_BY string `json:"create_by" validate:"required"` CREATION_DATE int64 `json:"creation_date" validate:"required,number,gt=0"` LAST_MODIFIED_BY string `json:"last_modified_by" validate:"required"` @@ -57,6 +58,7 @@ func (r *MigrationRequest) Insert(tx *sql.Tx) error { r.MIGRATION_URL, r.MIGRATION_INPUT, r.MIGRATION_STATUS, + r.MIGRATION_SERVER, r.CREATION_DATE, r.CREATE_BY, r.LAST_MODIFICATION_DATE, diff --git a/static/schema/DDL/create-mysql-schema.sql b/static/schema/DDL/create-mysql-schema.sql index 0ec94b5a..f0139818 100644 --- a/static/schema/DDL/create-mysql-schema.sql +++ b/static/schema/DDL/create-mysql-schema.sql @@ -542,6 +542,7 @@ CREATE TABLE `MIGRATION_REQUESTS` ( `MIGRATION_URL` VARCHAR(300), `MIGRATION_INPUT` VARCHAR(700), `MIGRATION_STATUS` INTEGER, + `MIGRATION_SERVER` VARCHAR2(100), `CREATION_DATE` INTEGER, `CREATE_BY` VARCHAR(100), `LAST_MODIFICATION_DATE` INTEGER, diff --git a/static/schema/DDL/create-oracle-schema.sql b/static/schema/DDL/create-oracle-schema.sql index 7dd8e100..ced95991 100644 --- a/static/schema/DDL/create-oracle-schema.sql +++ b/static/schema/DDL/create-oracle-schema.sql @@ -579,6 +579,7 @@ CREATE TABLE MIGRATION_REQUESTS ( MIGRATION_URL VARCHAR2(300), MIGRATION_INPUT VARCHAR2(700), MIGRATION_STATUS INTEGER, + MIGRATION_SERVER VARCHAR2(100), CREATION_DATE INTEGER, CREATE_BY VARCHAR2(500), LAST_MODIFICATION_DATE INTEGER, diff --git a/static/schema/sqlite-schema.sql b/static/schema/sqlite-schema.sql index f08402cf..9299b0ce 100644 --- a/static/schema/sqlite-schema.sql +++ b/static/schema/sqlite-schema.sql @@ -234,6 +234,7 @@ "MIGRATION_URL" VARCHAR2(300), "MIGRATION_INPUT" VARCHAR2(700), "MIGRATION_STATUS" INTEGER, + "MIGRATION_SERVER" VARCHAR2(100), "CREATION_DATE" INTEGER, "CREATE_BY" VARCHAR2(500), "LAST_MODIFICATION_DATE" INTEGER, diff --git a/static/sql/insert_migration_requests.sql b/static/sql/insert_migration_requests.sql index 59caaf52..773c3c21 100644 --- a/static/sql/insert_migration_requests.sql +++ b/static/sql/insert_migration_requests.sql @@ -3,6 +3,7 @@ INSERT INTO {{.Owner}}.MIGRATION_REQUESTS MIGRATION_URL, MIGRATION_INPUT, MIGRATION_STATUS, + MIGRATION_SERVER, CREATION_DATE, CREATE_BY, LAST_MODIFICATION_DATE, @@ -13,6 +14,7 @@ VALUES :migration_url, :migration_input, :migration_status, + :migration_server, :creation_date, :create_by, :last_modification_date, diff --git a/static/sql/update_migration_status.sql b/static/sql/update_migration_status.sql index 6324e634..b12c93da 100644 --- a/static/sql/update_migration_status.sql +++ b/static/sql/update_migration_status.sql @@ -1,3 +1,5 @@ UPDATE {{.Owner}}.MIGRATION_REQUESTS - SET MIGRATION_STATUS = :status, RETRY_COUNT = :retry_count - WHERE MIGRATION_REQUEST_ID = :migration_request_id + SET MIGRATION_STATUS = :status, + RETRY_COUNT = :retry_count, + MIGRATION_SERVER = :migration_server +WHERE MIGRATION_REQUEST_ID = :migration_request_id