Skip to content

Commit

Permalink
Add e2e test
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 12, 2025
1 parent dbbd1ec commit 68a4c75
Show file tree
Hide file tree
Showing 5 changed files with 110 additions and 20 deletions.
55 changes: 37 additions & 18 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -795,24 +797,7 @@ func (vc *VitessCluster) teardown() {
var wg sync.WaitGroup

for _, keyspace := range keyspaces {
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func(tablet2 *Tablet) {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if err := tablet2.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
}
}
if err := tablet2.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error())
} else {
log.Infof("Successfully stopped vttablet %s", tablet2.Name)
}
}(tablet)
}
}
_ = vc.TearDownKeyspace(keyspace)
}
wg.Wait()
if err := vc.Vtctld.TearDown(); err != nil {
Expand All @@ -836,6 +821,40 @@ func (vc *VitessCluster) teardown() {
}
}

func (vc *VitessCluster) TearDownKeyspace(ks *Keyspace) error {
wg := sync.WaitGroup{}
errs := concurrency.AllErrorRecorder{}
for _, shard := range ks.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func(tablet2 *Tablet) {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if err := tablet2.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
errs.RecordError(err)
}
}
if err := tablet2.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error())
errs.RecordError(err)
} else {
log.Infof("Successfully stopped vttablet %s", tablet2.Name)
}
}(tablet)
}
}
return errs.AggrError(vterrors.Aggregate)
}

func (vc *VitessCluster) DeleteKeyspace(t testing.TB, ksName string) {
out, err := vc.VtctldClient.ExecuteCommandWithOutput("DeleteKeyspace", ksName, "--recursive")
if err != nil {
log.Error("DeleteKeyspace failed with error: , output: %s", err, out)
}
require.NoError(t, err)
}

// TearDown brings down a cluster, deleting processes, removing topo keys
func (vc *VitessCluster) TearDown() {
if debugMode {
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,13 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab
}
}

// confirmNoWorkflows confirms that there are no active workflows in the given keyspace.
func confirmNoWorkflows(t *testing.T, keyspace string) {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", keyspace, "--compact", "--include-logs=false")
require.NoError(t, err)
require.True(t, isEmptyWorkflowShowOutput(output))
}

// getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is
// used when there is only one workflow, so we are using this simple method to get the config.
func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string {
Expand Down
56 changes: 56 additions & 0 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,62 @@ func TestVStreamFlushBinlog(t *testing.T) {
require.Equal(t, flushCount, int64(1), "VStreamerFlushedBinlogs should still be 1")
}

// TestMoveTablesCompleteIgnoreSourceKeyspace confirms that we are able to
// complete a MoveTables operation even if the source keyspace is gone.
func TestMoveTablesCompleteIgnoreSourceKeyspace(t *testing.T) {
defaultCellName := "zone1"
workflow := "mtcompnosource"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
shard := "0"
targetShard := fmt.Sprintf("%s:%s", targetKs, shard)
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
defer vc.TearDown()
defaultCell := vc.Cells[defaultCellName]

// Keep the cluster processes minimal (no rdonly and no replica tablets)
// to deal with CI resource constraints.
// This also makes it easier to confirm the behavior as we know exactly
// what tablets will be involved.
_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
require.NoError(t, err)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)

sourceTab = vc.getPrimaryTablet(t, sourceKs, shard)

insertInitialData(t)

tables := []string{"product", "customer", "merchant", "orders"}
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ","))
// Wait until we get through the copy phase...
catchup(t, vc.getPrimaryTablet(t, targetKs, shard), workflow, "MoveTables")

switchReads(t, "MoveTables", defaultCellName, ksWorkflow, false)
switchWrites(t, "MoveTables", ksWorkflow, false)

require.NotZero(t, len(vc.Cells[defaultCellName].Keyspaces))
require.NotNil(t, vc.Cells[defaultCellName].Keyspaces[sourceKs])
err = vc.TearDownKeyspace(vc.Cells[defaultCellName].Keyspaces[sourceKs])
require.NoError(t, err)
vc.DeleteKeyspace(t, sourceKs)

args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "complete"}
out, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.Error(t, err, out)

confirmRoutingRulesExist(t)
args = append(args, "--ignore-source-keyspace")
out, err = vc.VtctldClient.ExecuteCommandWithOutput(args...)
require.NoError(t, err, out)
confirmNoRoutingRules(t)
for _, table := range tables {
validateTableInDenyList(t, vc, targetShard, table, false)
}
confirmNoWorkflows(t, targetKs)
}

func testVStreamCellFlag(t *testing.T) {
vgtid := &binlogdatapb.VGtid{
ShardGtids: []*binlogdatapb.ShardGtid{{
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2159,8 +2159,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}
}
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && wopts.ignoreSourceKeyspace {
log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace)
if wopts.ignoreSourceKeyspace {
return ts, nil
}
vs, err := sourceTopo.GetVSchema(ctx, ts.sourceKeyspace)
Expand Down
9 changes: 9 additions & 0 deletions test/config.json
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,15 @@
"RetryMax": 0,
"Tags": []
},
"vreplication_complete_no_source_keyspace": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication", "-run", "TestMoveTablesCompleteIgnoreSourceKeyspace"],
"Command": [],
"Manual": false,
"Shard": "vreplication_cellalias",
"RetryMax": 0,
"Tags": []
},
"vreplication_multi_tenant": {
"File": "unused.go",
"Args": ["vitess.io/vitess/go/test/endtoend/vreplication","-run", "MultiTenant"],
Expand Down

0 comments on commit 68a4c75

Please sign in to comment.