From 3b2461269936e3d703a421b24c32c43439b872b6 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 11 Feb 2025 15:18:58 -0500 Subject: [PATCH] Allow complete when keyspace is completely gone Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 104 ++++++++++++++++------- go/vt/vtctl/workflow/server_options.go | 35 ++++++++ go/vt/vtctl/workflow/server_test.go | 8 ++ go/vt/vtctl/workflow/traffic_switcher.go | 2 +- 4 files changed, 117 insertions(+), 32 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index 8112cd386d9..b835b2d1b61 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -428,8 +428,12 @@ func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowN return s.getWorkflowState(ctx, targetKeyspace, workflowName) } -func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) { - ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName) +func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, *State, error) { + var options workflowOptions + for _, o := range opts { + o.apply(&options) + } + ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName, opts...) if err != nil { s.Logger().Errorf("buildTrafficSwitcher failed: %v", err) return nil, nil, err @@ -442,6 +446,13 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN IsPartialMigration: ts.isPartialMigration, } + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace { + if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil { + return nil, nil, err + } + return ts, state, nil + } + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { // Nothing left to do. 
return ts, state, nil @@ -471,7 +482,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN if len(ts.Tables()) == 0 { return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName) } - table := ts.Tables()[0] if ts.IsMultiTenantMigration() { // Deduce which traffic has been switched by looking at the current keyspace routing rules. @@ -497,29 +507,9 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN } } } else { - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) - if err != nil { - return nil, nil, err - } - - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) - if err != nil { + if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil { return nil, nil, err } - globalRules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) - if err != nil { - return nil, nil, err - } - for _, table := range ts.Tables() { - // If a rule for the primary tablet type exists for any table and points to the target keyspace, - // then writes have been switched. 
- ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table) - rr := globalRules[ruleKey] - if len(rr) > 0 && rr[0] != ruleKey { - state.WritesSwitched = true - break - } - } } } else { state.WorkflowType = TypeReshard @@ -1076,7 +1066,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl err = vterrors.Wrapf(err, "failed to cleanup denied table entries: %v", cerr) } } - if cerr := s.dropArtifacts(ctx, false, false, &switcher{s: s, ts: ts}); cerr != nil { + if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil { err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr) } if origVSchema == nil { // There's no previous version to restore @@ -1249,7 +1239,11 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete") defer span.Finish() - ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow()) + opts := []WorkflowOption{} + if req.IgnoreSourceKeyspace { + opts = append(opts, IgnoreSourceKeyspace()) + } + ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow(), opts...) 
if err != nil { return nil, err } @@ -1291,7 +1285,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa } if req.IgnoreSourceKeyspace { - if err := s.dropArtifacts(ctx, req.KeepRoutingRules, true, &switcher{s: s, ts: ts}); err != nil { + if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}, opts...); err != nil { return nil, vterrors.Wrapf(err, "failed to cleanup workflow artifacts") } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { @@ -2053,7 +2047,11 @@ func (s *Server) deleteTenantData(ctx context.Context, ts *trafficSwitcher, batc }) } -func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) { +func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, error) { + var options workflowOptions + for _, o := range opts { + o.apply(&options) + } tgtInfo, err := BuildTargets(ctx, s.ts, s.tmc, targetKeyspace, workflowName) if err != nil { s.Logger().Infof("Error building targets: %s", err) @@ -2120,6 +2118,9 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf } } + if options.ignoreSourceKeyspace { + continue + } if _, ok := ts.sources[bls.Shard]; ok { continue } @@ -2152,6 +2153,10 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf } } } + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace { + log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace) + return ts, nil + } vs, err := sourceTopo.GetVSchema(ctx, ts.sourceKeyspace) if err != nil { return nil, err @@ -2247,7 +2252,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy } } } - if err := s.dropArtifacts(ctx, keepRoutingRules, false, sw); err != nil { + if err := s.dropArtifacts(ctx, 
keepRoutingRules, sw); err != nil { return nil, err } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { @@ -2257,8 +2262,12 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy return sw.logs(), nil } -func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules, ignoreSourceKeyspace bool, sw iswitcher) error { - if !ignoreSourceKeyspace { +func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowOption) error { + var options workflowOptions + for _, o := range opts { + o.apply(&options) + } + if !options.ignoreSourceKeyspace { if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil { return err } @@ -3456,6 +3465,39 @@ func (s *Server) validateShardsHaveVReplicationPermissions(ctx context.Context, return nil } +func (s *Server) updateTablesTrafficState(ctx context.Context, state *State, tables []string) error { + // We assume a consistent state, so only choose routing rule for one table. + if len(tables) == 0 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", state.TargetKeyspace, state.Workflow) + } + + var err error + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_RDONLY) + if err != nil { + return err + } + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_REPLICA) + if err != nil { + return err + } + globalRules, err := topotools.GetRoutingRules(ctx, s.ts) + if err != nil { + return err + } + for _, table := range tables { + // If a rule for the primary tablet type exists for any table and points to the target keyspace, + // then writes have been switched. 
+ ruleKey := fmt.Sprintf("%s.%s", state.SourceKeyspace, table) + rr := globalRules[ruleKey] + if len(rr) > 0 && rr[0] != ruleKey { + state.WritesSwitched = true + break + } + } + log.Errorf("DEBUG: state: %+v", state) + return err +} + func (s *Server) Logger() logutil.Logger { if s.options.logger == nil { s.options.logger = logutil.NewConsoleLogger() // Use default system logger diff --git a/go/vt/vtctl/workflow/server_options.go b/go/vt/vtctl/workflow/server_options.go index ed6fdf284a9..bcf1d911f87 100644 --- a/go/vt/vtctl/workflow/server_options.go +++ b/go/vt/vtctl/workflow/server_options.go @@ -54,3 +54,38 @@ func WithLogger(l logutil.Logger) ServerOption { o.logger = l }) } + +// workflowOptions configure a workflow's optional behavior when +// performing actions in the workflow server. +// workflowOptions are set by the WorkflowOption values passed +// to the server functions. +type workflowOptions struct { + ignoreSourceKeyspace bool +} + +// WorkflowOption alters how we perform certain workflow operations. +type WorkflowOption interface { + apply(*workflowOptions) +} + +// funcWorkflowOption wraps a function that modifies workflowOptions into +// an implementation of the WorkflowOption interface. 
+type funcWorkflowOption struct { + f func(*workflowOptions) +} + +func (fwo *funcWorkflowOption) apply(wo *workflowOptions) { + fwo.f(wo) +} + +func newFuncWorkflowOption(f func(*workflowOptions)) *funcWorkflowOption { + return &funcWorkflowOption{ + f: f, + } +} + +func IgnoreSourceKeyspace() WorkflowOption { + return newFuncWorkflowOption(func(o *workflowOptions) { + o.ignoreSourceKeyspace = true + }) +} diff --git a/go/vt/vtctl/workflow/server_test.go b/go/vt/vtctl/workflow/server_test.go index c87ad1c3d13..a836df13123 100644 --- a/go/vt/vtctl/workflow/server_test.go +++ b/go/vt/vtctl/workflow/server_test.go @@ -387,6 +387,14 @@ func TestMoveTablesComplete(t *testing.T) { Workflow: workflowName, IgnoreSourceKeyspace: true, }, + preFunc: func(t *testing.T, env *testEnv) { + err := env.ts.DeleteKeyspace(ctx, sourceKeyspaceName) + require.NoError(t, err) + }, + postFunc: func(t *testing.T, env *testEnv) { + err := env.ts.CreateKeyspace(ctx, sourceKeyspaceName, &topodatapb.Keyspace{}) + require.NoError(t, err) + }, expectedTargetQueries: []*queryResult{ { query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'", diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 4463e30b711..970e5a9734a 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -268,7 +268,7 @@ func (ts *trafficSwitcher) ExternalTopo() *topo.Server { ret func (ts *trafficSwitcher) MigrationType() binlogdatapb.MigrationType { return ts.migrationType } func (ts *trafficSwitcher) IsPartialMigration() bool { return ts.isPartialMigration } func (ts *trafficSwitcher) ReverseWorkflowName() string { return ts.reverseWorkflow } -func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKSSchema.Keyspace.Name } +func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKeyspace } func (ts *trafficSwitcher) SourceKeyspaceSchema() 
*vindexes.KeyspaceSchema { return ts.sourceKSSchema } func (ts *trafficSwitcher) Sources() map[string]*MigrationSource { return ts.sources } func (ts *trafficSwitcher) Tables() []string { return ts.tables }