diff --git a/dbs/migrate.go b/dbs/migrate.go index 57c5f206..7b35d48d 100644 --- a/dbs/migrate.go +++ b/dbs/migrate.go @@ -24,6 +24,7 @@ import ( "log" "net/url" "os" + "os/exec" "sort" "strconv" "strings" @@ -1130,6 +1131,42 @@ func (a *API) processMigration(ch chan<- bool, status *int64, mrec MigrationRequ log.Printf("updated migration request %v with status %v", mid, *status) } +// helper function to check host of migation request +func migrationHost(mid int64) (string, error) { + // check if migration server is empty when migration status is IN_PROGRESS + // we use sql.NullString as migration server info may not be present in DB + // https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267 + var msrv sql.NullString + stm := getSQL("check_migration_server") + err := DB.QueryRow(stm, mid).Scan(&msrv) + if err != nil { + msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", stm, err) + log.Println(msg) + return "", Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus") + } + migServer := msrv.String + hostname, err := os.Hostname() + if err != nil { + return "", Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus") + } + + // on k8s if migServer differ from hostname we need to check if such pod exists + out, err := exec.Command("kubectl", "get", "pods", "-n", "dbs").Output() + // we migration server among the pods + if err == nil && strings.Contains(string(out), migServer) { + // compare migration server to hostname + if migServer != "" && migServer != hostname { + msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer) + log.Println(msg) + return "", Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus") + + } + } + // otherwise we do not have a pod with name of migration server + // or we do not run migration server in multi-node environment + return hostname, nil +} + // updateMigrationStatus updates migration status and increment retry count of // migration record. func updateMigrationStatus(mrec MigrationRequest, status int) error { @@ -1142,39 +1179,23 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error { return Error(err, LoadErrorCode, "", "dbs.migrate.updateMigrationStatus") } - // start transaction - tx, err := DB.Begin() - if err != nil { - log.Println("unable to get DB transaction", err) - return Error(err, TransactionErrorCode, "", "dbs.migrate.updateMigrationStatus") - } - defer tx.Rollback() stm = CleanStatement(stm) mid := mrec.MIGRATION_REQUEST_ID retryCount := mrec.RETRY_COUNT - // check if migration server is empty when migration status is IN_PROGRESS - // we use sql.NullString as migration server info may not be present in DB - // https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267 - var msrv sql.NullString - s := getSQL("check_migration_server") - err = tx.QueryRow(s, mid).Scan(&msrv) + // get migration host or fail + hostname, err := migrationHost(mid) if err != nil { - msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", s, err) - log.Println(msg) - return Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus") + return err } - migServer := msrv.String - hostname, err := os.Hostname() - if err != nil { - return Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus") - } - if migServer != "" && migServer != hostname { - msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer) - log.Println(msg) - return Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus") + // start transaction + tx, err := DB.Begin() + if err != nil { + log.Println("unable to get DB transaction", err) + return Error(err, TransactionErrorCode, "", "dbs.migrate.updateMigrationStatus") } + defer tx.Rollback() // if our status is FAILED we check for retry count // if retry count is less then threshold we increment retry count and set status to IN PROGRESS @@ -1280,7 +1301,8 @@ func (a *API) RemoveMigration() error { return nil } msg := fmt.Sprintf( - "Invalid request, requestID=%v is not found", rec.MIGRATION_REQUEST_ID) + "unable to remove %v as it is either does not exists or its status is not failed", + rec.MIGRATION_REQUEST_ID) return Error(InvalidRequestErr, InvalidRequestErrorCode, msg, "dbs.migrate.RemoveMigration") } @@ -1317,6 +1339,9 @@ func (a *API) StatusMigration() error { if _, e := getSingleValue(a.Params, "migration_url"); e == nil { conds, args = AddParam("migration_url", "MR.MIGRATION_URL", a.Params, conds, args) } + if _, e := getSingleValue(a.Params, "migration_status"); e == nil { + conds, args = AddParam("migration_status", "MR.MIGRATION_STATUS", a.Params, conds, args) + } if _, e := getSingleValue(a.Params, "block_name"); e == nil { tmpl["Blocks"] = true conds, args = AddParam("block_name", "MB.MIGRATION_BLOCK_NAME", a.Params, conds, args)