Skip to content

Commit

Permalink
Create sequence table if not found while updating sequence value. Som…
Browse files Browse the repository at this point in the history
…e refactor

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 25, 2025
1 parent ab84f36 commit 3d1742e
Showing 1 changed file with 83 additions and 46 deletions.
129 changes: 83 additions & 46 deletions go/vt/vtctl/workflow/sequences.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,30 +22,32 @@ import (
"strings"
"sync"

"vitess.io/vitess/go/vt/log"

"golang.org/x/exp/maps"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl/tmutils"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vtgate/vindexes"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
vschemapb "vitess.io/vitess/go/vt/proto/vschema"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
)

const (
sqlGetMaxSequenceVal = "select max(%a) as maxval from %a.%a"
sqlInitSequenceTable = "insert into %a.%a (id, next_id, cache) values (0, %d, 1000) on duplicate key update next_id = if(next_id < %d, %d, next_id)"
sqlCreateSequenceTable = "create table if not exists %a (id int, next_id bigint, cache bigint, primary key(id)) comment 'vitess_sequence'"
sqlGetCurrentSequenceVal = "select next_id from %a.%a where id = 0"
sqlGetMaxSequenceVal = "select max(%a) as maxval from %a.%a"
)

// sequenceMetadata contains all of the relevant metadata for a sequence that
Expand All @@ -65,11 +67,15 @@ type sequenceMetadata struct {
usingTableDefinition *vschemapb.Table

// escaped values
escaped bool
usingCol, usingDB, usingTable string
backingDB, backingTable string
}

func (sm *sequenceMetadata) escapeValues() error {
if sm.escaped {
return nil
}
usingCol := ""
var err error
if sm.usingTableDefinition != nil && sm.usingTableDefinition.AutoIncrement != nil {
Expand Down Expand Up @@ -109,6 +115,7 @@ func (sm *sequenceMetadata) escapeValues() error {
return err
}
sm.backingTable = backingTable
sm.escaped = true
return nil
}

Expand Down Expand Up @@ -270,24 +277,48 @@ func (ts *trafficSwitcher) updateSequenceValue(ctx context.Context, seq *sequenc
if sequenceTablet.DbNameOverride != "" {
seq.backingTableDBName = sequenceTablet.DbNameOverride
}
query := sqlparser.BuildParsedQuery(sqlInitSequenceTable,
initQuery := sqlparser.BuildParsedQuery(sqlInitSequenceTable,
seq.backingTableDBName,
seq.backingTableName,
nextVal,
nextVal,
nextVal,
)
_, ierr = ts.ws.tmc.ExecuteFetchAsApp(ctx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(query.Query),
MaxRows: 1,
})
if ierr != nil {
vterr := vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"failed to initialize the backing sequence table %s.%s: %v",
seq.backingTableDBName, seq.backingTableName, ierr)
return vterr

const maxTries = 2
var err error

for i := 0; i < maxTries; i++ {
// Attempt to initialize the sequence.
_, err = ts.ws.tmc.ExecuteFetchAsApp(ctx, sequenceTablet.Tablet, true, &tabletmanagerdatapb.ExecuteFetchAsAppRequest{
Query: []byte(initQuery.Query),
MaxRows: 1,
})
if err == nil {
return nil
}

// If the table doesn't exist, try creating it.
sqlErr, ok := sqlerror.NewSQLErrorFromError(err).(*sqlerror.SQLError)
if !ok || (sqlErr.Num != sqlerror.ERNoSuchTable && sqlErr.Num != sqlerror.ERBadTable) {
return vterrors.Errorf(
vtrpcpb.Code_INTERNAL,
"failed to initialize the backing sequence table %s.%s: %v",
seq.backingTableDBName, seq.backingTableName, err,
)
}

if err := ts.createSequenceTable(ctx, seq.backingTable, sequenceTablet); err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"failed to create the backing sequence table %s in the global-keyspace %s: %v",
seq.backingTable, sequenceShard.Keyspace(), err)
}
// Table has been created, so we fall through and try again on the next loop iteration.
}
return nil

return vterrors.Errorf(
vtrpcpb.Code_INTERNAL, "failed to initialize the backing sequence table %s.%s after retries. Last error: %v",
seq.backingTableDBName, seq.backingTableName, err)
}

// initializeTargetSequences initializes the backing sequence tables
Expand Down Expand Up @@ -478,38 +509,27 @@ func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, seque
if err != nil {
return err
}
shard, err := ts.ws.ts.GetShard(ctx, globalKeyspace, shards[0])
if err != nil {
return err
}
if shard.PrimaryAlias == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s does not currently have a primary tablet",
globalKeyspace)
}
primary, err := ts.ws.ts.GetTablet(ctx, shard.PrimaryAlias)
if err != nil {
return err
}
updatedGlobalVSchema := false
for tableName, sequenceMetadata := range sequencesByBackingTable {
if _, ok := tablesFound[tableName]; !ok {
// Create the backing table.
shard, err := ts.ws.ts.GetShard(ctx, globalKeyspace, shards[0])
if err != nil {
return err
}
if shard.PrimaryAlias == nil {
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "global-keyspace %s does not currently have a primary tablet",
globalKeyspace)
}
primary, err := ts.ws.ts.GetTablet(ctx, shard.PrimaryAlias)
if err != nil {
return err
}
escapedTableName, err := sqlescape.EnsureEscaped(tableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s: %v",
tableName, err)
}
stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, escapedTableName)
_, err = ts.ws.tmc.ApplySchema(ctx, primary.Tablet, &tmutils.SchemaChange{
SQL: stmt.Query,
Force: false,
AllowReplication: true,
SQLMode: vreplication.SQLMode,
DisableForeignKeyChecks: true,
})
if err != nil {
return vterrors.Wrapf(err, "failed to create sequence backing table %s in global-keyspace %s",
tableName, globalKeyspace)

if err := ts.createSequenceTable(ctx, tableName, primary); err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL,
"failed to create the backing sequence table %s in the global-keyspace %s: %v",
tableName, globalKeyspace, err)
}
if bt := globalVSchema.Tables[sequenceMetadata.backingTableName]; bt == nil {
if globalVSchema.Tables == nil {
Expand All @@ -533,6 +553,23 @@ func (ts trafficSwitcher) createMissingSequenceTables(ctx context.Context, seque
return nil
}

func (ts trafficSwitcher) createSequenceTable(ctx context.Context, tableName string, primary *topo.TabletInfo) error {
escapedTableName, err := sqlescape.EnsureEscaped(tableName)
if err != nil {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "invalid table name %s: %v",
tableName, err)
}
stmt := sqlparser.BuildParsedQuery(sqlCreateSequenceTable, escapedTableName)
_, err = ts.ws.tmc.ApplySchema(ctx, primary.Tablet, &tmutils.SchemaChange{
SQL: stmt.Query,
Force: false,
AllowReplication: true,
SQLMode: vreplication.SQLMode,
DisableForeignKeyChecks: true,
})
return err
}

// findSequenceUsageInKeyspace searches the keyspace's vschema for usage
// of sequences. It returns a map of sequence metadata keyed by the backing
// sequence table name -- if any usage is found -- along with a boolean to
Expand Down

0 comments on commit 3d1742e

Please sign in to comment.