From 70114ad687660d8fe4d6cb50bf4bf890ee0c66f5 Mon Sep 17 00:00:00 2001 From: Rohit Nayak <57520317+rohit-nayak-ps@users.noreply.github.com> Date: Fri, 14 Feb 2025 16:53:10 +0100 Subject: [PATCH] Multi-tenant workflow SwitchWrites: Don't add denied tables on cancelMigration() (#17782) Signed-off-by: Rohit Nayak --- go/vt/topotools/mirror_rules.go | 2 +- go/vt/vtctl/workflow/server.go | 1 - go/vt/vtctl/workflow/traffic_switcher.go | 25 +++++++++++++------ .../tabletmanager/vreplication/controller.go | 3 ++- .../tabletmanager/vreplication/vplayer.go | 5 ++-- 5 files changed, 24 insertions(+), 12 deletions(-) diff --git a/go/vt/topotools/mirror_rules.go b/go/vt/topotools/mirror_rules.go index 3076a12b394..067b0544dab 100644 --- a/go/vt/topotools/mirror_rules.go +++ b/go/vt/topotools/mirror_rules.go @@ -55,7 +55,7 @@ func GetMirrorRules(ctx context.Context, ts *topo.Server) (map[string]map[string // SaveMirrorRules converts a mapping of fromTable=>[]toTables into a // vschemapb.MirrorRules protobuf message and saves it in the topology. func SaveMirrorRules(ctx context.Context, ts *topo.Server, rules map[string]map[string]float32) error { - log.Infof("Saving mirror rules %v\n", rules) + log.V(2).Infof("Saving mirror rules %v\n", rules) rrs := &vschemapb.MirrorRules{Rules: make([]*vschemapb.MirrorRule, 0)} for fromTable, mrs := range rules { diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 4a1fb2e461a..efa7d6448b0 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -2977,7 +2977,6 @@ func (s *Server) switchWrites(ctx context.Context, req *vtctldatapb.WorkflowSwit time.Sleep(lockTablesCycleDelay) } } - // Get the source positions now that writes are stopped, the streams were stopped (e.g. // intra-keyspace materializations that write on the source), and we know for certain // that any in progress writes are done. 
diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 4463e30b711..7e1df4a07ce 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -1154,9 +1154,9 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat if ctx.Err() != nil { // Even though we create a new context later on we still record any context error: // for forensics in case of failures. - ts.Logger().Infof("In Cancel migration: original context invalid: %s", ctx.Err()) + ts.Logger().Infof("cancelMigration (%v): original context invalid: %s", ts.WorkflowName(), ctx.Err()) } - + ts.Logger().Infof("cancelMigration (%v): starting", ts.WorkflowName()) // We create a new context while canceling the migration, so that we are independent of the original // context being canceled prior to or during the cancel operation itself. // First we create a copy of the parent context, so that we maintain the locks, but which cannot be @@ -1168,20 +1168,27 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat defer cmCancel() if ts.MigrationType() == binlogdatapb.MigrationType_TABLES { - err = ts.switchDeniedTables(cmCtx, true /* revert */) + if !ts.IsMultiTenantMigration() { + ts.Logger().Infof("cancelMigration (%v): adding denied tables to target", ts.WorkflowName()) + err = ts.switchDeniedTables(cmCtx, true /* revert */) + } else { + ts.Logger().Infof("cancelMigration (%v): multi-tenant, not adding denied tables to target", ts.WorkflowName()) + } } else { + ts.Logger().Infof("cancelMigration (%v): allowing writes on source shards", ts.WorkflowName()) err = ts.changeShardsAccess(cmCtx, ts.SourceKeyspaceName(), ts.SourceShards(), allowWrites) } if err != nil { cancelErrs.RecordError(fmt.Errorf("could not revert denied tables / shard access: %v", err)) - ts.Logger().Errorf("Cancel migration failed: could not revert denied tables / shard access: %v", err) + ts.Logger().Errorf("Cancel migration failed (%v): could not revert denied tables / shard access: %v", ts.WorkflowName(), err) } if err := sm.CancelStreamMigrations(cmCtx); err != nil { cancelErrs.RecordError(fmt.Errorf("could not cancel stream migrations: %v", err)) - ts.Logger().Errorf("Cancel migration failed: could not cancel stream migrations: %v", err) + ts.Logger().Errorf("Cancel migration failed (%v): could not cancel stream migrations: %v", ts.WorkflowName(), err) } + ts.Logger().Infof("cancelMigration (%v): restarting vreplication workflows", ts.WorkflowName()) err = ts.ForAllTargets(func(target *MigrationTarget) error { query := fmt.Sprintf("update _vt.vreplication set state='Running', message='' where db_name=%s and workflow=%s", encodeString(target.GetPrimary().DbName()), encodeString(ts.WorkflowName())) @@ -1190,17 +1197,21 @@ func (ts *trafficSwitcher) cancelMigration(ctx context.Context, sm *StreamMigrat }) if err != nil { cancelErrs.RecordError(fmt.Errorf("could not restart vreplication: %v", err)) - ts.Logger().Errorf("Cancel migration failed: could not restart vreplication: %v", err) + ts.Logger().Errorf("Cancel migration failed (%v): could not restart vreplication: %v", ts.WorkflowName(), err) } + ts.Logger().Infof("cancelMigration (%v): deleting reverse vreplication workflows", ts.WorkflowName()) if err := ts.deleteReverseVReplication(cmCtx); err != nil { cancelErrs.RecordError(fmt.Errorf("could not delete reverse vreplication streams: %v", err)) - ts.Logger().Errorf("Cancel migration failed: could not delete reverse vreplication streams: %v", err) + ts.Logger().Errorf("Cancel migration failed (%v): could not delete reverse vreplication streams: %v", ts.WorkflowName(), err) } if cancelErrs.HasErrors() { + ts.Logger().Errorf("Cancel migration failed for %v, manual cleanup work may be necessary: %v", ts.WorkflowName(), cancelErrs.AggrError(vterrors.Aggregate)) return vterrors.Wrap(cancelErrs.AggrError(vterrors.Aggregate), "cancel migration failed, manual cleanup work may be necessary") } + + ts.Logger().Infof("cancelMigration (%v): completed", ts.WorkflowName()) return nil } diff --git a/go/vt/vttablet/tabletmanager/vreplication/controller.go b/go/vt/vttablet/tabletmanager/vreplication/controller.go index 7067211ff10..113cb2314a0 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/controller.go +++ b/go/vt/vttablet/tabletmanager/vreplication/controller.go @@ -115,7 +115,6 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor } blpStats.WorkflowConfig = workflowConfig.String() ct.sourceTablet.Store(&topodatapb.TabletAlias{}) - log.Infof("creating controller with cell: %v, tabletTypes: %v, and params: %v", cell, tabletTypesStr, params) id, err := strconv.ParseInt(params["id"], 10, 32) if err != nil { @@ -123,6 +122,8 @@ func newController(ctx context.Context, params map[string]string, dbClientFactor } ct.id = int32(id) ct.workflow = params["workflow"] + log.Infof("creating controller with id: %v, name: %v, cell: %v, tabletTypes: %v", ct.id, ct.workflow, cell, tabletTypesStr) + ct.lastWorkflowError = vterrors.NewLastError(fmt.Sprintf("VReplication controller %d for workflow %q", ct.id, ct.workflow), workflowConfig.MaxTimeToRetryError) state := params["state"] diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go index 31ab895934c..ae7a19db4dd 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer.go @@ -125,7 +125,8 @@ func newVPlayer(vr *vreplicator, settings binlogplayer.VRSettings, copyState map settings.StopPos = pausePos saveStop = false } - log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v", + log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vr.id, vr.WorkflowName, settings.StartPos, settings.StopPos) + log.V(2).Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %+v", vr.id, settings.StartPos, settings.StopPos, vr.source.Filter) queryFunc := func(ctx context.Context, sql string) (*sqltypes.Result, error) { return vr.dbClient.ExecuteWithRetry(ctx, sql) @@ -266,7 +267,7 @@ func (vp *vplayer) updateFKCheck(ctx context.Context, flags2 uint32) error { // one. This allows for the apply thread to catch up more quickly if // a backlog builds up. func (vp *vplayer) fetchAndApply(ctx context.Context) (err error) { - log.Infof("Starting VReplication player id: %v, startPos: %v, stop: %v, filter: %v", vp.vr.id, vp.startPos, vp.stopPos, vp.vr.source) + log.Infof("Starting VReplication player id: %v, name: %v, startPos: %v, stop: %v", vp.vr.id, vp.vr.WorkflowName, vp.startPos, vp.stopPos) ctx, cancel := context.WithCancel(ctx) defer cancel()