Skip to content

Commit

Permalink
Add migration_server to migration_requests table; adjust code and sql…
Browse files Browse the repository at this point in the history
… templates accordingly; fixed issue with look-up of migration request by block_name
  • Loading branch information
vkuznet committed Jul 21, 2022
1 parent ffc79e3 commit 768e623
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 13 deletions.
51 changes: 40 additions & 11 deletions dbs/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@ package dbs
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"io"
"log"
"net/url"
"os"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -1126,6 +1128,30 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error {
stm = CleanStatement(stm)
mid := mrec.MIGRATION_REQUEST_ID
retryCount := mrec.RETRY_COUNT

// check if migration server is empty when migration status is IN_PROGRESS
// we use sql.NullString as migration server info may not be present in DB
// https://medium.com/aubergine-solutions/how-i-handled-null-possible-values-from-database-rows-in-golang-521fb0ee267
var msrv sql.NullString
s := getSQL("check_migration_server")
err = tx.QueryRow(s, mid).Scan(&msrv)
if err != nil {
msg := fmt.Sprintf("unable to query statement:\n%v\nerror=%v", s, err)
log.Println(msg)
return Error(err, QueryErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
migServer := msrv.String
hostname, err := os.Hostname()
if err != nil {
return Error(err, GenericErrorCode, "", "dbs.migrate.updateMigrationStatus")
}
if migServer != "" && migServer != hostname {
msg := fmt.Sprintf("migration request %d is already taken by %s", mid, migServer)
log.Println(msg)
return Error(ConcurrencyErr, MigrationErrorCode, msg, "dbs.migrate.updateMigrationStatus")

}

// if our status is FAILED we check for retry count
// if retry count is less then threshold we increment retry count and set status to IN PROGRESS
// this will allow migration service to pick up failed migration request
Expand All @@ -1142,11 +1168,12 @@ func updateMigrationStatus(mrec MigrationRequest, status int) error {
var args []interface{}
args = append(args, status)
args = append(args, retryCount)
args = append(args, hostname)
args = append(args, mid)
utils.PrintSQL(stm, args, "execute")
utils.PrintSQL(stm, args, "execute update migration status query")
}

_, err = tx.Exec(stm, status, retryCount, mid)
_, err = tx.Exec(stm, status, retryCount, hostname, mid)
if err != nil {
log.Printf("unable to execute %s, error %v", stm, err)
return Error(err, UpdateErrorCode, "", "dbs.migrate.updateMigrationStatus")
Expand Down Expand Up @@ -1266,25 +1293,27 @@ func (a *API) StatusMigration() error {
if _, e := getSingleValue(a.Params, "migration_url"); e == nil {
conds, args = AddParam("migration_url", "MR.MIGRATION_URL", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "dataset"); e == nil {
conds, args = AddParam("dataset", "MR.DATASET", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "block_name"); e == nil {
conds, args = AddParam("block_name", "MR.BLOCK_NAME", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "user"); e == nil {
conds, args = AddParam("user", "MR.USER", a.Params, conds, args)
tmpl["Blocks"] = true
conds, args = AddParam("block_name", "MB.MIGRATION_BLOCK_NAME", a.Params, conds, args)
}
if _, e := getSingleValue(a.Params, "create_by"); e == nil {
conds, args = AddParam("create_by", "MR.CREATE_BY", a.Params, conds, args)
}

// get SQL statement from static area
stm := getSQL("migration_requests")
stm, err := LoadTemplateSQL("migration_requests", tmpl)
if err != nil {
log.Println("unable to load migration_requests template", err)
return Error(err, LoadErrorCode, "", "dbs.migrate.StatusMigration")
}
stm = WhereClause(stm, conds)
if oldest == "true" {
stm += "ORDER BY MR.creation_date"
}

// use generic query API to fetch the results from DB
err := executeAll(a.Writer, a.Separator, stm, args...)
err = executeAll(a.Writer, a.Separator, stm, args...)
if err != nil {
return Error(err, QueryErrorCode, "", "dbs.migrate.StatusMigration")
}
Expand Down
2 changes: 2 additions & 0 deletions dbs/migration_requests.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type MigrationRequest struct {
MIGRATION_URL string `json:"migration_url" validate:"required"`
MIGRATION_INPUT string `json:"migration_input" validate:"required"`
MIGRATION_STATUS int64 `json:"migration_status" validate:"gte=0,lte=10"`
MIGRATION_SERVER string `json:"migration_server"`
CREATE_BY string `json:"create_by" validate:"required"`
CREATION_DATE int64 `json:"creation_date" validate:"required,number,gt=0"`
LAST_MODIFIED_BY string `json:"last_modified_by" validate:"required"`
Expand Down Expand Up @@ -57,6 +58,7 @@ func (r *MigrationRequest) Insert(tx *sql.Tx) error {
r.MIGRATION_URL,
r.MIGRATION_INPUT,
r.MIGRATION_STATUS,
r.MIGRATION_SERVER,
r.CREATION_DATE,
r.CREATE_BY,
r.LAST_MODIFICATION_DATE,
Expand Down
1 change: 1 addition & 0 deletions static/schema/DDL/create-mysql-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,7 @@ CREATE TABLE `MIGRATION_REQUESTS` (
`MIGRATION_URL` VARCHAR(300),
`MIGRATION_INPUT` VARCHAR(700),
`MIGRATION_STATUS` INTEGER,
`MIGRATION_SERVER` VARCHAR2(100),
`CREATION_DATE` INTEGER,
`CREATE_BY` VARCHAR(100),
`LAST_MODIFICATION_DATE` INTEGER,
Expand Down
1 change: 1 addition & 0 deletions static/schema/DDL/create-oracle-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,7 @@ CREATE TABLE MIGRATION_REQUESTS (
MIGRATION_URL VARCHAR2(300),
MIGRATION_INPUT VARCHAR2(700),
MIGRATION_STATUS INTEGER,
MIGRATION_SERVER VARCHAR2(100),
CREATION_DATE INTEGER,
CREATE_BY VARCHAR2(500),
LAST_MODIFICATION_DATE INTEGER,
Expand Down
1 change: 1 addition & 0 deletions static/schema/sqlite-schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@
"MIGRATION_URL" VARCHAR2(300),
"MIGRATION_INPUT" VARCHAR2(700),
"MIGRATION_STATUS" INTEGER,
"MIGRATION_SERVER" VARCHAR2(100),
"CREATION_DATE" INTEGER,
"CREATE_BY" VARCHAR2(500),
"LAST_MODIFICATION_DATE" INTEGER,
Expand Down
2 changes: 2 additions & 0 deletions static/sql/insert_migration_requests.sql
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ INSERT INTO {{.Owner}}.MIGRATION_REQUESTS
MIGRATION_URL,
MIGRATION_INPUT,
MIGRATION_STATUS,
MIGRATION_SERVER,
CREATION_DATE,
CREATE_BY,
LAST_MODIFICATION_DATE,
Expand All @@ -13,6 +14,7 @@ VALUES
:migration_url,
:migration_input,
:migration_status,
:migration_server,
:creation_date,
:create_by,
:last_modification_date,
Expand Down
6 changes: 4 additions & 2 deletions static/sql/update_migration_status.sql
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
UPDATE {{.Owner}}.MIGRATION_REQUESTS
SET MIGRATION_STATUS = :status, RETRY_COUNT = :retry_count
WHERE MIGRATION_REQUEST_ID = :migration_request_id
SET MIGRATION_STATUS = :status,
RETRY_COUNT = :retry_count,
MIGRATION_SERVER = :migration_server
WHERE MIGRATION_REQUEST_ID = :migration_request_id

0 comments on commit 768e623

Please sign in to comment.