Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Support ignoring the source Keyspace on MoveTables cancel/complete #17729

Merged
merged 14 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 12 additions & 10 deletions go/cmd/vtctldclient/command/vreplication/common/cancel.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@ import (
)

var CancelOptions = struct {
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
Shards []string
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

func GetCancelCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -58,12 +59,13 @@ func commandCancel(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
Keyspace: BaseOptions.TargetKeyspace,
Workflow: BaseOptions.Workflow,
KeepData: CancelOptions.KeepData,
KeepRoutingRules: CancelOptions.KeepRoutingRules,
Shards: CancelOptions.Shards,
DeleteBatchSize: CancelOptions.DeleteBatchSize,
IgnoreSourceKeyspace: CancelOptions.IgnoreSourceKeyspace,
}
resp, err := GetClient().WorkflowDelete(GetCommandCtx(), req)
if err != nil {
Expand Down
24 changes: 13 additions & 11 deletions go/cmd/vtctldclient/command/vreplication/common/complete.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@ import (
)

var CompleteOptions = struct {
KeepData bool
KeepRoutingRules bool
RenameTables bool
DryRun bool
Shards []string
KeepData bool
KeepRoutingRules bool
RenameTables bool
DryRun bool
Shards []string
IgnoreSourceKeyspace bool
}{}

func GetCompleteCommand(opts *SubCommandsOpts) *cobra.Command {
Expand All @@ -41,12 +42,13 @@ func commandComplete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.MoveTablesCompleteRequest{
Workflow: BaseOptions.Workflow,
TargetKeyspace: BaseOptions.TargetKeyspace,
KeepData: CompleteOptions.KeepData,
KeepRoutingRules: CompleteOptions.KeepRoutingRules,
RenameTables: CompleteOptions.RenameTables,
DryRun: CompleteOptions.DryRun,
Workflow: BaseOptions.Workflow,
TargetKeyspace: BaseOptions.TargetKeyspace,
KeepData: CompleteOptions.KeepData,
KeepRoutingRules: CompleteOptions.KeepRoutingRules,
RenameTables: CompleteOptions.RenameTables,
DryRun: CompleteOptions.DryRun,
IgnoreSourceKeyspace: CompleteOptions.IgnoreSourceKeyspace,
}
resp, err := GetClient().MoveTablesComplete(GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,13 +106,15 @@ func registerCommands(root *cobra.Command) {
complete.Flags().BoolVar(&common.CompleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules in place that direct table traffic from the source keyspace to the target keyspace of the MoveTables workflow.")
complete.Flags().BoolVar(&common.CompleteOptions.RenameTables, "rename-tables", false, "Keep the original source table data that was copied by the MoveTables workflow, but rename each table to '_<tablename>_old'.")
complete.Flags().BoolVar(&common.CompleteOptions.DryRun, "dry-run", false, "Print the actions that would be taken and report any known errors that would have occurred.")
complete.Flags().BoolVar(&common.CompleteOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is completed and cleaned up. This allows the workflow to be completed if the source keyspace has been deleted or is not currently available.")
common.AddShardSubsetFlag(complete, &common.CompleteOptions.Shards)
base.AddCommand(complete)

cancel := common.GetCancelCommand(opts)
cancel.Flags().BoolVar(&common.CancelOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the MoveTables workflow in the target keyspace.")
cancel.Flags().BoolVar(&common.CancelOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the MoveTables workflow.")
cancel.Flags().Int64Var(&common.CancelOptions.DeleteBatchSize, "delete-batch-size", DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant workflow, delete the records in batches of this size.")
cancel.Flags().BoolVar(&common.CancelOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is canceled and cleaned up. This allows the workflow to be canceled if the source keyspace has been deleted or is not currently available.")
common.AddShardSubsetFlag(cancel, &common.CancelOptions.Shards)
base.AddCommand(cancel)
}
Expand Down
20 changes: 11 additions & 9 deletions go/cmd/vtctldclient/command/vreplication/workflow/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,10 @@ import (

var (
deleteOptions = struct {
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
KeepData bool
KeepRoutingRules bool
DeleteBatchSize int64
IgnoreSourceKeyspace bool
}{}

// delete makes a WorkflowDelete gRPC call to a vtctld.
Expand All @@ -51,12 +52,13 @@ func commandDelete(cmd *cobra.Command, args []string) error {
cli.FinishedParsing(cmd)

req := &vtctldatapb.WorkflowDeleteRequest{
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
Keyspace: baseOptions.Keyspace,
Workflow: baseOptions.Workflow,
KeepData: deleteOptions.KeepData,
KeepRoutingRules: deleteOptions.KeepRoutingRules,
Shards: baseOptions.Shards,
DeleteBatchSize: deleteOptions.DeleteBatchSize,
IgnoreSourceKeyspace: deleteOptions.IgnoreSourceKeyspace,
}
resp, err := common.GetClient().WorkflowDelete(common.GetCommandCtx(), req)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ func registerCommands(root *cobra.Command) {
delete.Flags().BoolVar(&deleteOptions.KeepData, "keep-data", false, "Keep the partially copied table data from the workflow in the target keyspace.")
delete.Flags().BoolVar(&deleteOptions.KeepRoutingRules, "keep-routing-rules", false, "Keep the routing rules created for the workflow.")
delete.Flags().Int64Var(&deleteOptions.DeleteBatchSize, "delete-batch-size", movetables.DefaultDeleteBatchSize, "When cleaning up the migrated data in tables moved as part of a multi-tenant MoveTables workflow, delete the records in batches of this size.")
delete.Flags().BoolVar(&deleteOptions.IgnoreSourceKeyspace, "ignore-source-keyspace", false, "WARNING: This option should only be used when absolutely necessary. Ignore the source keyspace as the workflow is deleted and cleaned up. This allows the workflow to be deleted if the source keyspace has been deleted or is not currently available. NOTE: this is only used with MoveTables.")
common.AddShardSubsetFlag(delete, &baseOptions.Shards)
base.AddCommand(delete)

Expand Down
64 changes: 39 additions & 25 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/concurrency"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
Expand Down Expand Up @@ -415,10 +417,7 @@ func (vc *VitessCluster) setupVtctldClient() {
// CleanupDataroot deletes the vtdataroot directory. Since we run multiple tests sequentially in a single CI test shard,
// we can run out of disk space due to all the leftover artifacts from previous tests.
func (vc *VitessCluster) CleanupDataroot(t *testing.T, recreate bool) {
// This is always set to "true" on GitHub Actions runners:
// https://docs.github.com/en/actions/learn-github-actions/variables#default-environment-variables
ci, ok := os.LookupEnv("CI")
if !ok || strings.ToLower(ci) != "true" {
if debugMode {
// Leave the directory in place to support local debugging.
return
}
Expand Down Expand Up @@ -792,29 +791,9 @@ func (vc *VitessCluster) teardown() {
}
}

var wg sync.WaitGroup

for _, keyspace := range keyspaces {
for _, shard := range keyspace.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func(tablet2 *Tablet) {
defer wg.Done()
if tablet2.DbServer != nil && tablet2.DbServer.TabletUID > 0 {
if err := tablet2.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
}
}
if err := tablet2.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet2.Name, err.Error())
} else {
log.Infof("Successfully stopped vttablet %s", tablet2.Name)
}
}(tablet)
}
}
_ = vc.TearDownKeyspace(keyspace)
}
wg.Wait()
if err := vc.Vtctld.TearDown(); err != nil {
log.Infof("Error stopping Vtctld: %s", err.Error())
} else {
Expand All @@ -836,6 +815,41 @@ func (vc *VitessCluster) teardown() {
}
}

func (vc *VitessCluster) TearDownKeyspace(ks *Keyspace) error {
wg := sync.WaitGroup{}
errs := concurrency.AllErrorRecorder{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a good candidate for errgroup, but fine as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can switch that over. I had only moved this code in the PR which is why I left it as-is.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then leave it as it is.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went ahead and migrated it since I was going to merge in origin/main anyway and thus do another CI run: 7946667

for _, shard := range ks.Shards {
for _, tablet := range shard.Tablets {
wg.Add(1)
go func() {
defer wg.Done()
if tablet.DbServer != nil && tablet.DbServer.TabletUID > 0 {
if err := tablet.DbServer.Stop(); err != nil {
log.Infof("Error stopping mysql process: %s", err.Error())
errs.RecordError(err)
}
}
if err := tablet.Vttablet.TearDown(); err != nil {
log.Infof("Error stopping vttablet %s %s", tablet.Name, err.Error())
errs.RecordError(err)
} else {
log.Infof("Successfully stopped vttablet %s", tablet.Name)
}
}()
}
}
wg.Wait()
return errs.AggrError(vterrors.Aggregate)
}

func (vc *VitessCluster) DeleteKeyspace(t testing.TB, ksName string) {
out, err := vc.VtctldClient.ExecuteCommandWithOutput("DeleteKeyspace", ksName, "--recursive")
if err != nil {
log.Error("DeleteKeyspace failed with error: , output: %s", err, out)
}
require.NoError(t, err)
}

// TearDown brings down a cluster, deleting processes, removing topo keys
func (vc *VitessCluster) TearDown() {
if debugMode {
Expand Down
7 changes: 7 additions & 0 deletions go/test/endtoend/vreplication/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,13 @@ func confirmKeyspacesRoutedTo(t *testing.T, keyspace string, routedKeyspace, tab
}
}

// confirmNoWorkflows confirms that there are no active workflows in the given keyspace.
func confirmNoWorkflows(t *testing.T, keyspace string) {
output, err := vc.VtctldClient.ExecuteCommandWithOutput("GetWorkflows", keyspace, "--compact", "--include-logs=false")
require.NoError(t, err)
require.True(t, isEmptyWorkflowShowOutput(output))
}

// getVReplicationConfig returns the vreplication config for one random workflow for a given tablet. Currently, this is
// used when there is only one workflow, so we are using this simple method to get the config.
func getVReplicationConfig(t *testing.T, tab *cluster.VttabletProcess) map[string]string {
Expand Down
Loading
Loading