From 587f7ff5dcb6c92d09d454423297a34c32aeb318 Mon Sep 17 00:00:00 2001 From: Rohit Nayak Date: Fri, 21 Feb 2025 15:46:44 +0100 Subject: [PATCH] Refactor code around setting sequences while switching primary traffic Signed-off-by: Rohit Nayak --- go/vt/vtctl/workflow/sequences.go | 751 ++++++++++++++++++ go/vt/vtctl/workflow/sequences_test.go | 601 ++++++++++++++ go/vt/vtctl/workflow/server.go | 17 - go/vt/vtctl/workflow/traffic_switcher.go | 539 ------------- go/vt/vtctl/workflow/traffic_switcher_test.go | 589 +------------- 5 files changed, 1354 insertions(+), 1143 deletions(-) create mode 100644 go/vt/vtctl/workflow/sequences.go create mode 100644 go/vt/vtctl/workflow/sequences_test.go diff --git a/go/vt/vtctl/workflow/sequences.go b/go/vt/vtctl/workflow/sequences.go new file mode 100644 index 00000000000..c66596c2727 --- /dev/null +++ b/go/vt/vtctl/workflow/sequences.go @@ -0,0 +1,751 @@ +package workflow + +import ( + "context" + "fmt" + "sort" + "strings" + "sync" + + "golang.org/x/exp/maps" + "golang.org/x/sync/errgroup" + + "vitess.io/vitess/go/mysql/sqlerror" + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + vschemapb "vitess.io/vitess/go/vt/proto/vschema" + vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo/topoproto" + "vitess.io/vitess/go/vt/vterrors" + "vitess.io/vitess/go/vt/vtgate/vindexes" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +// sequenceMetadata contains all of the relevant metadata for a sequence that +// is being used by a table involved in a vreplication workflow. +type sequenceMetadata struct { + // The name of the sequence table. + backingTableName string + // The keyspace where the backing table lives. + backingTableKeyspace string + // The dbName in use by the keyspace where the backing table lives. + backingTableDBName string + // The name of the table using the sequence. + usingTableName string + // The dbName in use by the keyspace where the using table lives. + usingTableDBName string + // The using table definition. + usingTableDefinition *vschemapb.Table + + // escaped values + usingCol, usingDB, usingTable string + backingDB, backingTable string +} + +func (sm *sequenceMetadata) escapeValues() error { + usingCol, err := sqlescape.EnsureEscaped(sm.usingTableDefinition.AutoIncrement.Column) + if err != nil { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid column name %s specified for sequence in table %s: %v", + sm.usingTableDefinition.AutoIncrement.Column, sm.usingTableName, err) + return err + } + sm.usingCol = usingCol + usingDB, err := sqlescape.EnsureEscaped(sm.usingTableDBName) + if err != nil { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s specified for sequence in table %s: %v", + sm.usingTableDBName, sm.usingTableName, err) + return err + } + sm.usingDB = usingDB + usingTable, err := sqlescape.EnsureEscaped(sm.usingTableName) + if err != nil { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s specified for sequence: %v", + sm.usingTableName, err) + return err + } + sm.usingTable = usingTable + backingDB, err := sqlescape.EnsureEscaped(sm.backingTableDBName) + if err != nil { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s specified for sequence backing table: %v", + sm.backingTableDBName, err) + return err + } + sm.backingDB = backingDB + backingTable, err := sqlescape.EnsureEscaped(sm.backingTableName) + if err != nil { + err = vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s specified for sequence backing table: %v", + sm.backingTableName, err) + return err + } + sm.backingTable = backingTable + return nil +} + +func (ts *trafficSwitcher) getMaxSequenceValues(ctx context.Context, sequences map[string]*sequenceMetadata) (map[string]int64, error) { + maxValues := make(map[string]int64, len(sequences)) + mu := sync.Mutex{} + initGroup, gctx := errgroup.WithContext(ctx) + for _, sm := range sequences { + initGroup.Go(func() error { + maxId, err := ts.getMaxSequenceValue(gctx, sm) + if err != nil { + return err + } + mu.Lock() + defer mu.Unlock() + maxValues[sm.backingTableName] = maxId + return nil + }) + } + errs := initGroup.Wait() + if errs != nil { + return nil, errs + } + return maxValues, nil + +} + +func (ts *trafficSwitcher) getMaxSequenceValue(ctx context.Context, seq *sequenceMetadata) (int64, error) { + var maxSequenceValue int64 + var mu sync.Mutex + setMaxSequenceValue := func(id int64) { + mu.Lock() + defer mu.Unlock() + if id > maxSequenceValue { + maxSequenceValue = id + } + } + if err := seq.escapeValues(); err != nil { + return 0, err + } + errs := ts.ForAllTargets(func(target *MigrationTarget) error { + primary := target.GetPrimary() + if primary == nil || primary.GetAlias() == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target shard %s/%s", + ts.targetKeyspace, target.GetShard().ShardName()) + } + + query := sqlparser.BuildParsedQuery(sqlGetMaxSequenceVal, + seq.usingCol, + seq.usingDB, + seq.usingTable, + ) + qr, terr := ts.ws.tmc.ExecuteFetchAsApp(ctx, primary.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + Query: []byte(query.Query), + MaxRows: 1, + }) + if terr != nil || len(qr.Rows) != 1 { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, seq.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } + rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0] + maxID := int64(0) + if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max + maxID, terr = rawVal.ToInt64() + if terr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, seq.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } + } + setMaxSequenceValue(maxID) + return nil + }) + return maxSequenceValue, errs +} + +func (ts *trafficSwitcher) updateSequenceValue(ctx context.Context, seq *sequenceMetadata, currentMaxValue int64) error { + if err := seq.escapeValues(); err != nil { + return err + } + nextVal := currentMaxValue + 1 + + // Now we need to update the sequence table, if needed, in order to + // ensure that that the next value it provides is > the current max. + sequenceShard, ierr := ts.TopoServer().GetOnlyShard(ctx, seq.backingTableKeyspace) + if ierr != nil || sequenceShard == nil || sequenceShard.PrimaryAlias == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", + seq.backingTableKeyspace, ierr) + } + sequenceTablet, ierr := ts.TopoServer().GetTablet(ctx, sequenceShard.PrimaryAlias) + if ierr != nil || sequenceTablet == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", + seq.backingTableKeyspace, ierr) + } + // FIXME: check for db name override + query := sqlparser.BuildParsedQuery(sqlInitSequenceTable, + seq.backingTableDBName, + seq.backingTableName, + nextVal, + nextVal, + nextVal, + ) + _, ierr = ts.ws.tmc.ExecuteFetchAsApp(ctx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + Query: []byte(query.Query), + MaxRows: 1, + }) + if ierr != nil { + vterr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, + "failed to initialize the backing sequence table %s.%s: %v", + seq.backingTableDBName, seq.backingTableName, ierr) + return vterr + } + return nil +} + +func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { + maxValues, err := ts.getMaxSequenceValues(ctx, sequencesByBackingTable) + if err != nil { + return err + } + for _, sm := range sequencesByBackingTable { + maxValue := maxValues[sm.backingTableName] + if maxValue == 0 { + continue + } + if err := ts.updateSequenceValue(ctx, sm, maxValue); err != nil { + return err + } + } + return nil +} + +func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { + vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) + if err != nil { + return false, err + } + if vschema == nil || len(vschema.Tables) == 0 { + return false, nil + } + sequenceFound := false + for _, table := range ts.Tables() { + vs, ok := vschema.Tables[table] + if !ok || vs == nil { + continue + } + if vs.Type == vindexes.TypeSequence { + sequenceFound = true + break + } + } + return sequenceFound, nil +} + +// getTargetSequenceMetadata returns a map of sequence metadata keyed by the +// backing sequence table name. If the target keyspace has no tables +// defined that use sequences for auto_increment generation then a nil +// map will be returned. +func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[string]*sequenceMetadata, error) { + vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for target keyspace %s: %v", + ts.targetKeyspace, err) + } + if vschema == nil || len(vschema.Tables) == 0 { // Nothing to do + return nil, nil + } + + sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) + if err != nil { + return nil, err + } + // If all of the sequence tables were defined using qualified table + // names then we don't need to search for them in other keyspaces. + if len(sequencesByBackingTable) == 0 || backingTablesFound { + return sequencesByBackingTable, nil + } + + if err := ctx.Err(); err != nil { + return nil, err + } + + // Now we need to locate the backing sequence table(s) which will + // be in another unsharded keyspace. + smMu := sync.Mutex{} + tableCount := len(sequencesByBackingTable) + tablesFound := make(map[string]struct{}) // Used to short circuit the search + // Define the function used to search each keyspace. + searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error { + kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace) + if kerr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v", + keyspace, kerr) + } + if kvs == nil || kvs.Sharded || len(kvs.Tables) == 0 { + return nil + } + for tableName, tableDef := range kvs.Tables { + // The table name can be escaped in the vschema definition. + unescapedTableName, err := sqlescape.UnescapeID(tableName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %q in keyspace %s: %v", + tableName, keyspace, err) + } + select { + case <-sctx.Done(): + return sctx.Err() + case <-done: // We've found everything we need in other goroutines + return nil + default: + } + if complete := func() bool { + smMu.Lock() // Prevent concurrent access to the map + defer smMu.Unlock() + sm := sequencesByBackingTable[unescapedTableName] + if tableDef != nil && tableDef.Type == vindexes.TypeSequence && + sm != nil && unescapedTableName == sm.backingTableName { + tablesFound[tableName] = struct{}{} // This is also protected by the mutex + sm.backingTableKeyspace = keyspace + // Set the default keyspace name. We will later check to + // see if the tablet we send requests to is using a dbname + // override and use that if it is. + sm.backingTableDBName = "vt_" + keyspace + if len(tablesFound) == tableCount { // Short circuit the search + select { + case <-done: // It's already been closed + return true + default: + close(done) // Mark the search as completed + return true + } + } + } + return false + }(); complete { + return nil + } + } + return nil + } + keyspaces, err := ts.TopoServer().GetKeyspaces(ctx) + if err != nil { + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err) + } + searchGroup, gctx := errgroup.WithContext(ctx) + searchCompleted := make(chan struct{}) + for _, keyspace := range keyspaces { + // The keyspace name could be escaped so we need to unescape it. + ks, err := sqlescape.UnescapeID(keyspace) + if err != nil { // Should never happen + return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace name %q: %v", keyspace, err) + } + searchGroup.Go(func() error { + return searchKeyspace(gctx, searchCompleted, ks) + }) + } + if err := searchGroup.Wait(); err != nil { + return nil, err + } + + if len(tablesFound) != tableCount { + // Try and create the missing backing sequence tables if we can. + if err := ts.createMissingSequenceTables(ctx, sequencesByBackingTable, tablesFound); err != nil { + return nil, err + } + } + + return sequencesByBackingTable, nil +} + +// createMissingSequenceTables will create the backing sequence tables for those that +// could not be found in any current keyspace. +func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata, tablesFound map[string]struct{}) error { + globalKeyspace := ts.options.GetGlobalKeyspace() + if globalKeyspace == "" { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no global-keyspace was provided to auto create them in: %s", + strings.Join(maps.Keys(sequencesByBackingTable), ",")) + } + shards, err := ts.ws.ts.GetShardNames(ctx, globalKeyspace) + if err != nil { + return err + } + if len(shards) != 1 { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s is not unsharded", globalKeyspace) + } + globalVSchema, err := ts.ws.ts.GetVSchema(ctx, globalKeyspace) + if err != nil { + return err + } + updatedGlobalVSchema := false + for tableName, sequenceMetadata := range sequencesByBackingTable { + if _, ok := tablesFound[tableName]; !ok { + // Create the backing table. + shard, err := ts.ws.ts.GetShard(ctx, globalKeyspace, shards[0]) + if err != nil { + return err + } + if shard.PrimaryAlias == nil { + return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s does not currently have a primary tablet", + globalKeyspace) + } + primary, err := ts.ws.ts.GetTablet(ctx, shard.PrimaryAlias) + if err != nil { + return err + } + escapedTableName, err := sqlescape.EnsureEscaped(tableName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s: %v", + tableName, err) + } + stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, escapedTableName) + _, err = ts.ws.tmc.ApplySchema(ctx, primary.Tablet, &tmutils.SchemaChange{ + SQL: stmt.Query, + Force: false, + AllowReplication: true, + SQLMode: vreplication.SQLMode, + DisableForeignKeyChecks: true, + }) + if err != nil { + return vterrors.Wrapf(err, "failed to create sequence backing table %s in global-keyspace %s", + tableName, globalKeyspace) + } + if bt := globalVSchema.Tables[sequenceMetadata.backingTableName]; bt == nil { + if globalVSchema.Tables == nil { + globalVSchema.Tables = make(map[string]*vschemapb.Table) + } + globalVSchema.Tables[tableName] = &vschemapb.Table{ + Type: vindexes.TypeSequence, + } + updatedGlobalVSchema = true + sequenceMetadata.backingTableDBName = "vt_" + globalKeyspace // This will be overridden later if needed + sequenceMetadata.backingTableKeyspace = globalKeyspace + } + } + } + if updatedGlobalVSchema { + err = ts.ws.ts.SaveVSchema(ctx, globalVSchema) + if err != nil { + return vterrors.Wrapf(err, "failed to update vschema in the global-keyspace %s", globalKeyspace) + } + } + return nil +} + +// findSequenceUsageInKeyspace searches the keyspace's vschema for usage +// of sequences. It returns a map of sequence metadata keyed by the backing +// sequence table name -- if any usage is found -- along with a boolean to +// indicate if all of the backing sequence tables were defined using +// qualified table names (so we know where they all live) along with an +// error if any is seen. +func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspace) (map[string]*sequenceMetadata, bool, error) { + allFullyQualified := true + targets := maps.Values(ts.Targets()) + if len(targets) == 0 || targets[0].GetPrimary() == nil { // This should never happen + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target keyspace %s", ts.targetKeyspace) + } + targetDBName := targets[0].GetPrimary().DbName() + sequencesByBackingTable := make(map[string]*sequenceMetadata) + + for _, table := range ts.tables { + seqTable, ok := vschema.Tables[table] + if !ok || seqTable.GetAutoIncrement().GetSequence() == "" { + continue + } + // Be sure that the table name is unescaped as it can be escaped + // in the vschema. + unescapedTable, err := sqlescape.UnescapeID(table) + if err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %q defined in the sequence table %+v: %v", + table, seqTable, err) + } + sm := &sequenceMetadata{ + usingTableName: unescapedTable, + usingTableDBName: targetDBName, + } + // If the sequence table is fully qualified in the vschema then + // we don't need to find it later. + if strings.Contains(seqTable.AutoIncrement.Sequence, ".") { + keyspace, tableName, found := strings.Cut(seqTable.AutoIncrement.Sequence, ".") + if !found { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name %q defined in the %s keyspace", + seqTable.AutoIncrement.Sequence, ts.targetKeyspace) + } + // Unescape the table name and keyspace name as they may be escaped in the + // vschema definition if they e.g. contain dashes. + if keyspace, err = sqlescape.UnescapeID(keyspace); err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace in qualified sequence table name %q defined in sequence table %+v: %v", + seqTable.AutoIncrement.Sequence, seqTable, err) + } + if tableName, err = sqlescape.UnescapeID(tableName); err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid qualified sequence table name %q defined in sequence table %+v: %v", + seqTable.AutoIncrement.Sequence, seqTable, err) + } + sm.backingTableKeyspace = keyspace + sm.backingTableName = tableName + // Update the definition with the unescaped values. + seqTable.AutoIncrement.Sequence = fmt.Sprintf("%s.%s", keyspace, tableName) + // Set the default keyspace name. We will later check to + // see if the tablet we send requests to is using a dbname + // override and use that if it is. + sm.backingTableDBName = "vt_" + keyspace + } else { + sm.backingTableName, err = sqlescape.UnescapeID(seqTable.AutoIncrement.Sequence) + if err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name %q defined in sequence table %+v: %v", + seqTable.AutoIncrement.Sequence, seqTable, err) + } + seqTable.AutoIncrement.Sequence = sm.backingTableName + allFullyQualified = false + } + // The column names can be escaped in the vschema definition. + for i := range seqTable.ColumnVindexes { + var ( + unescapedColumn string + err error + ) + if len(seqTable.ColumnVindexes[i].Columns) > 0 { + for n := range seqTable.ColumnVindexes[i].Columns { + unescapedColumn, err = sqlescape.UnescapeID(seqTable.ColumnVindexes[i].Columns[n]) + seqTable.ColumnVindexes[i].Columns[n] = unescapedColumn + } + } else { + // This is the legacy vschema definition. + unescapedColumn, err = sqlescape.UnescapeID(seqTable.ColumnVindexes[i].Column) + seqTable.ColumnVindexes[i].Column = unescapedColumn + } + if err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence column vindex name %q defined in sequence table %+v: %v", + seqTable.ColumnVindexes[i].Column, seqTable, err) + } + } + unescapedAutoIncCol, err := sqlescape.UnescapeID(seqTable.AutoIncrement.Column) + if err != nil { + return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid auto-increment column name %q defined in sequence table %+v: %v", + seqTable.AutoIncrement.Column, seqTable, err) + } + seqTable.AutoIncrement.Column = unescapedAutoIncCol + sm.usingTableDefinition = seqTable + sequencesByBackingTable[sm.backingTableName] = sm + } + + return sequencesByBackingTable, allFullyQualified, nil +} + +// initializeTargetSequences initializes the backing sequence tables +// using a map keyed by the backing sequence table name. +// +// The backing tables must have already been created, unless a default +// global keyspace exists for the trafficSwitcher -- in which case we +// will create the backing table there if needed. + +// This function will then ensure that the next value is set to a value +// greater than any currently stored in the using table on the target +// keyspace. If the backing table is updated to a new higher value then +// it will also tell the primary tablet serving the sequence to +// refresh/reset its cache to be sure that it does not provide a value +// that is less than the current max. +func (ts *trafficSwitcher) initializeTargetSequences2(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { + initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { + // Now we need to run this query on the target shards in order + // to get the max value and set the next id for the sequence to + // a higher value. + shardResults := make([]int64, 0, len(ts.TargetShards())) + srMu := sync.Mutex{} + ierr := ts.ForAllTargets(func(target *MigrationTarget) error { + primary := target.GetPrimary() + if primary == nil || primary.GetAlias() == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target shard %s/%s", + ts.targetKeyspace, target.GetShard().ShardName()) + } + usingCol, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDefinition.AutoIncrement.Column) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid column name %s specified for sequence in table %s: %v", + sequenceMetadata.usingTableDefinition.AutoIncrement.Column, sequenceMetadata.usingTableName, err) + } + usingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDBName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s specified for sequence in table %s: %v", + sequenceMetadata.usingTableDBName, sequenceMetadata.usingTableName, err) + } + usingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name specified for sequence in table %s: %v", + sequenceMetadata.usingTableName, err) + } + query := sqlparser.BuildParsedQuery(sqlGetMaxSequenceVal, + usingCol, + usingDB, + usingTable, + ) + qr, terr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primary.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + Query: []byte(query.Query), + MaxRows: 1, + }) + if terr != nil || len(qr.Rows) != 1 { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } + rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0] + maxID := int64(0) + if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max + maxID, terr = rawVal.ToInt64() + if terr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", + ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) + } + } + srMu.Lock() + defer srMu.Unlock() + shardResults = append(shardResults, maxID) + return nil + }) + if ierr != nil { + return ierr + } + select { + case <-ictx.Done(): + return ictx.Err() + default: + } + if len(shardResults) == 0 { // This should never happen + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", + ts.targetKeyspace, sequenceMetadata.usingTableName) + } + // Sort the values to find the max value across all shards. + sort.Slice(shardResults, func(i, j int) bool { + return shardResults[i] < shardResults[j] + }) + nextVal := shardResults[len(shardResults)-1] + 1 + // Now we need to update the sequence table, if needed, in order to + // ensure that that the next value it provides is > the current max. + sequenceShard, ierr := ts.TopoServer().GetOnlyShard(ictx, sequenceMetadata.backingTableKeyspace) + if ierr != nil || sequenceShard == nil || sequenceShard.PrimaryAlias == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", + sequenceMetadata.backingTableKeyspace, ierr) + } + sequenceTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) + if ierr != nil || sequenceTablet == nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", + sequenceMetadata.backingTableKeyspace, ierr) + } + select { + case <-ictx.Done(): + return ictx.Err() + default: + } + if sequenceTablet.DbNameOverride != "" { + sequenceMetadata.backingTableDBName = sequenceTablet.DbNameOverride + } + backingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableDBName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s in sequence backing table %s: %v", + sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, err) + } + backingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableName) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v", + sequenceMetadata.backingTableName, err) + } + query := sqlparser.BuildParsedQuery(sqlInitSequenceTable, + backingDB, + backingTable, + nextVal, + nextVal, + nextVal, + ) + // Now execute this on the primary tablet of the unsharded keyspace + // housing the backing table. + initialize: + qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ + Query: []byte(query.Query), + MaxRows: 1, + }) + if ierr != nil { + vterr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to initialize the backing sequence table %s.%s: %v", + sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, ierr) + // If the sequence table doesn't exist, let's try and create it, otherwise + // return the error. + if sqlErr, ok := sqlerror.NewSQLErrorFromError(ierr).(*sqlerror.SQLError); !ok || + (sqlErr.Num != sqlerror.ERNoSuchTable && sqlErr.Num != sqlerror.ERBadTable) { + return vterr + } + stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, backingTable) + _, ierr = ts.ws.tmc.ApplySchema(ctx, sequenceTablet.Tablet, &tmutils.SchemaChange{ + SQL: stmt.Query, + Force: false, + AllowReplication: true, + SQLMode: vreplication.SQLMode, + DisableForeignKeyChecks: true, + }) + if ierr != nil { + return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", err) + } + select { + case <-ctx.Done(): + return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", ctx.Err()) + default: + goto initialize + } + } + // If we actually updated the backing sequence table, then we need + // to tell the primary tablet managing the sequence to refresh/reset + // its cache for the table. + if qr.RowsAffected == 0 { + return nil + } + select { + case <-ictx.Done(): + return ictx.Err() + default: + } + ts.Logger().Infof("Resetting sequence cache for backing table %s on shard %s/%s using tablet %s", + sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias) + ti, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) + if ierr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get primary tablet for keyspace %s: %v", + sequenceMetadata.backingTableKeyspace, ierr) + } + // ResetSequences interfaces with the schema engine and the actual + // table identifiers DO NOT contain the backticks. So we have to + // ensure that the table name is unescaped. + unescapedBackingTable, err := sqlescape.UnescapeID(backingTable) + if err != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v", backingTable, err) + } + ierr = ts.TabletManagerClient().ResetSequences(ictx, ti.Tablet, []string{unescapedBackingTable}) + if ierr != nil { + return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset the sequence cache for backing table %s on shard %s/%s using tablet %s: %v", + sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) + } + return nil + } + + initGroup, gctx := errgroup.WithContext(ctx) + for _, sequenceMetadata := range sequencesByBackingTable { + initGroup.Go(func() error { + return initSequenceTable(gctx, sequenceMetadata) + }) + } + return initGroup.Wait() +} + +func (ts *trafficSwitcher) mustResetSequences(ctx context.Context) (bool, error) { + switch ts.workflowType { + case binlogdatapb.VReplicationWorkflowType_Migrate, + binlogdatapb.VReplicationWorkflowType_MoveTables: + return ts.isSequenceParticipating(ctx) + default: + return false, nil + } +} + +func (ts *trafficSwitcher) resetSequences(ctx context.Context) error { + var err error + mustReset := false + if mustReset, err = ts.mustResetSequences(ctx); err != nil { + return err + } + if !mustReset { + return nil + } + return ts.ForAllSources(func(source *MigrationSource) error { + ts.Logger().Infof("Resetting sequences for source shard %s.%s on tablet %s", + source.GetShard().Keyspace(), source.GetShard().ShardName(), topoproto.TabletAliasString(source.GetPrimary().GetAlias())) + return ts.TabletManagerClient().ResetSequences(ctx, source.GetPrimary().Tablet, ts.Tables()) + }) +} diff --git a/go/vt/vtctl/workflow/sequences_test.go b/go/vt/vtctl/workflow/sequences_test.go new file mode 100644 index 00000000000..537eb204558 --- /dev/null +++ b/go/vt/vtctl/workflow/sequences_test.go @@ -0,0 +1,601 @@ +package workflow + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/sqlescape" + "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/vt/mysqlctl/tmutils" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/proto/vschema" + vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" + "vitess.io/vitess/go/vt/sqlparser" + "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" +) + +func TestInitializeTargetSequences(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + workflowName := "wf1" + tableName := "t1" + sourceKeyspaceName := "sourceks" + targetKeyspaceName := "targetks" + + schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ + tableName: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: tableName, + Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), + }, + }, + }, + } + + sourceKeyspace := &testKeyspace{ + KeyspaceName: sourceKeyspaceName, + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: targetKeyspaceName, + ShardNames: []string{"0"}, + } + + env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) + defer env.close() + env.tmc.schema = schema + + ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) + require.NoError(t, err) + sw := &switcher{ts: ts, s: env.ws} + + sequencesByBackingTable := map[string]*sequenceMetadata{ + "my-seq1": { + backingTableName: "my-seq1", + backingTableKeyspace: sourceKeyspaceName, + backingTableDBName: fmt.Sprintf("vt_%s", sourceKeyspaceName), + usingTableName: tableName, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + AutoIncrement: &vschema.AutoIncrement{ + Column: "my-col", + Sequence: fmt.Sprintf("%s.my-seq1", sourceKeyspace.KeyspaceName), + }, + }, + }, + } + + env.tmc.expectVRQuery(200, "/select max.*", sqltypes.MakeTestResult(sqltypes.MakeTestFields("maxval", "int64"), "34")) + // Expect the insert query to be executed with 35 as a params, since we provide a maxID of 34 in the last query + env.tmc.expectVRQuery(100, "/insert into.*35.*", &sqltypes.Result{RowsAffected: 1}) + + err = sw.initializeTargetSequences(ctx, sequencesByBackingTable) + assert.NoError(t, err) + + // Expect the queries to be cleared + assert.Emptyf(t, env.tmc.vrQueries[100], "expected no queries to be executed, found: %q", env.tmc.vrQueries[100]) + assert.Empty(t, env.tmc.vrQueries[200], "expected no queries to be executed, found: %q", env.tmc.vrQueries[200]) +} + +func TestGetTargetSequenceMetadata(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + cell := "cell1" + workflow := "wf1" + table := "`t1`" + tableDDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))" + table2 := "t2" + unescapedTable := "t1" + sourceKeyspace := &testKeyspace{ + KeyspaceName: "source-ks", + ShardNames: []string{"0"}, + } + targetKeyspace := &testKeyspace{ + KeyspaceName: "targetks", + ShardNames: []string{"-80", "80-"}, + } + vindexes := map[string]*vschema.Vindex{ + "xxhash": { + Type: "xxhash", + }, + } + env := newTestEnv(t, ctx, cell, sourceKeyspace, targetKeyspace) + defer env.close() + + env.tmc.schema = map[string]*tabletmanagerdatapb.SchemaDefinition{ + unescapedTable: { + TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ + { + Name: unescapedTable, + Schema: tableDDL, + }, + }, + }, + } + + type testCase struct { + name string + sourceVSchema *vschema.Keyspace + targetVSchema *vschema.Keyspace + options *vtctldatapb.WorkflowOptions + want map[string]*sequenceMetadata + expectSourceApplySchemaRequestResponse *applySchemaRequestResponse + err string + } + tests := []testCase{ + { + name: "no sequences", + want: nil, + }, + { + name: "sequences with backticks and qualified table", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my-col`", + Sequence: fmt.Sprintf("`%s`.`my-seq1`", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + want: map[string]*sequenceMetadata{ + "my-seq1": { + backingTableName: "my-seq1", + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: unescapedTable, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Column: "my-col", + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "my-col", + Sequence: fmt.Sprintf("%s.my-seq1", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + }, + { + name: "auto_increment replaced with sequence", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{}, // Sequence table will be created + }, + options: &vtctldatapb.WorkflowOptions{ + ShardedAutoIncrementHandling: vtctldatapb.ShardedAutoIncrementHandling_REPLACE, + GlobalKeyspace: sourceKeyspace.KeyspaceName, + }, + expectSourceApplySchemaRequestResponse: &applySchemaRequestResponse{ + change: &tmutils.SchemaChange{ + SQL: sqlparser.BuildParsedQuery(sqlCreateSequenceTable, + sqlescape.EscapeID(fmt.Sprintf(autoSequenceTableFormat, unescapedTable))).Query, + Force: false, + AllowReplication: true, + SQLMode: vreplication.SQLMode, + DisableForeignKeyChecks: true, + }, + res: &tabletmanagerdatapb.SchemaChangeResult{}, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "my-col", + Sequence: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), + }, + }, + }, + }, + want: map[string]*sequenceMetadata{ + fmt.Sprintf(autoSequenceTableFormat, unescapedTable): { + backingTableName: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: unescapedTable, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Column: "my-col", + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "my-col", + Sequence: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), + }, + }, + }, + }, + }, + { + name: "sequences with backticks", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my-col`", + Sequence: "`my-seq1`", + }, + }, + }, + }, + want: map[string]*sequenceMetadata{ + "my-seq1": { + backingTableName: "my-seq1", + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: unescapedTable, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Column: "my-col", + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "my-col", + Sequence: "my-seq1", + }, + }, + }, + }, + }, + { + name: "sequences using vindexes with both column definition structures", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "seq1": { + Type: "sequence", + }, + "seq2": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "col1", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col1", + Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), + }, + }, + table2: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Columns: []string{"col2"}, + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col2", + Sequence: fmt.Sprintf("%s.seq2", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + want: map[string]*sequenceMetadata{ + "seq1": { + backingTableName: "seq1", + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: unescapedTable, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Column: "col1", + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col1", + Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), + }, + }, + }, + "seq2": { + backingTableName: "seq2", + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: table2, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Columns: []string{"col2"}, + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col2", + Sequence: fmt.Sprintf("%s.seq2", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + }, + { + name: "sequence with table having mult-col vindex", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "seq1": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Columns: []string{"col3", "col4"}, + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col1", + Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + want: map[string]*sequenceMetadata{ + "seq1": { + backingTableName: "seq1", + backingTableKeyspace: "source-ks", + backingTableDBName: "vt_source-ks", + usingTableName: unescapedTable, + usingTableDBName: "vt_targetks", + usingTableDefinition: &vschema.Table{ + ColumnVindexes: []*vschema.ColumnVindex{ + { + Columns: []string{"col3", "col4"}, + Name: "xxhash", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "col1", + Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), + }, + }, + }, + }, + }, + { + name: "invalid table name", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-`seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my-col`", + Sequence: "`my-seq1`", + }, + }, + }, + }, + err: "invalid table name \"`my-`seq1`\" in keyspace source-ks: UnescapeID err: unexpected single backtick at position 3 in 'my-`seq1'", + }, + { + name: "invalid keyspace name", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my-col`", + Sequence: "`ks`1`.`my-seq1`", + }, + }, + }, + }, + err: "invalid keyspace in qualified sequence table name \"`ks`1`.`my-seq1`\" defined in sequence table column_vindexes:{column:\"`my-col`\" name:\"xxhash\"} auto_increment:{column:\"`my-col`\" sequence:\"`ks`1`.`my-seq1`\"}: UnescapeID err: unexpected single backtick at position 2 in 'ks`1'", + }, + { + name: "invalid auto-inc column name", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my`-col`", + Sequence: "`my-seq1`", + }, + }, + }, + }, + err: "invalid auto-increment column name \"`my`-col`\" defined in sequence table column_vindexes:{column:\"my-col\" name:\"xxhash\"} auto_increment:{column:\"`my`-col`\" sequence:\"my-seq1\"}: UnescapeID err: unexpected single backtick at position 2 in 'my`-col'", + }, + { + name: "invalid sequence name", + sourceVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + "`my-seq1`": { + Type: "sequence", + }, + }, + }, + targetVSchema: &vschema.Keyspace{ + Vindexes: vindexes, + Tables: map[string]*vschema.Table{ + table: { + ColumnVindexes: []*vschema.ColumnVindex{ + { + Name: "xxhash", + Column: "`my-col`", + }, + }, + AutoIncrement: &vschema.AutoIncrement{ + Column: "`my-col`", + Sequence: "`my-`seq1`", + }, + }, + }, + }, + err: "invalid sequence table name \"`my-`seq1`\" defined in sequence table column_vindexes:{column:\"`my-col`\" name:\"xxhash\"} auto_increment:{column:\"`my-col`\" sequence:\"`my-`seq1`\"}: UnescapeID err: unexpected single backtick at position 3 in 'my-`seq1'", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := env.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ + Name: sourceKeyspace.KeyspaceName, + Keyspace: tc.sourceVSchema, + }) + require.NoError(t, err) + err = env.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ + Name: targetKeyspace.KeyspaceName, + Keyspace: tc.targetVSchema, + }) + require.NoError(t, err) + err = env.ts.RebuildSrvVSchema(ctx, nil) + require.NoError(t, err) + sources := make(map[string]*MigrationSource, len(sourceKeyspace.ShardNames)) + targets := make(map[string]*MigrationTarget, len(targetKeyspace.ShardNames)) + for i, shard := range sourceKeyspace.ShardNames { + tablet := env.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] + sources[shard] = &MigrationSource{ + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + if tc.expectSourceApplySchemaRequestResponse != nil { + env.tmc.expectApplySchemaRequest(tablet.Alias.Uid, tc.expectSourceApplySchemaRequestResponse) + } + } + for i, shard := range targetKeyspace.ShardNames { + tablet := env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] + targets[shard] = &MigrationTarget{ + primary: &topo.TabletInfo{ + Tablet: tablet, + }, + } + } + ts := &trafficSwitcher{ + id: 1, + ws: env.ws, + workflow: workflow, + tables: []string{table, table2}, + sourceKeyspace: sourceKeyspace.KeyspaceName, + targetKeyspace: targetKeyspace.KeyspaceName, + sources: sources, + targets: targets, + options: tc.options, + } + got, err := ts.getTargetSequenceMetadata(ctx) + if tc.err != "" { + require.EqualError(t, err, tc.err) + } else { + require.NoError(t, err) + } + require.EqualValues(t, tc.want, got) + }) + } +} diff --git a/go/vt/vtctl/workflow/server.go b/go/vt/vtctl/workflow/server.go index efa7d6448b0..b36e51dc253 100644 --- a/go/vt/vtctl/workflow/server.go +++ b/go/vt/vtctl/workflow/server.go @@ -88,23 +88,6 @@ type tableCopyProgress struct { // copyProgress stores the tableCopyProgress for all tables still being copied type copyProgress map[string]*tableCopyProgress -// sequenceMetadata contains all of the relevant metadata for a sequence that -// is being used by a table involved in a vreplication workflow. -type sequenceMetadata struct { - // The name of the sequence table. - backingTableName string - // The keyspace where the backing table lives. - backingTableKeyspace string - // The dbName in use by the keyspace where the backing table lives. - backingTableDBName string - // The name of the table using the sequence. - usingTableName string - // The dbName in use by the keyspace where the using table lives. - usingTableDBName string - // The using table definition. - usingTableDefinition *vschemapb.Table -} - // vdiffOutput holds the data from all shards that is needed to generate // the full summary results of the vdiff in the vdiff show command output. type vdiffOutput struct { diff --git a/go/vt/vtctl/workflow/traffic_switcher.go b/go/vt/vtctl/workflow/traffic_switcher.go index 7e1df4a07ce..1d86c1704c9 100644 --- a/go/vt/vtctl/workflow/traffic_switcher.go +++ b/go/vt/vtctl/workflow/traffic_switcher.go @@ -26,11 +26,9 @@ import ( "sync" "time" - "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "vitess.io/vitess/go/json2" - "vitess.io/vitess/go/mysql/sqlerror" "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" @@ -38,7 +36,6 @@ import ( "vitess.io/vitess/go/vt/key" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/logutil" - "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/topo/topoproto" @@ -1441,542 +1438,6 @@ func (ts *trafficSwitcher) gatherSourcePositions(ctx context.Context) error { }) } -func (ts *trafficSwitcher) isSequenceParticipating(ctx context.Context) (bool, error) { - vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) - if err != nil { - return false, err - } - if vschema == nil || len(vschema.Tables) == 0 { - return false, nil - } - sequenceFound := false - for _, table := range ts.Tables() { - vs, ok := vschema.Tables[table] - if !ok || vs == nil { - continue - } - if vs.Type == vindexes.TypeSequence { - sequenceFound = true - break - } - } - return sequenceFound, nil -} - -// getTargetSequenceMetadata returns a map of sequence metadata keyed by the -// backing sequence table name. If the target keyspace has no tables -// defined that use sequences for auto_increment generation then a nil -// map will be returned. -func (ts *trafficSwitcher) getTargetSequenceMetadata(ctx context.Context) (map[string]*sequenceMetadata, error) { - vschema, err := ts.TopoServer().GetVSchema(ctx, ts.targetKeyspace) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for target keyspace %s: %v", - ts.targetKeyspace, err) - } - if vschema == nil || len(vschema.Tables) == 0 { // Nothing to do - return nil, nil - } - - sequencesByBackingTable, backingTablesFound, err := ts.findSequenceUsageInKeyspace(vschema.Keyspace) - if err != nil { - return nil, err - } - // If all of the sequence tables were defined using qualified table - // names then we don't need to search for them in other keyspaces. - if len(sequencesByBackingTable) == 0 || backingTablesFound { - return sequencesByBackingTable, nil - } - - if err := ctx.Err(); err != nil { - return nil, err - } - - // Now we need to locate the backing sequence table(s) which will - // be in another unsharded keyspace. - smMu := sync.Mutex{} - tableCount := len(sequencesByBackingTable) - tablesFound := make(map[string]struct{}) // Used to short circuit the search - // Define the function used to search each keyspace. - searchKeyspace := func(sctx context.Context, done chan struct{}, keyspace string) error { - kvs, kerr := ts.TopoServer().GetVSchema(sctx, keyspace) - if kerr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get vschema for keyspace %s: %v", - keyspace, kerr) - } - if kvs == nil || kvs.Sharded || len(kvs.Tables) == 0 { - return nil - } - for tableName, tableDef := range kvs.Tables { - // The table name can be escaped in the vschema definition. - unescapedTableName, err := sqlescape.UnescapeID(tableName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %q in keyspace %s: %v", - tableName, keyspace, err) - } - select { - case <-sctx.Done(): - return sctx.Err() - case <-done: // We've found everything we need in other goroutines - return nil - default: - } - if complete := func() bool { - smMu.Lock() // Prevent concurrent access to the map - defer smMu.Unlock() - sm := sequencesByBackingTable[unescapedTableName] - if tableDef != nil && tableDef.Type == vindexes.TypeSequence && - sm != nil && unescapedTableName == sm.backingTableName { - tablesFound[tableName] = struct{}{} // This is also protected by the mutex - sm.backingTableKeyspace = keyspace - // Set the default keyspace name. We will later check to - // see if the tablet we send requests to is using a dbname - // override and use that if it is. - sm.backingTableDBName = "vt_" + keyspace - if len(tablesFound) == tableCount { // Short circuit the search - select { - case <-done: // It's already been closed - return true - default: - close(done) // Mark the search as completed - return true - } - } - } - return false - }(); complete { - return nil - } - } - return nil - } - keyspaces, err := ts.TopoServer().GetKeyspaces(ctx) - if err != nil { - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get keyspaces: %v", err) - } - searchGroup, gctx := errgroup.WithContext(ctx) - searchCompleted := make(chan struct{}) - for _, keyspace := range keyspaces { - // The keyspace name could be escaped so we need to unescape it. - ks, err := sqlescape.UnescapeID(keyspace) - if err != nil { // Should never happen - return nil, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace name %q: %v", keyspace, err) - } - searchGroup.Go(func() error { - return searchKeyspace(gctx, searchCompleted, ks) - }) - } - if err := searchGroup.Wait(); err != nil { - return nil, err - } - - if len(tablesFound) != tableCount { - // Try and create the missing backing sequence tables if we can. - if err := ts.createMissingSequenceTables(ctx, sequencesByBackingTable, tablesFound); err != nil { - return nil, err - } - } - - return sequencesByBackingTable, nil -} - -// createMissingSequenceTables will create the backing sequence tables for those that -// could not be found in any current keyspace. -func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata, tablesFound map[string]struct{}) error { - globalKeyspace := ts.options.GetGlobalKeyspace() - if globalKeyspace == "" { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to locate all of the backing sequence tables being used and no global-keyspace was provided to auto create them in: %s", - strings.Join(maps.Keys(sequencesByBackingTable), ",")) - } - shards, err := ts.ws.ts.GetShardNames(ctx, globalKeyspace) - if err != nil { - return err - } - if len(shards) != 1 { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s is not unsharded", globalKeyspace) - } - globalVSchema, err := ts.ws.ts.GetVSchema(ctx, globalKeyspace) - if err != nil { - return err - } - updatedGlobalVSchema := false - for tableName, sequenceMetadata := range sequencesByBackingTable { - if _, ok := tablesFound[tableName]; !ok { - // Create the backing table. - shard, err := ts.ws.ts.GetShard(ctx, globalKeyspace, shards[0]) - if err != nil { - return err - } - if shard.PrimaryAlias == nil { - return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s does not currently have a primary tablet", - globalKeyspace) - } - primary, err := ts.ws.ts.GetTablet(ctx, shard.PrimaryAlias) - if err != nil { - return err - } - escapedTableName, err := sqlescape.EnsureEscaped(tableName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s: %v", - tableName, err) - } - stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, escapedTableName) - _, err = ts.ws.tmc.ApplySchema(ctx, primary.Tablet, &tmutils.SchemaChange{ - SQL: stmt.Query, - Force: false, - AllowReplication: true, - SQLMode: vreplication.SQLMode, - DisableForeignKeyChecks: true, - }) - if err != nil { - return vterrors.Wrapf(err, "failed to create sequence backing table %s in global-keyspace %s", - tableName, globalKeyspace) - } - if bt := globalVSchema.Tables[sequenceMetadata.backingTableName]; bt == nil { - if globalVSchema.Tables == nil { - globalVSchema.Tables = make(map[string]*vschemapb.Table) - } - globalVSchema.Tables[tableName] = &vschemapb.Table{ - Type: vindexes.TypeSequence, - } - updatedGlobalVSchema = true - sequenceMetadata.backingTableDBName = "vt_" + globalKeyspace // This will be overridden later if needed - sequenceMetadata.backingTableKeyspace = globalKeyspace - } - } - } - if updatedGlobalVSchema { - err = ts.ws.ts.SaveVSchema(ctx, globalVSchema) - if err != nil { - return vterrors.Wrapf(err, "failed to update vschema in the global-keyspace %s", globalKeyspace) - } - } - return nil -} - -// findSequenceUsageInKeyspace searches the keyspace's vschema for usage -// of sequences. It returns a map of sequence metadata keyed by the backing -// sequence table name -- if any usage is found -- along with a boolean to -// indicate if all of the backing sequence tables were defined using -// qualified table names (so we know where they all live) along with an -// error if any is seen. -func (ts *trafficSwitcher) findSequenceUsageInKeyspace(vschema *vschemapb.Keyspace) (map[string]*sequenceMetadata, bool, error) { - allFullyQualified := true - targets := maps.Values(ts.Targets()) - if len(targets) == 0 || targets[0].GetPrimary() == nil { // This should never happen - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target keyspace %s", ts.targetKeyspace) - } - targetDBName := targets[0].GetPrimary().DbName() - sequencesByBackingTable := make(map[string]*sequenceMetadata) - - for _, table := range ts.tables { - seqTable, ok := vschema.Tables[table] - if !ok || seqTable.GetAutoIncrement().GetSequence() == "" { - continue - } - // Be sure that the table name is unescaped as it can be escaped - // in the vschema. - unescapedTable, err := sqlescape.UnescapeID(table) - if err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %q defined in the sequence table %+v: %v", - table, seqTable, err) - } - sm := &sequenceMetadata{ - usingTableName: unescapedTable, - usingTableDBName: targetDBName, - } - // If the sequence table is fully qualified in the vschema then - // we don't need to find it later. - if strings.Contains(seqTable.AutoIncrement.Sequence, ".") { - keyspace, tableName, found := strings.Cut(seqTable.AutoIncrement.Sequence, ".") - if !found { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name %q defined in the %s keyspace", - seqTable.AutoIncrement.Sequence, ts.targetKeyspace) - } - // Unescape the table name and keyspace name as they may be escaped in the - // vschema definition if they e.g. contain dashes. - if keyspace, err = sqlescape.UnescapeID(keyspace); err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid keyspace in qualified sequence table name %q defined in sequence table %+v: %v", - seqTable.AutoIncrement.Sequence, seqTable, err) - } - if tableName, err = sqlescape.UnescapeID(tableName); err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid qualified sequence table name %q defined in sequence table %+v: %v", - seqTable.AutoIncrement.Sequence, seqTable, err) - } - sm.backingTableKeyspace = keyspace - sm.backingTableName = tableName - // Update the definition with the unescaped values. - seqTable.AutoIncrement.Sequence = fmt.Sprintf("%s.%s", keyspace, tableName) - // Set the default keyspace name. We will later check to - // see if the tablet we send requests to is using a dbname - // override and use that if it is. - sm.backingTableDBName = "vt_" + keyspace - } else { - sm.backingTableName, err = sqlescape.UnescapeID(seqTable.AutoIncrement.Sequence) - if err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name %q defined in sequence table %+v: %v", - seqTable.AutoIncrement.Sequence, seqTable, err) - } - seqTable.AutoIncrement.Sequence = sm.backingTableName - allFullyQualified = false - } - // The column names can be escaped in the vschema definition. - for i := range seqTable.ColumnVindexes { - var ( - unescapedColumn string - err error - ) - if len(seqTable.ColumnVindexes[i].Columns) > 0 { - for n := range seqTable.ColumnVindexes[i].Columns { - unescapedColumn, err = sqlescape.UnescapeID(seqTable.ColumnVindexes[i].Columns[n]) - seqTable.ColumnVindexes[i].Columns[n] = unescapedColumn - } - } else { - // This is the legacy vschema definition. - unescapedColumn, err = sqlescape.UnescapeID(seqTable.ColumnVindexes[i].Column) - seqTable.ColumnVindexes[i].Column = unescapedColumn - } - if err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence column vindex name %q defined in sequence table %+v: %v", - seqTable.ColumnVindexes[i].Column, seqTable, err) - } - } - unescapedAutoIncCol, err := sqlescape.UnescapeID(seqTable.AutoIncrement.Column) - if err != nil { - return nil, false, vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid auto-increment column name %q defined in sequence table %+v: %v", - seqTable.AutoIncrement.Column, seqTable, err) - } - seqTable.AutoIncrement.Column = unescapedAutoIncCol - sm.usingTableDefinition = seqTable - sequencesByBackingTable[sm.backingTableName] = sm - } - - return sequencesByBackingTable, allFullyQualified, nil -} - -// initializeTargetSequences initializes the backing sequence tables -// using a map keyed by the backing sequence table name. -// -// The backing tables must have already been created, unless a default -// global keyspace exists for the trafficSwitcher -- in which case we -// will create the backing table there if needed. - -// This function will then ensure that the next value is set to a value -// greater than any currently stored in the using table on the target -// keyspace. If the backing table is updated to a new higher value then -// it will also tell the primary tablet serving the sequence to -// refresh/reset its cache to be sure that it does not provide a value -// that is less than the current max. -func (ts *trafficSwitcher) initializeTargetSequences(ctx context.Context, sequencesByBackingTable map[string]*sequenceMetadata) error { - initSequenceTable := func(ictx context.Context, sequenceMetadata *sequenceMetadata) error { - // Now we need to run this query on the target shards in order - // to get the max value and set the next id for the sequence to - // a higher value. - shardResults := make([]int64, 0, len(ts.TargetShards())) - srMu := sync.Mutex{} - ierr := ts.ForAllTargets(func(target *MigrationTarget) error { - primary := target.GetPrimary() - if primary == nil || primary.GetAlias() == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "no primary tablet found for target shard %s/%s", - ts.targetKeyspace, target.GetShard().ShardName()) - } - usingCol, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDefinition.AutoIncrement.Column) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid column name %s specified for sequence in table %s: %v", - sequenceMetadata.usingTableDefinition.AutoIncrement.Column, sequenceMetadata.usingTableName, err) - } - usingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableDBName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s specified for sequence in table %s: %v", - sequenceMetadata.usingTableDBName, sequenceMetadata.usingTableName, err) - } - usingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.usingTableName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence table name specified for sequence in table %s: %v", - sequenceMetadata.usingTableName, err) - } - query := sqlparser.BuildParsedQuery(sqlGetMaxSequenceVal, - usingCol, - usingDB, - usingTable, - ) - qr, terr := ts.ws.tmc.ExecuteFetchAsApp(ictx, primary.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ - Query: []byte(query.Query), - MaxRows: 1, - }) - if terr != nil || len(qr.Rows) != 1 { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) - } - rawVal := sqltypes.Proto3ToResult(qr).Rows[0][0] - maxID := int64(0) - if !rawVal.IsNull() { // If it's NULL then there are no rows and 0 remains the max - maxID, terr = rawVal.ToInt64() - if terr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the max used sequence value for target table %s.%s on tablet %s in order to initialize the backing sequence table: %v", - ts.targetKeyspace, sequenceMetadata.usingTableName, topoproto.TabletAliasString(primary.Alias), terr) - } - } - srMu.Lock() - defer srMu.Unlock() - shardResults = append(shardResults, maxID) - return nil - }) - if ierr != nil { - return ierr - } - select { - case <-ictx.Done(): - return ictx.Err() - default: - } - if len(shardResults) == 0 { // This should never happen - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "did not get any results for the max used sequence value for target table %s.%s in order to initialize the backing sequence table", - ts.targetKeyspace, sequenceMetadata.usingTableName) - } - // Sort the values to find the max value across all shards. - sort.Slice(shardResults, func(i, j int) bool { - return shardResults[i] < shardResults[j] - }) - nextVal := shardResults[len(shardResults)-1] + 1 - // Now we need to update the sequence table, if needed, in order to - // ensure that that the next value it provides is > the current max. - sequenceShard, ierr := ts.TopoServer().GetOnlyShard(ictx, sequenceMetadata.backingTableKeyspace) - if ierr != nil || sequenceShard == nil || sequenceShard.PrimaryAlias == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", - sequenceMetadata.backingTableKeyspace, ierr) - } - sequenceTablet, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil || sequenceTablet == nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get the primary tablet for keyspace %s: %v", - sequenceMetadata.backingTableKeyspace, ierr) - } - select { - case <-ictx.Done(): - return ictx.Err() - default: - } - if sequenceTablet.DbNameOverride != "" { - sequenceMetadata.backingTableDBName = sequenceTablet.DbNameOverride - } - backingDB, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableDBName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid database name %s in sequence backing table %s: %v", - sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, err) - } - backingTable, err := sqlescape.EnsureEscaped(sequenceMetadata.backingTableName) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v", - sequenceMetadata.backingTableName, err) - } - query := sqlparser.BuildParsedQuery(sqlInitSequenceTable, - backingDB, - backingTable, - nextVal, - nextVal, - nextVal, - ) - // Now execute this on the primary tablet of the unsharded keyspace - // housing the backing table. - initialize: - qr, ierr := ts.ws.tmc.ExecuteFetchAsApp(ictx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{ - Query: []byte(query.Query), - MaxRows: 1, - }) - if ierr != nil { - vterr := vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to initialize the backing sequence table %s.%s: %v", - sequenceMetadata.backingTableDBName, sequenceMetadata.backingTableName, ierr) - // If the sequence table doesn't exist, let's try and create it, otherwise - // return the error. - if sqlErr, ok := sqlerror.NewSQLErrorFromError(ierr).(*sqlerror.SQLError); !ok || - (sqlErr.Num != sqlerror.ERNoSuchTable && sqlErr.Num != sqlerror.ERBadTable) { - return vterr - } - stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, backingTable) - _, ierr = ts.ws.tmc.ApplySchema(ctx, sequenceTablet.Tablet, &tmutils.SchemaChange{ - SQL: stmt.Query, - Force: false, - AllowReplication: true, - SQLMode: vreplication.SQLMode, - DisableForeignKeyChecks: true, - }) - if ierr != nil { - return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", err) - } - select { - case <-ctx.Done(): - return vterrors.Wrapf(vterr, "could not create missing sequence table: %v", ctx.Err()) - default: - goto initialize - } - } - // If we actually updated the backing sequence table, then we need - // to tell the primary tablet managing the sequence to refresh/reset - // its cache for the table. - if qr.RowsAffected == 0 { - return nil - } - select { - case <-ictx.Done(): - return ictx.Err() - default: - } - ts.Logger().Infof("Resetting sequence cache for backing table %s on shard %s/%s using tablet %s", - sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias) - ti, ierr := ts.TopoServer().GetTablet(ictx, sequenceShard.PrimaryAlias) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to get primary tablet for keyspace %s: %v", - sequenceMetadata.backingTableKeyspace, ierr) - } - // ResetSequences interfaces with the schema engine and the actual - // table identifiers DO NOT contain the backticks. So we have to - // ensure that the table name is unescaped. - unescapedBackingTable, err := sqlescape.UnescapeID(backingTable) - if err != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid sequence backing table name %s: %v", backingTable, err) - } - ierr = ts.TabletManagerClient().ResetSequences(ictx, ti.Tablet, []string{unescapedBackingTable}) - if ierr != nil { - return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "failed to reset the sequence cache for backing table %s on shard %s/%s using tablet %s: %v", - sequenceMetadata.backingTableName, sequenceShard.Keyspace(), sequenceShard.ShardName(), sequenceShard.PrimaryAlias, ierr) - } - return nil - } - - initGroup, gctx := errgroup.WithContext(ctx) - for _, sequenceMetadata := range sequencesByBackingTable { - initGroup.Go(func() error { - return initSequenceTable(gctx, sequenceMetadata) - }) - } - return initGroup.Wait() -} - -func (ts *trafficSwitcher) mustResetSequences(ctx context.Context) (bool, error) { - switch ts.workflowType { - case binlogdatapb.VReplicationWorkflowType_Migrate, - binlogdatapb.VReplicationWorkflowType_MoveTables: - return ts.isSequenceParticipating(ctx) - default: - return false, nil - } -} - -func (ts *trafficSwitcher) resetSequences(ctx context.Context) error { - var err error - mustReset := false - if mustReset, err = ts.mustResetSequences(ctx); err != nil { - return err - } - if !mustReset { - return nil - } - return ts.ForAllSources(func(source *MigrationSource) error { - ts.Logger().Infof("Resetting sequences for source shard %s.%s on tablet %s", - source.GetShard().Keyspace(), source.GetShard().ShardName(), topoproto.TabletAliasString(source.GetPrimary().GetAlias())) - return ts.TabletManagerClient().ResetSequences(ctx, source.GetPrimary().Tablet, ts.Tables()) - }) -} - func (ts *trafficSwitcher) IsMultiTenantMigration() bool { if ts.options != nil && ts.options.TenantId != "" { return true diff --git a/go/vt/vtctl/workflow/traffic_switcher_test.go b/go/vt/vtctl/workflow/traffic_switcher_test.go index eb8c877e255..d9b5e349630 100644 --- a/go/vt/vtctl/workflow/traffic_switcher_test.go +++ b/go/vt/vtctl/workflow/traffic_switcher_test.go @@ -27,19 +27,14 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" - "vitess.io/vitess/go/vt/mysqlctl/tmutils" "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + topodatapb "vitess.io/vitess/go/vt/proto/topodata" "vitess.io/vitess/go/vt/proto/vschema" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" "vitess.io/vitess/go/vt/vtgate/vindexes" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - - tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - topodatapb "vitess.io/vitess/go/vt/proto/topodata" - vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata" ) type testTrafficSwitcher struct { @@ -75,521 +70,6 @@ func TestReverseWorkflowName(t *testing.T) { } } -func TestGetTargetSequenceMetadata(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - cell := "cell1" - workflow := "wf1" - table := "`t1`" - tableDDL := "create table t1 (id int not null auto_increment primary key, c1 varchar(10))" - table2 := "t2" - unescapedTable := "t1" - sourceKeyspace := &testKeyspace{ - KeyspaceName: "source-ks", - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: "targetks", - ShardNames: []string{"-80", "80-"}, - } - vindexes := map[string]*vschema.Vindex{ - "xxhash": { - Type: "xxhash", - }, - } - env := newTestEnv(t, ctx, cell, sourceKeyspace, targetKeyspace) - defer env.close() - - env.tmc.schema = map[string]*tabletmanagerdatapb.SchemaDefinition{ - unescapedTable: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: unescapedTable, - Schema: tableDDL, - }, - }, - }, - } - - type testCase struct { - name string - sourceVSchema *vschema.Keyspace - targetVSchema *vschema.Keyspace - options *vtctldatapb.WorkflowOptions - want map[string]*sequenceMetadata - expectSourceApplySchemaRequestResponse *applySchemaRequestResponse - err string - } - tests := []testCase{ - { - name: "no sequences", - want: nil, - }, - { - name: "sequences with backticks and qualified table", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my-col`", - Sequence: fmt.Sprintf("`%s`.`my-seq1`", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - want: map[string]*sequenceMetadata{ - "my-seq1": { - backingTableName: "my-seq1", - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: unescapedTable, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Column: "my-col", - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "my-col", - Sequence: fmt.Sprintf("%s.my-seq1", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - }, - { - name: "auto_increment replaced with sequence", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{}, // Sequence table will be created - }, - options: &vtctldatapb.WorkflowOptions{ - ShardedAutoIncrementHandling: vtctldatapb.ShardedAutoIncrementHandling_REPLACE, - GlobalKeyspace: sourceKeyspace.KeyspaceName, - }, - expectSourceApplySchemaRequestResponse: &applySchemaRequestResponse{ - change: &tmutils.SchemaChange{ - SQL: sqlparser.BuildParsedQuery(sqlCreateSequenceTable, - sqlescape.EscapeID(fmt.Sprintf(autoSequenceTableFormat, unescapedTable))).Query, - Force: false, - AllowReplication: true, - SQLMode: vreplication.SQLMode, - DisableForeignKeyChecks: true, - }, - res: &tabletmanagerdatapb.SchemaChangeResult{}, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "my-col", - Sequence: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), - }, - }, - }, - }, - want: map[string]*sequenceMetadata{ - fmt.Sprintf(autoSequenceTableFormat, unescapedTable): { - backingTableName: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: unescapedTable, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Column: "my-col", - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "my-col", - Sequence: fmt.Sprintf(autoSequenceTableFormat, unescapedTable), - }, - }, - }, - }, - }, - { - name: "sequences with backticks", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my-col`", - Sequence: "`my-seq1`", - }, - }, - }, - }, - want: map[string]*sequenceMetadata{ - "my-seq1": { - backingTableName: "my-seq1", - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: unescapedTable, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Column: "my-col", - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "my-col", - Sequence: "my-seq1", - }, - }, - }, - }, - }, - { - name: "sequences using vindexes with both column definition structures", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "seq1": { - Type: "sequence", - }, - "seq2": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "col1", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col1", - Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), - }, - }, - table2: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Columns: []string{"col2"}, - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col2", - Sequence: fmt.Sprintf("%s.seq2", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - want: map[string]*sequenceMetadata{ - "seq1": { - backingTableName: "seq1", - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: unescapedTable, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Column: "col1", - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col1", - Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), - }, - }, - }, - "seq2": { - backingTableName: "seq2", - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: table2, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Columns: []string{"col2"}, - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col2", - Sequence: fmt.Sprintf("%s.seq2", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - }, - { - name: "sequence with table having mult-col vindex", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "seq1": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Columns: []string{"col3", "col4"}, - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col1", - Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - want: map[string]*sequenceMetadata{ - "seq1": { - backingTableName: "seq1", - backingTableKeyspace: "source-ks", - backingTableDBName: "vt_source-ks", - usingTableName: unescapedTable, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - ColumnVindexes: []*vschema.ColumnVindex{ - { - Columns: []string{"col3", "col4"}, - Name: "xxhash", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "col1", - Sequence: fmt.Sprintf("%s.seq1", sourceKeyspace.KeyspaceName), - }, - }, - }, - }, - }, - { - name: "invalid table name", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-`seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my-col`", - Sequence: "`my-seq1`", - }, - }, - }, - }, - err: "invalid table name \"`my-`seq1`\" in keyspace source-ks: UnescapeID err: unexpected single backtick at position 3 in 'my-`seq1'", - }, - { - name: "invalid keyspace name", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my-col`", - Sequence: "`ks`1`.`my-seq1`", - }, - }, - }, - }, - err: "invalid keyspace in qualified sequence table name \"`ks`1`.`my-seq1`\" defined in sequence table column_vindexes:{column:\"`my-col`\" name:\"xxhash\"} auto_increment:{column:\"`my-col`\" sequence:\"`ks`1`.`my-seq1`\"}: UnescapeID err: unexpected single backtick at position 2 in 'ks`1'", - }, - { - name: "invalid auto-inc column name", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my`-col`", - Sequence: "`my-seq1`", - }, - }, - }, - }, - err: "invalid auto-increment column name \"`my`-col`\" defined in sequence table column_vindexes:{column:\"my-col\" name:\"xxhash\"} auto_increment:{column:\"`my`-col`\" sequence:\"my-seq1\"}: UnescapeID err: unexpected single backtick at position 2 in 'my`-col'", - }, - { - name: "invalid sequence name", - sourceVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - "`my-seq1`": { - Type: "sequence", - }, - }, - }, - targetVSchema: &vschema.Keyspace{ - Vindexes: vindexes, - Tables: map[string]*vschema.Table{ - table: { - ColumnVindexes: []*vschema.ColumnVindex{ - { - Name: "xxhash", - Column: "`my-col`", - }, - }, - AutoIncrement: &vschema.AutoIncrement{ - Column: "`my-col`", - Sequence: "`my-`seq1`", - }, - }, - }, - }, - err: "invalid sequence table name \"`my-`seq1`\" defined in sequence table column_vindexes:{column:\"`my-col`\" name:\"xxhash\"} auto_increment:{column:\"`my-col`\" sequence:\"`my-`seq1`\"}: UnescapeID err: unexpected single backtick at position 3 in 'my-`seq1'", - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - err := env.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ - Name: sourceKeyspace.KeyspaceName, - Keyspace: tc.sourceVSchema, - }) - require.NoError(t, err) - err = env.ts.SaveVSchema(ctx, &topo.KeyspaceVSchemaInfo{ - Name: targetKeyspace.KeyspaceName, - Keyspace: tc.targetVSchema, - }) - require.NoError(t, err) - err = env.ts.RebuildSrvVSchema(ctx, nil) - require.NoError(t, err) - sources := make(map[string]*MigrationSource, len(sourceKeyspace.ShardNames)) - targets := make(map[string]*MigrationTarget, len(targetKeyspace.ShardNames)) - for i, shard := range sourceKeyspace.ShardNames { - tablet := env.tablets[sourceKeyspace.KeyspaceName][startingSourceTabletUID+(i*tabletUIDStep)] - sources[shard] = &MigrationSource{ - primary: &topo.TabletInfo{ - Tablet: tablet, - }, - } - if tc.expectSourceApplySchemaRequestResponse != nil { - env.tmc.expectApplySchemaRequest(tablet.Alias.Uid, tc.expectSourceApplySchemaRequestResponse) - } - } - for i, shard := range targetKeyspace.ShardNames { - tablet := env.tablets[targetKeyspace.KeyspaceName][startingTargetTabletUID+(i*tabletUIDStep)] - targets[shard] = &MigrationTarget{ - primary: &topo.TabletInfo{ - Tablet: tablet, - }, - } - } - ts := &trafficSwitcher{ - id: 1, - ws: env.ws, - workflow: workflow, - tables: []string{table, table2}, - sourceKeyspace: sourceKeyspace.KeyspaceName, - targetKeyspace: targetKeyspace.KeyspaceName, - sources: sources, - targets: targets, - options: tc.options, - } - got, err := ts.getTargetSequenceMetadata(ctx) - if tc.err != "" { - require.EqualError(t, err, tc.err) - } else { - require.NoError(t, err) - } - require.EqualValues(t, tc.want, got) - }) - } -} - // TestSwitchTrafficPositionHandling confirms that if any writes are somehow // executed against the source between the stop source writes and wait for // catchup steps, that we have the correct position and do not lose the write(s). @@ -659,71 +139,6 @@ func TestTrafficSwitchPositionHandling(t *testing.T) { require.NoError(t, err) } -func TestInitializeTargetSequences(t *testing.T) { - ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) - defer cancel() - - workflowName := "wf1" - tableName := "t1" - sourceKeyspaceName := "sourceks" - targetKeyspaceName := "targetks" - - schema := map[string]*tabletmanagerdatapb.SchemaDefinition{ - tableName: { - TableDefinitions: []*tabletmanagerdatapb.TableDefinition{ - { - Name: tableName, - Schema: fmt.Sprintf("CREATE TABLE %s (id BIGINT, name VARCHAR(64), PRIMARY KEY (id))", tableName), - }, - }, - }, - } - - sourceKeyspace := &testKeyspace{ - KeyspaceName: sourceKeyspaceName, - ShardNames: []string{"0"}, - } - targetKeyspace := &testKeyspace{ - KeyspaceName: targetKeyspaceName, - ShardNames: []string{"0"}, - } - - env := newTestEnv(t, ctx, defaultCellName, sourceKeyspace, targetKeyspace) - defer env.close() - env.tmc.schema = schema - - ts, _, err := env.ws.getWorkflowState(ctx, targetKeyspaceName, workflowName) - require.NoError(t, err) - sw := &switcher{ts: ts, s: env.ws} - - sequencesByBackingTable := map[string]*sequenceMetadata{ - "my-seq1": { - backingTableName: "my-seq1", - backingTableKeyspace: sourceKeyspaceName, - backingTableDBName: fmt.Sprintf("vt_%s", sourceKeyspaceName), - usingTableName: tableName, - usingTableDBName: "vt_targetks", - usingTableDefinition: &vschema.Table{ - AutoIncrement: &vschema.AutoIncrement{ - Column: "my-col", - Sequence: fmt.Sprintf("%s.my-seq1", sourceKeyspace.KeyspaceName), - }, - }, - }, - } - - env.tmc.expectVRQuery(200, "/select max.*", sqltypes.MakeTestResult(sqltypes.MakeTestFields("maxval", "int64"), "34")) - // Expect the insert query to be executed with 35 as a params, since we provide a maxID of 34 in the last query - env.tmc.expectVRQuery(100, "/insert into.*35.*", &sqltypes.Result{RowsAffected: 1}) - - err = sw.initializeTargetSequences(ctx, sequencesByBackingTable) - assert.NoError(t, err) - - // Expect the queries to be cleared - assert.Empty(t, env.tmc.vrQueries[100]) - assert.Empty(t, env.tmc.vrQueries[200]) -} - func TestAddTenantFilter(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel()