diff --git a/go/test/endtoend/vreplication/cluster_test.go b/go/test/endtoend/vreplication/cluster_test.go index e6181cad31b..e8fb1e0eee3 100644 --- a/go/test/endtoend/vreplication/cluster_test.go +++ b/go/test/endtoend/vreplication/cluster_test.go @@ -35,9 +35,11 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/test/endtoend/throttler" + "vitess.io/vitess/go/vt/concurrency" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/mysqlctl" "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/vterrors" "vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext" vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" @@ -795,24 +797,7 @@ func (vc *VitessCluster) teardown() { var wg sync.WaitGroup for _, keyspace := range keyspaces { - for _, shard := range keyspace.Shards { - for _, tablet := range shard.Tablets { - wg.Add(1) - go func(tablet2 *Tablet) { - defer wg.Done() - if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 { - if err := tablet2.DbServer.Stop(); err != nil { - log.Infof("Error stopping mysql process: %s", err.Error()) - } - } - if err := tablet2.Vttablet.TearDown(); err != nil { - log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error()) - } else { - log.Infof("Successfully stopped vttablet %s", tablet2.Name) - } - }(tablet) - } - } + _ = vc.TearDownKeyspace(keyspace) } wg.Wait() if err := vc.Vtctld.TearDown(); err != nil { @@ -836,6 +821,40 @@ func (vc *VitessCluster) teardown() { } } +func (vc *VitessCluster) TearDownKeyspace(ks *Keyspace) error { + wg := sync.WaitGroup{} + errs := concurrency.AllErrorRecorder{} + for _, shard := range ks.Shards { + for _, tablet := range shard.Tablets { + wg.Add(1) + go func(tablet2 *Tablet) { + defer wg.Done() + if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 { + if err := tablet2.DbServer.Stop(); err != nil { + log.Infof("Error stopping mysql process: %s", err.Error()) + errs.RecordError(err) + } + } + if err := tablet2.Vttablet.TearDown(); err != nil { + log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error()) + errs.RecordError(err) + } else { + log.Infof("Successfully stopped vttablet %s", tablet2.Name) + } + }(tablet) + } + } + return errs.AggrError(vterrors.Aggregate) +} + +func (vc *VitessCluster) DeleteKeyspace(t testing.TB, ksName string) { + out, err := vc.VtctldClient.ExecuteCommandWithOutput("DeleteKeyspace", ksName, "--recursive") + if err != nil { + log.Error("DeleteKeyspace failed with error: , output: %s", err, out) + } + require.NoError(t, err) +} + // TearDown brings down a cluster, deleting processes, removing topo keys func (vc *VitessCluster) TearDown() { if debugMode { diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index b3c55becc4a..f58e4ee49fc 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -1047,6 +1047,13 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab } } +// confirmNoWorkflows confirms that there are no active workflows in the given keyspace. +func confirmNoWorkflows(t *testing.T, keyspace string) { + output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", keyspace, "--compact", "--include-logs=false") + require.NoError(t, err) + require.True(t, isEmptyWorkflowShowOutput(output)) +} + // getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is // used when there is only one workflow, so we are using this simple method to get the config. func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string { diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index f12e8edc759..e306485eab8 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -518,6 +518,62 @@ func TestVStreamFlushBinlog(t *testing.T) { require.Equal(t, flushCount, int64(1), "VStreamerFlushedBinlogs should still be 1") } +// TestMoveTablesCompleteIgnoreSourceKeyspace confirms that we are able to +// complete a MoveTables operation even if the source keyspace is gone. +func TestMoveTablesCompleteIgnoreSourceKeyspace(t *testing.T) { + defaultCellName := "zone1" + workflow := "mtcompnosource" + ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow) + shard := "0" + targetShard := fmt.Sprintf("%s:%s", targetKs, shard) + vc = NewVitessCluster(t, nil) + require.NotNil(t, vc) + defer vc.TearDown() + defaultCell := vc.Cells[defaultCellName] + + // Keep the cluster processes minimal (no rdonly and no replica tablets) + // to deal with CI resource constraints. + // This also makes it easier to confirm the behavior as we know exactly + // what tablets will be involved. + _, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil) + require.NoError(t, err) + _, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil) + require.NoError(t, err) + verifyClusterHealth(t, vc) + + sourceTab = vc.getPrimaryTablet(t, sourceKs, shard) + + insertInitialData(t) + + tables := []string{"product", "customer", "merchant", "orders"} + moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ",")) + // Wait until we get through the copy phase... + catchup(t, vc.getPrimaryTablet(t, targetKs, shard), workflow, "MoveTables") + + switchReads(t, "MoveTables", defaultCellName, ksWorkflow, false) + switchWrites(t, "MoveTables", ksWorkflow, false) + + require.NotZero(t, len(vc.Cells[defaultCellName].Keyspaces)) + require.NotNil(t, vc.Cells[defaultCellName].Keyspaces[sourceKs]) + err = vc.TearDownKeyspace(vc.Cells[defaultCellName].Keyspaces[sourceKs]) + require.NoError(t, err) + vc.DeleteKeyspace(t, sourceKs) + + args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "complete"} + out, err := vc.VtctldClient.ExecuteCommandWithOutput(args...) + require.Error(t, err, out) + + confirmRoutingRulesExist(t) + args = append(args, "--ignore-source-keyspace") + out, err = vc.VtctldClient.ExecuteCommandWithOutput(args...) + require.NoError(t, err, out) + confirmNoRoutingRules(t) + for _, table := range tables { + validateTableInDenyList(t, vc, targetShard, table, false) + } + confirmNoWorkflows(t, targetKs) +} + func testVStreamCellFlag(t *testing.T) { vgtid := &binlogdatapb.VGtid{ ShardGtids: []*binlogdatapb.ShardGtid{{ diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 35986e7e500..acdce2da3c0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2159,8 +2159,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf } } } - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && wopts.ignoreSourceKeyspace { - log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace) + if wopts.ignoreSourceKeyspace { return ts, nil } vs, err := sourceTopo.GetVSchema(ctx, ts.sourceKeyspace) diff --git a/test/config.json b/test/config.json index 17cdb019e97..6368b4e9d0c 100644 --- a/test/config.json +++ b/test/config.json @@ -1101,6 +1101,15 @@ "RetryMax": 0, "Tags": [] }, + "vreplication_complete_no_source_keyspace": { + "File": "unused.go", + "Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesCompleteIgnoreSourceKeyspace"], + "Command": [], + "Manual": false, + "Shard": "vreplication_cellalias", + "RetryMax": 0, + "Tags": [] + }, "vreplication_multi_tenant": { "File": "unused.go", "Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"],