Skip to content

Commit

Permalink
Support ignoring the source keyspace for cancel/delete too
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 12, 2025
1 parent 6440a91 commit 667e62b
Show file tree
Hide file tree
Showing 13 changed files with 388 additions and 282 deletions.
22 changes: 12 additions & 10 deletions go/cmd/vtctldclient/command/vreplication/common/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

var CancelOptions = struct {
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

func GetCancelCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -58,12 +59,13 @@ func commandCancel(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
IgnoreSourceKeyspace: CancelOptions.IgnoreSourceKeyspace,
}
resp, err := GetClient().WorkflowDelete(GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ func registerCommands(root *cobra.Command) {
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the MoveTables workflow in the target keyspace.")
cancel.Flags().BoolVar(&common.CancelOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the MoveTables workflow.")
cancel.Flags().Int64Var(&common.CancelOptions.DeleteBatchSize, "delete-batch-size", DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant workflow, delete the records in batches of this size.")
cancel.Flags().BoolVar(&common.CancelOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is canceled and cleaned up. This allows the workflow to be canceled if the source keyspace has been deleted or is not currently available.")
common.AddShardSubsetFlag(cancel, &common.CancelOptions.Shards)
base.AddCommand(cancel)
}
Expand Down
20 changes: 11 additions & 9 deletions go/cmd/vtctldclient/command/vreplication/workflow/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

var (
deleteOptions = struct {
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

// delete makes a WorkflowDelete gRPC call to a vtctld.
Expand All @@ -51,12 +52,13 @@ func commandDelete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
IgnoreSourceKeyspace: deleteOptions.IgnoreSourceKeyspace,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func registerCommands(root *cobra.Command) {
delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")
delete.Flags().Int64Var(&deleteOptions.DeleteBatchSize, "delete-batch-size", movetables.DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant MoveTables workflow, delete the records in batches of this size.")
delete.Flags().BoolVar(&deleteOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is deleted and cleaned up. This allows the workflow to be deleted if the source keyspace has been deleted or is not currently available. NOTE: this is only used with MoveTables.")
common.AddShardSubsetFlag(delete, &baseOptions.Shards)
base.AddCommand(delete)

Expand Down
23 changes: 9 additions & 14 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -417,10 +417,7 @@ func (vc *VitessCluster) setupVtctldClient() {
// CleanupDataroot deletes the vtdataroot directory. Since we run multiple tests sequentially in a single CI test shard,
// we can run out of disk space due to all the leftover artifacts from previous tests.
func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if !ok || strings.ToLower(ci) != "true" {
if debugMode {
// Leave the directory in place to support local debugging.
return
}
Expand Down Expand Up @@ -794,12 +791,9 @@ func (vc *VitessCluster) teardown() {
}
}

var wg sync.WaitGroup

for _, keyspace := range keyspaces {
_ = vc.TearDownKeyspace(keyspace)
}
wg.Wait()
if err := vc.Vtctld.TearDown(); err != nil {
log.Infof("Error stopping Vtctld: %s", err.Error())
} else {
Expand Down Expand Up @@ -827,23 +821,24 @@ func (vc *VitessCluster) TearDownKeyspace(ks *Keyspace) error {
for _, shard := range ks.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func(tablet2 *Tablet) {
go func() {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if err := tablet2.DbServer.Stop(); err != nil {
if tablet.DbServer != nil && tablet.DbServer.TabletUID > 0 {
if err := tablet.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
errs.RecordError(err)
}
}
if err := tablet2.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error())
if err := tablet.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet.Name, err.Error())
errs.RecordError(err)
} else {
log.Infof("Successfully stopped vttablet %s", tablet2.Name)
log.Infof("Successfully stopped vttablet %s", tablet.Name)
}
}(tablet)
}()
}
}
wg.Wait()
return errs.AggrError(vterrors.Aggregate)
}

Expand Down
103 changes: 62 additions & 41 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,57 +518,78 @@ func TestVStreamFlushBinlog(t *testing.T) {
require.Equal(t, flushCount, int64(1), "VStreamerFlushedBinlogs should still be 1")
}

// TestMoveTablesCompleteIgnoreSourceKeyspace confirms that we are able to
// complete a MoveTables operation even if the source keyspace is gone.
func TestMoveTablesCompleteIgnoreSourceKeyspace(t *testing.T) {
// TestMoveTablesIgnoreSourceKeyspace confirms that we are able to
// cancel/delete and complete a MoveTables workflow even if the source
// keyspace is gone.
func TestMoveTablesIgnoreSourceKeyspace(t *testing.T) {
defaultCellName := "zone1"
workflow := "mtcompnosource"
workflow := "mtnosource"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
shard := "0"
targetShard := fmt.Sprintf("%s:%s", targetKs, shard)
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
defer vc.TearDown()
defaultCell := vc.Cells[defaultCellName]
tables := []string{"product", "customer", "merchant", "orders"}
var defaultCell *Cell

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)
run := func(t *testing.T, switchTraffic bool, args []string) {
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
t.Cleanup(vc.TearDown)

insertInitialData(t)
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)

tables := []string{"product", "customer", "merchant", "orders"}
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ","))
// Wait until we get through the copy phase...
catchup(t, vc.getPrimaryTablet(t, targetKs, shard), workflow, "MoveTables")
insertInitialData(t)

switchReads(t, "MoveTables", defaultCellName, ksWorkflow, false)
switchWrites(t, "MoveTables", ksWorkflow, false)
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ","))
// Wait until we get through the copy phase...
catchup(t, vc.getPrimaryTablet(t, targetKs, shard), workflow, "MoveTables")

// Decommission the source keyspace.
require.NotZero(t, len(vc.Cells[defaultCellName].Keyspaces))
require.NotNil(t, vc.Cells[defaultCellName].Keyspaces[sourceKs])
err = vc.TearDownKeyspace(vc.Cells[defaultCellName].Keyspaces[sourceKs])
require.NoError(t, err)
vc.DeleteKeyspace(t, sourceKs)

// The complete should now fail.
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "complete"}
out, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.Error(t, err, out)

// But it should succeed if we ignore the source keyspace.
confirmRoutingRulesExist(t)
args = append(args, "--ignore-source-keyspace")
out, err = vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.NoError(t, err, out)
confirmNoRoutingRules(t)
for _, table := range tables {
validateTableInDenyList(t, vc, targetShard, table, false)
if switchTraffic {
switchReads(t, "MoveTables", defaultCellName, ksWorkflow, false)
switchWrites(t, "MoveTables", ksWorkflow, false)
}

// Decommission the source keyspace.
require.NotZero(t, len(vc.Cells[defaultCellName].Keyspaces))
require.NotNil(t, vc.Cells[defaultCellName].Keyspaces[sourceKs])
err = vc.TearDownKeyspace(vc.Cells[defaultCellName].Keyspaces[sourceKs])
require.NoError(t, err)
vc.DeleteKeyspace(t, sourceKs)

// The command should fail.
out, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.Error(t, err, out)

// But it should succeed if we ignore the source keyspace.
confirmRoutingRulesExist(t)
args = append(args, "--ignore-source-keyspace")
out, err = vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.NoError(t, err, out)
confirmNoRoutingRules(t)
for _, table := range tables {
validateTableInDenyList(t, vc, targetShard, table, false)
}
confirmNoWorkflows(t, targetKs)
}
confirmNoWorkflows(t, targetKs)

t.Run("Workflow Delete", func(t *testing.T) {
args := []string{"Workflow", "--keyspace=" + targetKs, "delete", "--workflow=" + workflow}
run(t, false, args)
})

t.Run("MoveTables Cancel", func(t *testing.T) {
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "cancel"}
run(t, false, args)
})

t.Run("MoveTables Complete", func(t *testing.T) {
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "complete"}
run(t, true, args)
})
}

func testVStreamCellFlag(t *testing.T) {
Expand Down
Loading

0 comments on commit 667e62b

Please sign in to comment.