From dbbd1ec1878cd5520b59a7ecf9a4ee8ecbd3b42d Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 12 Feb 2025 10:31:29 -0500 Subject: [PATCH] Simplify the code changes Signed-off-by: Matt Lord --- go/vt/vtctl/workflow/server.go | 90 +++++++++----------------- go/vt/vtctl/workflow/server_options.go | 30 ++++----- go/vt/vtctl/workflow/utils.go | 8 +++ 3 files changed, 53 insertions(+), 75 deletions(-) diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index b835b2d1b61..35986e7e500 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -428,11 +428,7 @@ func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowN return s.getWorkflowState(ctx, targetKeyspace, workflowName) } -func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, *State, error) { - var options workflowOptions - for _, o := range opts { - o.apply(&options) - } +func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowActionOption) (*trafficSwitcher, *State, error) { ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName, opts...) if err != nil { s.Logger().Errorf("buildTrafficSwitcher failed: %v", err) @@ -446,13 +442,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN IsPartialMigration: ts.isPartialMigration, } - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace { - if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil { - return nil, nil, err - } - return ts, state, nil - } - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex { // Nothing left to do. return ts, state, nil @@ -482,6 +471,7 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN if len(ts.Tables()) == 0 { return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName) } + table := ts.Tables()[0] if ts.IsMultiTenantMigration() { // Deduce which traffic has been switched by looking at the current keyspace routing rules. @@ -507,9 +497,28 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN } } } else { - if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil { + state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY) + if err != nil { + return nil, nil, err + } + state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA) + if err != nil { return nil, nil, err } + globalRules, err := topotools.GetRoutingRules(ctx, ts.TopoServer()) + if err != nil { + return nil, nil, err + } + for _, table := range ts.Tables() { + // If a rule for the primary tablet type exists for any table and points to the target keyspace, + // then writes have been switched. + ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table) + rr := globalRules[ruleKey] + if len(rr) > 0 && rr[0] != ruleKey { + state.WritesSwitched = true + break + } + } } } else { state.WorkflowType = TypeReshard @@ -1239,7 +1248,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete") defer span.Finish() - opts := []WorkflowOption{} + opts := []WorkflowActionOption{} if req.IgnoreSourceKeyspace { opts = append(opts, IgnoreSourceKeyspace()) } @@ -2047,11 +2056,8 @@ func (s *Server) deleteTenantData(ctx context.Context, ts *trafficSwitcher, batc }) } -func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, error) { - var options workflowOptions - for _, o := range opts { - o.apply(&options) - } +func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowActionOption) (*trafficSwitcher, error) { + wopts := processWorkflowActionOptions(opts) tgtInfo, err := BuildTargets(ctx, s.ts, s.tmc, targetKeyspace, workflowName) if err != nil { s.Logger().Infof("Error building targets: %s", err) @@ -2118,7 +2124,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf } } - if options.ignoreSourceKeyspace { + if wopts.ignoreSourceKeyspace { continue } if _, ok := ts.sources[bls.Shard]; ok { @@ -2153,7 +2159,7 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf } } } - if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace { + if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && wopts.ignoreSourceKeyspace { log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace) return ts, nil } @@ -2262,12 +2268,9 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy return sw.logs(), nil } -func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowOption) error { - var options workflowOptions - for _, o := range opts { - o.apply(&options) - } - if !options.ignoreSourceKeyspace { +func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowActionOption) error { + wopts := processWorkflowActionOptions(opts) + if !wopts.ignoreSourceKeyspace { if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil { return err } @@ -3465,39 +3468,6 @@ func (s *Server) validateShardsHaveVReplicationPermissions(ctx context.Context, return nil } -func (s *Server) updateTablesTrafficState(ctx context.Context, state *State, tables []string) error { - // We assume a consistent state, so only choose routing rule for one table. - if len(tables) == 0 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", state.TargetKeyspace, state.Workflow) - } - - var err error - state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_RDONLY) - if err != nil { - return err - } - state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_REPLICA) - if err != nil { - return err - } - globalRules, err := topotools.GetRoutingRules(ctx, s.ts) - if err != nil { - return err - } - for _, table := range tables { - // If a rule for the primary tablet type exists for any table and points to the target keyspace, - // then writes have been switched. - ruleKey := fmt.Sprintf("%s.%s", state.SourceKeyspace, table) - rr := globalRules[ruleKey] - if len(rr) > 0 && rr[0] != ruleKey { - state.WritesSwitched = true - break - } - } - log.Errorf("DEBUG: state: %+v", state) - return err -} - func (s *Server) Logger() logutil.Logger { if s.options.logger == nil { s.options.logger = logutil.NewConsoleLogger() // Use default system logger diff --git a/go/vt/vtctl/workflow/server_options.go b/go/vt/vtctl/workflow/server_options.go index bcf1d911f87..0384f7ddda0 100644 --- a/go/vt/vtctl/workflow/server_options.go +++ b/go/vt/vtctl/workflow/server_options.go @@ -55,37 +55,37 @@ func WithLogger(l logutil.Logger) ServerOption { }) } -// workflowOptions configure a workflow's optional behavior when +// workflowActionOptions configure a workflow's optional behavior when // performing actions in the worfklow server. -// workflowOptions are set by the WorkflowOption values passed +// workflowActionOptions are set by the WorkflowActionOption values passed // to the server functions. -type workflowOptions struct { +type workflowActionOptions struct { ignoreSourceKeyspace bool } -// WorkflowOption alters how we perform the certain workflow operations. -type WorkflowOption interface { - apply(*workflowOptions) +// WorkflowActionOption alters how we perform the certain workflow operations. +type WorkflowActionOption interface { + apply(*workflowActionOptions) } -// funcWorkflowOption wraps a function that modifies workflowOptions into -// an implementation of the WorkflowOption interface. -type funcWorkflowOption struct { - f func(*workflowOptions) +// funcWorkflowActionOption wraps a function that modifies workflowActionOptions +// into an implementation of the WorkflowActionOption interface. +type funcWorkflowActionOption struct { + f func(*workflowActionOptions) } -func (fwo *funcWorkflowOption) apply(wo *workflowOptions) { +func (fwo *funcWorkflowActionOption) apply(wo *workflowActionOptions) { fwo.f(wo) } -func newFuncWorkflowOption(f func(*workflowOptions)) *funcWorkflowOption { - return &funcWorkflowOption{ +func newFuncWorkflowActionOption(f func(*workflowActionOptions)) *funcWorkflowActionOption { + return &funcWorkflowActionOption{ f: f, } } -func IgnoreSourceKeyspace() WorkflowOption { - return newFuncWorkflowOption(func(o *workflowOptions) { +func IgnoreSourceKeyspace() WorkflowActionOption { + return newFuncWorkflowActionOption(func(o *workflowActionOptions) { o.ignoreSourceKeyspace = true }) } diff --git a/go/vt/vtctl/workflow/utils.go b/go/vt/vtctl/workflow/utils.go index 2f7f51ed206..2a6aa88cd10 100644 --- a/go/vt/vtctl/workflow/utils.go +++ b/go/vt/vtctl/workflow/utils.go @@ -1063,3 +1063,11 @@ func getVindexAndVSchema(ctx context.Context, ts *topo.Server, keyspace string, } return vindex, vschema, nil } + +func processWorkflowActionOptions(opts []WorkflowActionOption) workflowActionOptions { + var options workflowActionOptions + for _, o := range opts { + o.apply(&options) + } + return options +}