Skip to content

Commit

Permalink
Allow complete when keyspace is completely gone
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 11, 2025
1 parent 59658e4 commit 3b24612
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 32 deletions.
104 changes: 73 additions & 31 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,12 @@ func (s *Server) GetWorkflowState(ctx context.Context, targetKeyspace, workflowN
return s.getWorkflowState(ctx, targetKeyspace, workflowName)
}

func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, *State, error) {
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName)
func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, *State, error) {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
ts, err := s.buildTrafficSwitcher(ctx, targetKeyspace, workflowName, opts...)
if err != nil {
s.Logger().Errorf("buildTrafficSwitcher failed: %v", err)
return nil, nil, err
Expand All @@ -442,6 +446,13 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
IsPartialMigration: ts.isPartialMigration,
}

if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace {
if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil {
return nil, nil, err
}
return ts, state, nil
}

if ts.workflowType == binlogdatapb.VReplicationWorkflowType_CreateLookupIndex {
// Nothing left to do.
return ts, state, nil
Expand Down Expand Up @@ -471,7 +482,6 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
if len(ts.Tables()) == 0 {
return nil, nil, vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", targetKeyspace, workflowName)
}
table := ts.Tables()[0]

if ts.IsMultiTenantMigration() {
// Deduce which traffic has been switched by looking at the current keyspace routing rules.
Expand All @@ -497,29 +507,9 @@ func (s *Server) getWorkflowState(ctx context.Context, targetKeyspace, workflowN
}
}
} else {
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_RDONLY)
if err != nil {
return nil, nil, err
}

state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, sourceKeyspace, targetKeyspace, table, topodatapb.TabletType_REPLICA)
if err != nil {
if err := s.updateTablesTrafficState(ctx, state, ts.tables); err != nil {
return nil, nil, err
}
globalRules, err := topotools.GetRoutingRules(ctx, ts.TopoServer())
if err != nil {
return nil, nil, err
}
for _, table := range ts.Tables() {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", sourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
}
}
} else {
state.WorkflowType = TypeReshard
Expand Down Expand Up @@ -1076,7 +1066,7 @@ func (s *Server) moveTablesCreate(ctx context.Context, req *vtctldatapb.MoveTabl
err = vterrors.Wrapf(err, "failed to cleanup denied table entries: %v", cerr)
}
}
if cerr := s.dropArtifacts(ctx, false, false, &switcher{s: s, ts: ts}); cerr != nil {
if cerr := s.dropArtifacts(ctx, false, &switcher{s: s, ts: ts}); cerr != nil {
err = vterrors.Wrapf(err, "failed to cleanup workflow artifacts: %v", cerr)
}
if origVSchema == nil { // There's no previous version to restore
Expand Down Expand Up @@ -1249,7 +1239,11 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
span, ctx := trace.NewSpan(ctx, "workflow.Server.MoveTablesComplete")
defer span.Finish()

ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow())
opts := []WorkflowOption{}
if req.IgnoreSourceKeyspace {
opts = append(opts, IgnoreSourceKeyspace())
}
ts, state, err := s.getWorkflowState(ctx, req.GetTargetKeyspace(), req.GetWorkflow(), opts...)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1291,7 +1285,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
}

if req.IgnoreSourceKeyspace {
if err := s.dropArtifacts(ctx, req.KeepRoutingRules, true, &switcher{s: s, ts: ts}); err != nil {
if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}, opts...); err != nil {
return nil, vterrors.Wrapf(err, "failed to cleanup workflow artifacts")
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down Expand Up @@ -2053,7 +2047,11 @@ func (s *Server) deleteTenantData(ctx context.Context, ts *trafficSwitcher, batc
})
}

func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string) (*trafficSwitcher, error) {
func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workflowName string, opts ...WorkflowOption) (*trafficSwitcher, error) {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
tgtInfo, err := BuildTargets(ctx, s.ts, s.tmc, targetKeyspace, workflowName)
if err != nil {
s.Logger().Infof("Error building targets: %s", err)
Expand Down Expand Up @@ -2120,6 +2118,9 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}

if options.ignoreSourceKeyspace {
continue
}
if _, ok := ts.sources[bls.Shard]; ok {
continue
}
Expand Down Expand Up @@ -2152,6 +2153,10 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
}
}
}
if ts.workflowType == binlogdatapb.VReplicationWorkflowType_MoveTables && options.ignoreSourceKeyspace {
log.Errorf("DEBUG: Ignoring source keyspace for MoveTables workflow with source Keyspace %s", ts.sourceKeyspace)
return ts, nil
}
vs, err := sourceTopo.GetVSchema(ctx, ts.sourceKeyspace)
if err != nil {
return nil, err
Expand Down Expand Up @@ -2247,7 +2252,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
}
}
}
if err := s.dropArtifacts(ctx, keepRoutingRules, false, sw); err != nil {
if err := s.dropArtifacts(ctx, keepRoutingRules, sw); err != nil {
return nil, err
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand All @@ -2257,8 +2262,12 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
return sw.logs(), nil
}

func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules, ignoreSourceKeyspace bool, sw iswitcher) error {
if !ignoreSourceKeyspace {
func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowOption) error {
var options workflowOptions
for _, o := range opts {
o.apply(&options)
}
if !options.ignoreSourceKeyspace {
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return err
}
Expand Down Expand Up @@ -3456,6 +3465,39 @@ func (s *Server) validateShardsHaveVReplicationPermissions(ctx context.Context,
return nil
}

func (s *Server) updateTablesTrafficState(ctx context.Context, state *State, tables []string) error {
// We assume a consistent state, so only choose routing rule for one table.
if len(tables) == 0 {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no tables in workflow %s.%s", state.TargetKeyspace, state.Workflow)
}

var err error
state.RdonlyCellsSwitched, state.RdonlyCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_RDONLY)
if err != nil {
return err
}
state.ReplicaCellsSwitched, state.ReplicaCellsNotSwitched, err = s.GetCellsWithTableReadsSwitched(ctx, state.SourceKeyspace, state.TargetKeyspace, tables[0], topodatapb.TabletType_REPLICA)
if err != nil {
return err
}
globalRules, err := topotools.GetRoutingRules(ctx, s.ts)
if err != nil {
return err
}
for _, table := range tables {
// If a rule for the primary tablet type exists for any table and points to the target keyspace,
// then writes have been switched.
ruleKey := fmt.Sprintf("%s.%s", state.SourceKeyspace, table)
rr := globalRules[ruleKey]
if len(rr) > 0 && rr[0] != ruleKey {
state.WritesSwitched = true
break
}
}
log.Errorf("DEBUG: state: %+v", state)
return err
}

func (s *Server) Logger() logutil.Logger {
if s.options.logger == nil {
s.options.logger = logutil.NewConsoleLogger() // Use default system logger
Expand Down
35 changes: 35 additions & 0 deletions go/vt/vtctl/workflow/server_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,3 +54,38 @@ func WithLogger(l logutil.Logger) ServerOption {
o.logger = l
})
}

// workflowOptions configure a workflow's optional behavior when
// performing actions in the worfklow server.
// workflowOptions are set by the WorkflowOption values passed
// to the server functions.
type workflowOptions struct {
ignoreSourceKeyspace bool
}

// WorkflowOption alters how we perform the certain workflow operations.
type WorkflowOption interface {
apply(*workflowOptions)
}

// funcWorkflowOption wraps a function that modifies workflowOptions into
// an implementation of the WorkflowOption interface.
type funcWorkflowOption struct {
f func(*workflowOptions)
}

func (fwo *funcWorkflowOption) apply(wo *workflowOptions) {
fwo.f(wo)
}

func newFuncWorkflowOption(f func(*workflowOptions)) *funcWorkflowOption {
return &funcWorkflowOption{
f: f,
}
}

func IgnoreSourceKeyspace() WorkflowOption {
return newFuncWorkflowOption(func(o *workflowOptions) {
o.ignoreSourceKeyspace = true
})
}
8 changes: 8 additions & 0 deletions go/vt/vtctl/workflow/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,14 @@ func TestMoveTablesComplete(t *testing.T) {
Workflow: workflowName,
IgnoreSourceKeyspace: true,
},
preFunc: func(t *testing.T, env *testEnv) {
err := env.ts.DeleteKeyspace(ctx, sourceKeyspaceName)
require.NoError(t, err)
},
postFunc: func(t *testing.T, env *testEnv) {
err := env.ts.CreateKeyspace(ctx, sourceKeyspaceName, &topodatapb.Keyspace{})
require.NoError(t, err)
},
expectedTargetQueries: []*queryResult{
{
query: fmt.Sprintf("delete from _vt.vreplication where db_name = 'vt_%s' and workflow = '%s'",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtctl/workflow/traffic_switcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (ts *trafficSwitcher) ExternalTopo() *topo.Server { ret
func (ts *trafficSwitcher) MigrationType() binlogdatapb.MigrationType { return ts.migrationType }
func (ts *trafficSwitcher) IsPartialMigration() bool { return ts.isPartialMigration }
func (ts *trafficSwitcher) ReverseWorkflowName() string { return ts.reverseWorkflow }
func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKSSchema.Keyspace.Name }
func (ts *trafficSwitcher) SourceKeyspaceName() string { return ts.sourceKeyspace }
func (ts *trafficSwitcher) SourceKeyspaceSchema() *vindexes.KeyspaceSchema { return ts.sourceKSSchema }
func (ts *trafficSwitcher) Sources() map[string]*MigrationSource { return ts.sources }
func (ts *trafficSwitcher) Tables() []string { return ts.tables }
Expand Down

0 comments on commit 3b24612

Please sign in to comment.