Skip to content

Commit

Permalink
Better support partial MoveTables
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 17, 2025
1 parent 1d55e96 commit d2002fa
Show file tree
Hide file tree
Showing 4 changed files with 92 additions and 24 deletions.
95 changes: 78 additions & 17 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,20 +33,23 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tidwall/gjson"
"google.golang.org/protobuf/encoding/protojson"

"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/utils"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topotools"
"vitess.io/vitess/go/vt/vtgate/vtgateconn"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
"vitess.io/vitess/go/vt/vttablet/tabletserver/vstreamer"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtgatepb "vitess.io/vitess/go/vt/proto/vtgate"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
)
Expand Down Expand Up @@ -525,28 +528,65 @@ func TestMoveTablesIgnoreSourceKeyspace(t *testing.T) {
defaultCellName := "zone1"
workflow := "mtnosource"
ksWorkflow := fmt.Sprintf("%s.%s", targetKs, workflow)
shard := "0"
targetShard := fmt.Sprintf("%s:%s", targetKs, shard)
tables := []string{"product", "customer", "merchant", "orders"}
defaultShard := "0"
tables := []string{"customer"}
var defaultCell *Cell

run := func(t *testing.T, switchTraffic bool, args []string) {
unshardedVSchema := `
{
"tables": {
"customer": {}
}
}`
shardedVSchema := `
{
"sharded": true,
"vindexes": {
"xxhash": {
"type": "xxhash"
}
},
"tables": {
"customer": {
"column_vindexes": [
{
"column": "customer_id",
"name": "xxhash"
}
]
}
}
}`

run := func(t *testing.T, sourceShards, targetShards string, createArgs, completeArgs []string, switchTraffic bool) {
vc = NewVitessCluster(t, nil)
require.NotNil(t, vc)
defaultCell = vc.Cells[defaultCellName]
t.Cleanup(vc.TearDown)

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, shard, initialProductVSchema, initialProductSchema, 0, 0, 100, nil)
sourceVSchema, targetVSchema := unshardedVSchema, unshardedVSchema
if len(sourceShards) > 0 {
sourceVSchema = shardedVSchema
}
if len(targetShards) > 0 {
targetVSchema = shardedVSchema
}
targetShardNames := strings.Split(targetShards, ",")

_, err := vc.AddKeyspace(t, []*Cell{defaultCell}, sourceKs, sourceShards, sourceVSchema, customerTable, 0, 0, 100, nil)
require.NoError(t, err)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, shard, "", "", 0, 0, 200, nil)
_, err = vc.AddKeyspace(t, []*Cell{defaultCell}, targetKs, targetShards, targetVSchema, "", 0, 0, 500, nil)
require.NoError(t, err)
verifyClusterHealth(t, vc)

insertInitialData(t)
if len(sourceShards) == 0 {
insertInitialData(t)
}

moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ","))
moveTablesAction(t, "Create", defaultCellName, workflow, sourceKs, targetKs, strings.Join(tables, ","), createArgs...)
// Wait until we get through the copy phase...
catchup(t, vc.getPrimaryTablet(t, targetKs, shard), workflow, "MoveTables")
for _, targetShard := range targetShardNames {
catchup(t, vc.getPrimaryTablet(t, targetKs, targetShard), workflow, "MoveTables")
}

if switchTraffic {
switchReads(t, "MoveTables", defaultCellName, ksWorkflow, false)
Expand All @@ -561,34 +601,55 @@ func TestMoveTablesIgnoreSourceKeyspace(t *testing.T) {
vc.DeleteKeyspace(t, sourceKs)

// The command should fail.
out, err := vc.VtctldClient.ExecuteCommandWithOutput(args...)
out, err := vc.VtctldClient.ExecuteCommandWithOutput(completeArgs...)
require.Error(t, err, out)

// But it should succeed if we ignore the source keyspace.
confirmRoutingRulesExist(t)
args = append(args, "--ignore-source-keyspace")
out, err = vc.VtctldClient.ExecuteCommandWithOutput(args...)
completeArgs = append(completeArgs, "--ignore-source-keyspace")
out, err = vc.VtctldClient.ExecuteCommandWithOutput(completeArgs...)
require.NoError(t, err, out)
confirmNoRoutingRules(t)
for _, table := range tables {
validateTableInDenyList(t, vc, targetShard, table, false)
for _, targetShard := range targetShardNames {
tksShard := fmt.Sprintf("%s/%s", targetKs, targetShard)
validateTableInDenyList(t, vc, tksShard, table, false)
}
}

// Confirm that we cleaned up the proper shard routing rules.
out, err = vc.VtctldClient.ExecuteCommandWithOutput("GetShardRoutingRules")
require.NoError(t, err, out)
var srr vschemapb.ShardRoutingRules
err = protojson.Unmarshal([]byte(out), &srr)
require.NoError(t, err)
srrMap := topotools.GetShardRoutingRulesMap(&srr)
for _, shard := range targetShardNames {
ksShard := fmt.Sprintf("%s.%s", targetKs, shard)
require.NotEqual(t, srrMap[ksShard], targetKs)
}

confirmNoWorkflows(t, targetKs)
}

t.Run("Workflow Delete", func(t *testing.T) {
args := []string{"Workflow", "--keyspace=" + targetKs, "delete", "--workflow=" + workflow}
run(t, false, args)
run(t, defaultShard, defaultShard, nil, args, false)
})

t.Run("MoveTables Cancel", func(t *testing.T) {
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "cancel"}
run(t, false, args)
run(t, defaultShard, defaultShard, nil, args, false)
})
t.Run("MoveTables Partial Cancel", func(t *testing.T) {
createArgs := []string{"--source-shards", "-80"}
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "cancel"}
run(t, "-80,80-", "-80,80-", createArgs, args, true)
})

t.Run("MoveTables Complete", func(t *testing.T) {
args := []string{"MoveTables", "--workflow=" + workflow, "--target-keyspace=" + targetKs, "complete"}
run(t, true, args)
run(t, defaultShard, defaultShard, nil, args, true)
})
}

Expand Down
16 changes: 12 additions & 4 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2012,7 +2012,7 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
}
}
}
if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw); err != nil {
if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw, opts...); err != nil {
return nil, err
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down Expand Up @@ -2175,6 +2175,11 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}
if wopts.ignoreSourceKeyspace {
// We cannot build the source schema then.
// And since we cannot compare the source and target shards we rely on
// the workflow sub type, which is set when creating a partial MoveTables
// workflow, for the determination.
ts.isPartialMigration = ts.workflowSubType == binlogdatapb.VReplicationWorkflowSubType_Partial
return ts, nil
}
vs, err := sourceTopo.GetVSchema(ctx, ts.sourceKeyspace)
Expand All @@ -2198,9 +2203,12 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
return ts, nil
}

func (s *Server) dropRelatedArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher) error {
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return err
func (s *Server) dropRelatedArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowActionOption) error {
wopts := processWorkflowActionOptions(opts)
if !wopts.ignoreSourceKeyspace {
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return err
}
}
if !keepRoutingRules {
if err := sw.deleteRoutingRules(ctx); err != nil {
Expand Down
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,6 @@ func (ts *trafficSwitcher) isPartialMoveTables(sourceShards, targetShards []stri
if err != nil {
return false, err
}

if key.KeyRangeIsComplete(skr) || key.KeyRangeIsComplete(tkr) || len(sourceShards) != len(targetShards) {
return false, nil
}
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vtctl/workflow/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,11 @@ func getSourceAndTargetKeyRanges(sourceShards, targetShards []string) (*topodata
sort.Strings(targetShards)
getFullKeyRange := func(shards []string) (*topodatapb.KeyRange, error) {
// Expect sorted shards.
kr1, err := getKeyRange(sourceShards[0])
kr1, err := getKeyRange(shards[0])
if err != nil {
return nil, err
}
kr2, err := getKeyRange(sourceShards[len(sourceShards)-1])
kr2, err := getKeyRange(shards[len(shards)-1])
if err != nil {
return nil, err
}
Expand Down

0 comments on commit d2002fa

Please sign in to comment.