Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Return lock error everywhere that LockName fails #16560

Merged
merged 8 commits into from
Aug 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions go/vt/vtctl/workflow/framework_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,23 @@ func (env *testEnv) deleteTablet(tablet *topodatapb.Tablet) {
delete(env.tablets[tablet.Keyspace], int(tablet.Alias.Uid))
}

func (env *testEnv) confirmRoutingAllTablesToTarget(t *testing.T) {
t.Helper()
env.tmc.mu.Lock()
defer env.tmc.mu.Unlock()
wantRR := make(map[string][]string)
for _, sd := range env.tmc.schema {
for _, td := range sd.TableDefinitions {
for _, tt := range []string{"", "@rdonly", "@replica"} {
wantRR[td.Name+tt] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
wantRR[fmt.Sprintf("%s.%s", env.sourceKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
wantRR[fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name+tt)] = []string{fmt.Sprintf("%s.%s", env.targetKeyspace.KeyspaceName, td.Name)}
}
}
}
checkRouting(t, env.ws, wantRR)
}

type testTMClient struct {
tmclient.TabletManagerClient
schema map[string]*tabletmanagerdatapb.SchemaDefinition
Expand All @@ -240,6 +257,7 @@ type testTMClient struct {

env *testEnv // For access to the env config from tmc methods.
reverse atomic.Bool // Are we reversing traffic?
frozen atomic.Bool // Are the workflows frozen?
}

func newTestTMClient(env *testEnv) *testTMClient {
Expand Down Expand Up @@ -306,6 +324,9 @@ func (tmc *testTMClient) ReadVReplicationWorkflow(ctx context.Context, tablet *t
},
},
}
if tmc.frozen.Load() {
stream.Message = Frozen
}
res.Streams = append(res.Streams, stream)
}

Expand Down Expand Up @@ -503,3 +524,90 @@ func (tmc *testTMClient) WaitForPosition(ctx context.Context, tablet *topodatapb
func (tmc *testTMClient) VReplicationWaitForPos(ctx context.Context, tablet *topodatapb.Tablet, id int32, pos string) error {
return nil
}

//
// Utility / helper functions.
//

func checkRouting(t *testing.T, ws *Server, want map[string][]string) {
t.Helper()
ctx := context.Background()
got, err := topotools.GetRoutingRules(ctx, ws.ts)
require.NoError(t, err)
require.EqualValues(t, got, want, "routing rules don't match: got: %v, want: %v", got, want)
cells, err := ws.ts.GetCellInfoNames(ctx)
require.NoError(t, err)
for _, cell := range cells {
checkCellRouting(t, ws, cell, want)
}
}

func checkCellRouting(t *testing.T, ws *Server, cell string, want map[string][]string) {
t.Helper()
ctx := context.Background()
svs, err := ws.ts.GetSrvVSchema(ctx, cell)
require.NoError(t, err)
got := make(map[string][]string, len(svs.RoutingRules.Rules))
for _, rr := range svs.RoutingRules.Rules {
got[rr.FromTable] = append(got[rr.FromTable], rr.ToTables...)
}
require.EqualValues(t, got, want, "routing rules don't match for cell %s: got: %v, want: %v", cell, got, want)
}

func checkDenyList(t *testing.T, ts *topo.Server, keyspace, shard string, want []string) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
tc := si.GetTabletControl(topodatapb.TabletType_PRIMARY)
var got []string
if tc != nil {
got = tc.DeniedTables
}
require.EqualValues(t, got, want, "denied tables for %s/%s: got: %v, want: %v", keyspace, shard, got, want)
}

func checkServedTypes(t *testing.T, ts *topo.Server, keyspace, shard string, want int) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
servedTypes, err := ts.GetShardServingTypes(ctx, si)
require.NoError(t, err)
require.Equal(t, want, len(servedTypes), "shard %s/%s has wrong served types: got: %v, want: %v",
keyspace, shard, len(servedTypes), want)
}

func checkCellServedTypes(t *testing.T, ts *topo.Server, keyspace, shard, cell string, want int) {
t.Helper()
ctx := context.Background()
srvKeyspace, err := ts.GetSrvKeyspace(ctx, cell, keyspace)
require.NoError(t, err)
count := 0
outer:
for _, partition := range srvKeyspace.GetPartitions() {
for _, ref := range partition.ShardReferences {
if ref.Name == shard {
count++
continue outer
}
}
}
require.Equal(t, want, count, "serving types for %s/%s in cell %s: got: %d, want: %d", keyspace, shard, cell, count, want)
}

func checkIfPrimaryServing(t *testing.T, ts *topo.Server, keyspace, shard string, want bool) {
t.Helper()
ctx := context.Background()
si, err := ts.GetShard(ctx, keyspace, shard)
require.NoError(t, err)
require.Equal(t, want, si.IsPrimaryServing, "primary serving for %s/%s: got: %v, want: %v", keyspace, shard, si.IsPrimaryServing, want)
}

func checkIfTableExistInVSchema(ctx context.Context, t *testing.T, ts *topo.Server, keyspace, table string) bool {
vschema, err := ts.GetVSchema(ctx, keyspace)
require.NoError(t, err)
require.NotNil(t, vschema)
_, ok := vschema.Tables[table]
return ok
}
63 changes: 28 additions & 35 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -2581,20 +2581,18 @@ func (s *Server) DropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropTargets")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
Expand Down Expand Up @@ -2779,20 +2777,18 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Source LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
Expand Down Expand Up @@ -3020,13 +3016,12 @@ func (s *Server) finalizeMigrateWorkflow(ctx context.Context, ts *trafficSwitche
lockName := fmt.Sprintf("%s/%s", ts.TargetKeyspaceName(), ts.WorkflowName())
ctx, workflowUnlock, lockErr := s.ts.LockName(ctx, lockName, "completeMigrateWorkflow")
if lockErr != nil {
ts.Logger().Errorf("Locking the workflow %s failed: %v", lockName, lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s workflow", lockName), lockErr)
}
defer workflowUnlock(&err)
ctx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "completeMigrateWorkflow")
if lockErr != nil {
ts.Logger().Errorf("Target LockKeyspace failed: %v", lockErr)
return nil, lockErr
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)

Expand Down Expand Up @@ -3193,16 +3188,10 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

cellsStr := strings.Join(req.Cells, ",")

// Consistently handle errors by logging and returning them.
handleError := func(message string, err error) (*[]string, error) {
werr := vterrors.Wrapf(err, message)
ts.Logger().Error(werr)
return nil, werr
}

log.Infof("Switching reads: %s.%s tablet types: %s, cells: %s, workflow state: %s", ts.targetKeyspace, ts.workflow, roTypesToSwitchStr, cellsStr, state.String())
if !switchReplica && !switchRdonly {
return handleError("invalid tablet types", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
return defaultErrorHandler(ts.Logger(), "invalid tablet types",
vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "tablet types must be REPLICA or RDONLY: %s", roTypesToSwitchStr))
}
// For partial (shard-by-shard migrations) or multi-tenant migrations, traffic for all tablet types
// is expected to be switched at once. For other MoveTables migrations where we use table routing rules
Expand All @@ -3214,24 +3203,28 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
trafficSwitchingIsAllOrNothing = true
case ts.MigrationType() == binlogdatapb.MigrationType_TABLES && ts.IsMultiTenantMigration():
if direction == DirectionBackward {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting reversal of read traffic for multi-tenant migrations is not supported"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting reversal of read traffic for multi-tenant migrations is not supported"))
}
// For multi-tenant migrations, we only support switching traffic to all cells at once
allCells, err := ts.TopoServer().GetCellInfoNames(ctx)
if err != nil {
return nil, err
}
if len(req.GetCells()) != 0 && len(req.GetCells()) != len(allCells) {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "requesting read traffic for multi-tenant migrations must include all cells"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT,
"requesting read traffic for multi-tenant migrations must include all cells"))
}
}

if !trafficSwitchingIsAllOrNothing {
if direction == DirectionBackward && switchReplica && len(state.ReplicaCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of read traffic for REPLICAs but REPLICA reads have not been switched"))
}
if direction == DirectionBackward && switchRdonly && len(state.RdonlyCellsSwitched) == 0 {
return handleError("invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
return defaultErrorHandler(ts.Logger(), "invalid request", vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION,
"requesting reversal of SwitchReads for RDONLYs but RDONLY reads have not been switched"))
}
}

Expand All @@ -3253,7 +3246,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// If journals exist notify user and fail.
journalsExist, _, err := ts.checkJournals(ctx)
if err != nil {
return handleError(fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to read journal in the %s keyspace", ts.SourceKeyspaceName()), err)
}
if journalsExist {
log.Infof("Found a previous journal entry for %d", ts.id)
Expand All @@ -3266,7 +3259,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
}

if err := ts.validate(ctx); err != nil {
return handleError("workflow validation failed", err)
return defaultErrorHandler(ts.Logger(), "workflow validation failed", err)
}

// For switching reads, locking the source keyspace is sufficient.
Expand All @@ -3282,7 +3275,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
// For reads, locking the source keyspace is sufficient.
ctx, unlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "SwitchReads", topo.WithTTL(ksLockTTL))
if lockErr != nil {
return handleError(fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer unlock(&err)
confirmKeyspaceLocksHeld := func() error {
Expand All @@ -3297,7 +3290,7 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc

// Remove mirror rules for the specified tablet types.
if err := sw.mirrorTableTraffic(ctx, roTabletTypes, 0); err != nil {
return handleError(fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to remove mirror rules from source keyspace %s to target keyspace %s, workflow %s, for read-only tablet types",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}

Expand All @@ -3306,36 +3299,36 @@ func (s *Server) switchReads(ctx context.Context, req *vtctldatapb.WorkflowSwitc
case ts.IsMultiTenantMigration():
err := sw.switchKeyspaceReads(ctx, roTabletTypes)
if err != nil {
return handleError(fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to switch read traffic, from source keyspace %s to target keyspace %s, workflow %s",
ts.SourceKeyspaceName(), ts.TargetKeyspaceName(), ts.WorkflowName()), err)
}
case ts.isPartialMigration:
ts.Logger().Infof("Partial migration, skipping switchTableReads as traffic is all or nothing per shard and overridden for reads AND writes in the ShardRoutingRule created when switching writes.")
default:
err := sw.switchTableReads(ctx, req.Cells, roTabletTypes, rebuildSrvVSchema, direction)
if err != nil {
return handleError("failed to switch read traffic for the tables", err)
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the tables", err)
}
}
return sw.logs(), nil
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
}
ts.Logger().Infof("About to switchShardReads: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
if err := sw.switchShardReads(ctx, req.Cells, roTabletTypes, direction); err != nil {
return handleError("failed to switch read traffic for the shards", err)
return defaultErrorHandler(ts.Logger(), "failed to switch read traffic for the shards", err)
}

if err := confirmKeyspaceLocksHeld(); err != nil {
return handleError("locks were lost", err)
return defaultErrorHandler(ts.Logger(), "locks were lost", err)
}
ts.Logger().Infof("switchShardReads Completed: cells: %s, tablet types: %s, direction: %d", cellsStr, roTypesToSwitchStr, direction)
if err := s.ts.ValidateSrvKeyspace(ctx, ts.targetKeyspace, cellsStr); err != nil {
err2 := vterrors.Wrapf(err, "after switching shard reads, found SrvKeyspace for %s is corrupt in cell %s",
ts.targetKeyspace, cellsStr)
return handleError("failed to validate SrvKeyspace record", err2)
return defaultErrorHandler(ts.Logger(), "failed to validate SrvKeyspace record", err2)
}
return sw.logs(), nil
}
Expand Down
Loading
Loading