diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index e6a241659ed..cf3e00d86d2 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -1294,7 +1294,7 @@ func (s *Server) MoveTablesComplete(ctx context.Context, req *vtctldatapb.MoveTa } if req.IgnoreSourceKeyspace { - if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}, opts...); err != nil { + if err := s.dropArtifacts(ctx, req.KeepRoutingRules, &switcher{s: s, ts: ts}); err != nil { return nil, vterrors.Wrapf(err, "failed to cleanup workflow artifacts") } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { @@ -1956,6 +1956,10 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData, defer targetUnlock(&err) ctx = lockCtx + if err := sw.dropTargetVReplicationStreams(ctx); err != nil { + return nil, err + } + // Stop the workflow before we delete the artifacts so that it doesn't try and // continue doing work, and producing errors, as we delete the related artifacts. if err = ts.ForAllTargets(func(target *MigrationTarget) error { @@ -1993,7 +1997,7 @@ func (s *Server) dropTargets(ctx context.Context, ts *trafficSwitcher, keepData, } } } - if err := s.dropRelatedArtifacts(ctx, keepRoutingRules, sw); err != nil { + if err := s.dropArtifacts(ctx, keepRoutingRules, sw); err != nil { return nil, err } if err := ts.TopoServer().RebuildSrvVSchema(ctx, nil); err != nil { @@ -2179,22 +2183,6 @@ func (s *Server) buildTrafficSwitcher(ctx context.Context, targetKeyspace, workf return ts, nil } -func (s *Server) dropRelatedArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher) error { - if !keepRoutingRules { - if err := sw.deleteRoutingRules(ctx); err != nil { - return err - } - if err := sw.deleteShardRoutingRules(ctx); err != nil { - return err - } - if err := sw.deleteKeyspaceRoutingRules(ctx); err != nil { - return err - } - } - - return nil -} - // dropSources cleans up source tables, shards and denied tables after a // MoveTables/Reshard is completed. func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalType TableRemovalType, keepData, keepRoutingRules, force, dryRun bool) (*[]string, error) { @@ -2208,20 +2196,12 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy sw = &switcher{ts: ts, s: s} } - // Lock the source and target keyspaces. + // Lock the source keyspace. ctx, sourceUnlock, lockErr := sw.lockKeyspace(ctx, ts.SourceKeyspaceName(), "DropSources") if lockErr != nil { return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.SourceKeyspaceName()), lockErr) } defer sourceUnlock(&err) - if ts.TargetKeyspaceName() != ts.SourceKeyspaceName() { - lockCtx, targetUnlock, lockErr := sw.lockKeyspace(ctx, ts.TargetKeyspaceName(), "DropSources") - if lockErr != nil { - return defaultErrorHandler(ts.Logger(), fmt.Sprintf("failed to lock the %s keyspace", ts.TargetKeyspaceName()), lockErr) - } - defer targetUnlock(&err) - ctx = lockCtx - } if !force { if err := sw.validateWorkflowHasCompleted(ctx); err != nil { @@ -2229,6 +2209,11 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy return nil, err } } + + if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil { + return nil, err + } + if !keepData { switch ts.MigrationType() { case binlogdatapb.MigrationType_TABLES: @@ -2239,10 +2224,6 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy if err := sw.dropSourceDeniedTables(ctx); err != nil { return nil, err } - if err := sw.dropTargetDeniedTables(ctx); err != nil { - return nil, err - } - case binlogdatapb.MigrationType_SHARDS: s.Logger().Infof("Removing shards") if err := sw.dropSourceShards(ctx); err != nil { @@ -2260,16 +2241,7 @@ func (s *Server) dropSources(ctx context.Context, ts *trafficSwitcher, removalTy return sw.logs(), nil } -func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher, opts ...WorkflowActionOption) error { - wopts := processWorkflowActionOptions(opts) - if !wopts.ignoreSourceKeyspace { - if err := sw.dropSourceReverseVReplicationStreams(ctx); err != nil { - return err - } - } - if err := sw.dropTargetVReplicationStreams(ctx); err != nil { - return err - } +func (s *Server) dropArtifacts(ctx context.Context, keepRoutingRules bool, sw iswitcher) error { if !keepRoutingRules { if err := sw.deleteRoutingRules(ctx); err != nil { return err