Skip to content

Commit

Permalink
Add migrationHost function; allow to check status by by using migrati…
Browse files Browse the repository at this point in the history
…on_status
  • Loading branch information
vkuznet committed Jul 22, 2022
1 parent 0cdd057 commit 8b856a2
Showing 1 changed file with 51 additions and 26 deletions.
77 changes: 51 additions & 26 deletions dbs/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"log"
"net/url"
"os"
"os/exec"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1130,6 +1131,42 @@ func (a *API) processMigration(ch chan<- bool, status *int64, mrec MigrationRequ
log.Printf("updated migration request %v with status %v", mid, *status)
}

// helper function to check host of migation request
func migrationHost(mid int64) (string, error) {
// check if migration server is empty when migration status is IN_PROGRESS
// we use sql.NullString as migration server info may not be present in DB
// https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267
var msrv sql.NullString
stm := getSQL("check_migration_server")
err := DB.QueryRow(stm, mid).Scan(&msrv)
if err != nil {
msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", stm, err)
log.Println(msg)
return "", Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
migServer := msrv.String
hostname, err := os.Hostname()
if err != nil {
return "", Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus")
}

// on k8s if migServer differ from hostname we need to check if such pod exists
out, err := exec.Command("kubectl", "get", "pods", "-n", "dbs").Output()
// we migration server among the pods
if err == nil && strings.Contains(string(out), migServer) {
// compare migration server to hostname
if migServer != "" && migServer != hostname {
msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer)
log.Println(msg)
return "", Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus")

}
}
// otherwise we do not have a pod with name of migration server
// or we do not run migration server in multi-node environment
return hostname, nil
}

// updateMigrationStatus updates migration status and increment retry count of
// migration record.
func updateMigrationStatus(mrec MigrationRequest, status int) error {
Expand All @@ -1142,39 +1179,23 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error {
return Error(err, LoadErrorCode, "", "dbs.migrate.updateMigrationStatus")
}

// start transaction
tx, err := DB.Begin()
if err != nil {
log.Println("unable to get DB transaction", err)
return Error(err, TransactionErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
defer tx.Rollback()
stm = CleanStatement(stm)
mid := mrec.MIGRATION_REQUEST_ID
retryCount := mrec.RETRY_COUNT

// check if migration server is empty when migration status is IN_PROGRESS
// we use sql.NullString as migration server info may not be present in DB
// https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267
var msrv sql.NullString
s := getSQL("check_migration_server")
err = tx.QueryRow(s, mid).Scan(&msrv)
// get migration host or fail
hostname, err := migrationHost(mid)
if err != nil {
msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", s, err)
log.Println(msg)
return Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus")
return err
}
migServer := msrv.String
hostname, err := os.Hostname()
if err != nil {
return Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
if migServer != "" && migServer != hostname {
msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer)
log.Println(msg)
return Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus")

// start transaction
tx, err := DB.Begin()
if err != nil {
log.Println("unable to get DB transaction", err)
return Error(err, TransactionErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
defer tx.Rollback()

// if our status is FAILED we check for retry count
// if retry count is less then threshold we increment retry count and set status to IN PROGRESS
Expand Down Expand Up @@ -1280,7 +1301,8 @@ func (a *API) RemoveMigration() error {
return nil
}
msg := fmt.Sprintf(
"Invalid request, requestID=%v is not found", rec.MIGRATION_REQUEST_ID)
"unable to remove %v as it is either does not exists or its status is not failed",
rec.MIGRATION_REQUEST_ID)
return Error(InvalidRequestErr, InvalidRequestErrorCode, msg, "dbs.migrate.RemoveMigration")
}

Expand Down Expand Up @@ -1317,6 +1339,9 @@ func (a *API) StatusMigration() error {
if _, e := getSingleValue(a.Params, "migration_url"); e == nil {
conds, args = AddParam("migration_url", "MR.MIGRATION_URL", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "migration_status"); e == nil {
conds, args = AddParam("migration_status", "MR.MIGRATION_STATUS", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "block_name"); e == nil {
tmpl["Blocks"] = true
conds, args = AddParam("block_name", "MB.MIGRATION_BLOCK_NAME", a.Params, conds, args)
Expand Down

0 comments on commit 8b856a2

Please sign in to comment.