Skip to content

Commit

Permalink
[release-19.0] Multi-tenant workflow SwitchWrites: Don't add denied t…
Browse files Browse the repository at this point in the history
…ables on cancelMigration() (#17782) (#17795)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Feb 17, 2025
1 parent df3bd30 commit 4def0fc
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 10 deletions.
1 change: 0 additions & 1 deletion go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3271,7 +3271,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit
time.Sleep(lockTablesCycleDelay)
}
}

ts.Logger().Infof("Waiting for streams to catchup")
if err := sw.waitForCatchup(ctx, timeout); err != nil {
if cerr := sw.cancelMigration(ctx, sm); cerr != nil {
Expand Down
19 changes: 13 additions & 6 deletions go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -1003,9 +1003,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
if ctx.Err() != nil {
// Even though we create a new context later on we still record any context error:
// for forensics in case of failures.
ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err())
ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err())
}

ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName())
// We create a new context while canceling the migration, so that we are independent of the original
// context being canceled prior to or during the cancel operation itself.
// First we create a copy of the parent context, so that we maintain the locks, but which cannot be
Expand All @@ -1017,20 +1017,23 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
defer cmCancel()

if ts.MigrationType() == binlogdatapb.MigrationType_TABLES {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source tables", ts.WorkflowName())
err = ts.changeTableSourceWrites(cmCtx, allowWrites)
} else {
ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName())
err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites)
}
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err)
}

if err := sm.CancelStreamMigrations(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName())
err = ts.ForAllTargets(func(target *MigrationTarget) error {
query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s",
encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName()))
Expand All @@ -1039,17 +1042,21 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat
})
if err != nil {
cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err)
}

ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName())
if err := ts.deleteReverseVReplication(cmCtx); err != nil {
cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err))
ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err)
ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err)
}

if cancelErrs.HasErrors() {
ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate))
return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary")
}

ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName())
return nil
}

Expand Down
3 changes: 2 additions & 1 deletion go/vt/vttablet/tabletmanager/vreplication/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,15 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor
source: &binlogdatapb.BinlogSource{},
}
ct.sourceTablet.Store(&topodatapb.TabletAlias{})
log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params)

id, err := strconv.ParseInt(params["id"], 10, 32)
if err != nil {
return nil, err
}
ct.id = int32(id)
ct.workflow = params["workflow"]
log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr)

ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), maxTimeToRetryError)

state := params["state"]
Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map
settings.StopPos = pausePos
saveStop = false
}

queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) {
return vr.dbClient.ExecuteWithRetry(ctx, sql)
}
Expand Down Expand Up @@ -251,7 +250,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error {
// one. This allows for the apply thread to catch up more quickly if
// a backlog builds up.
func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) {
log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source)
log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
Expand Down

0 comments on commit 4def0fc

Please sign in to comment.