Skip to content

Commit

Permalink
More seperation of source and target in cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Feb 12, 2025
1 parent a016515 commit 173512d
Showing 1 changed file with 13 additions and 41 deletions.
54 changes: 13 additions & 41 deletions go/vt/vtctl/workflow/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -1294,7 +1294,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa
}

if req.IgnoreSourceKeyspace {
if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}, opts...); err != nil {
if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}); err != nil {
return nil, vterrors.Wrapf(err, "failed to cleanup workflow artifacts")
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down Expand Up @@ -1956,6 +1956,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
defer targetUnlock(&err)
ctx = lockCtx

if err := sw.dropTargetVReplicationStreams(ctx); err != nil {
return nil, err
}

// Stop the workflow before we delete the artifacts so that it doesn't try and
// continue doing work, and producing errors, as we delete the related artifacts.
if err = ts.ForAllTargets(func(target *MigrationTarget) error {
Expand Down Expand Up @@ -1993,7 +1997,7 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData,
}
}
}
if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw); err != nil {
if err := s.dropArtifacts(ctx, keepRoutingRules, sw); err != nil {
return nil, err
}
if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil {
Expand Down Expand Up @@ -2179,22 +2183,6 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf
return ts, nil
}

func (s *Server) dropRelatedArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher) error {
if !keepRoutingRules {
if err := sw.deleteRoutingRules(ctx); err != nil {
return err
}
if err := sw.deleteShardRoutingRules(ctx); err != nil {
return err
}
if err := sw.deleteKeyspaceRoutingRules(ctx); err != nil {
return err
}
}

return nil
}

// dropSources cleans up source tables, shards and denied tables after a
// MoveTables/Reshard is completed.
func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalType TableRemovalType, keepData, keepRoutingRules, force, dryRun bool) (*[]string, error) {
Expand All @@ -2208,27 +2196,24 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
sw = &switcher{ts: ts, s: s}
}

// Lock the source and target keyspaces.
// Lock the source keyspace.
ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr)
}
defer sourceUnlock(&err)
if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() {
lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources")
if lockErr != nil {
return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr)
}
defer targetUnlock(&err)
ctx = lockCtx
}

if !force {
if err := sw.validateWorkflowHasCompleted(ctx); err != nil {
ts.Logger().Errorf("Workflow has not completed, cannot DropSources: %v", err)
return nil, err
}
}

if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return nil, err
}

if !keepData {
switch ts.MigrationType() {
case binlogdatapb.MigrationType_TABLES:
Expand All @@ -2239,10 +2224,6 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
if err := sw.dropSourceDeniedTables(ctx); err != nil {
return nil, err
}
if err := sw.dropTargetDeniedTables(ctx); err != nil {
return nil, err
}

case binlogdatapb.MigrationType_SHARDS:
s.Logger().Infof("Removing shards")
if err := sw.dropSourceShards(ctx); err != nil {
Expand All @@ -2260,16 +2241,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy
return sw.logs(), nil
}

func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowActionOption) error {
wopts := processWorkflowActionOptions(opts)
if !wopts.ignoreSourceKeyspace {
if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil {
return err
}
}
if err := sw.dropTargetVReplicationStreams(ctx); err != nil {
return err
}
func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher) error {
if !keepRoutingRules {
if err := sw.deleteRoutingRules(ctx); err != nil {
return err
Expand Down

0 comments on commit 173512d

Please sign in to comment.