Skip to content

Commit

Permalink
Changes from self review
Browse files Browse the repository at this point in the history
Signed-off-by: Matt Lord <mattalord@gmail.com>
  • Loading branch information
mattlord committed Jan 12, 2025
1 parent 030ea5a commit a8253ca
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 12 deletions.
20 changes: 14 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/table_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -719,6 +719,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if dr == nil {
return fmt.Errorf("cannot update progress with a nil diff report")
}

var err error
var query string
rpt, err := json.Marshal(dr)
Expand All @@ -744,9 +745,10 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
td.lastTargetPK = lastPK.Target
if lastPK.Source == nil {
// If the source PK is nil, we use the target value for both.
lastPK.Source = lastPK.Target
td.lastSourcePK = lastPK.Target
} else {
td.lastSourcePK = lastPK.Source
}
td.lastSourcePK = lastPK.Source
}
lastPKTxt, err := prototext.Marshal(lastPK)
if err != nil {
Expand All @@ -766,6 +768,7 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D
if _, err := dbClient.ExecuteFetch(query, 1); err != nil {
return err
}

td.wd.ct.TableDiffRowCounts.Add(td.table.Name, dr.ProcessedRows)
return nil
}
Expand Down Expand Up @@ -849,7 +852,7 @@ func (td *tableDiffer) lastPKFromRow(row []sqltypes.Value) *tabletmanagerdatapb.
}
// If the source and target PKs are different, we need to save the source PK
// as well. Otherwise the source will be nil which means that the target value
// should be used for the source.
// should also be used for the source.
if !slices.Equal(td.tablePlan.pkCols, td.tablePlan.sourcePkCols) {
lastPK.Source = buildQR(td.tablePlan.sourcePkCols)
}
Expand Down Expand Up @@ -904,13 +907,15 @@ func (td *tableDiffer) adjustForSourceTimeZone(targetSelectExprs sqlparser.Selec

// getSourcePKCols populates the sourcePkCols field in the tablePlan.
// We need this information in order to save the lastpk value for the
// source as the PK columns may differ between the source and target.
// source if the PK columns differ between the source and target.
func (td *tableDiffer) getSourcePKCols() error {
ctx, cancel := context.WithTimeout(td.wd.ct.vde.ctx, topo.RemoteOperationTimeout*3)
defer cancel()

// We use the first sourceShard as all of them should have the same schema.
if len(td.wd.ct.sources) == 0 {
return fmt.Errorf("no source shards found")
return vterrors.Errorf(vtrpcpb.Code_FAILED_PRECONDITION, "no source shards found in %s keyspace",
td.wd.ct.sourceKeyspace)
}
sourceShardName := maps.Keys(td.wd.ct.sources)[0]
sourceTS, err := td.wd.getSourceTopoServer()
Expand All @@ -926,7 +931,8 @@ func (td *tableDiffer) getSourcePKCols() error {
}
sourceTablet, err := sourceTS.GetTablet(ctx, sourceShard.PrimaryAlias)
if err != nil {
return vterrors.Wrapf(err, "failed to get primary tablet in source shard %s", sourceShardName)
return vterrors.Wrapf(err, "failed to get primary tablet in source shard %s/%s",
td.wd.ct.sourceKeyspace, sourceShardName)
}
sourceSchema, err := td.wd.ct.tmc.GetSchema(ctx, sourceTablet.Tablet, &tabletmanagerdatapb.GetSchemaRequest{
Tables: []string{td.table.Name},
Expand Down Expand Up @@ -961,6 +967,7 @@ func (td *tableDiffer) getSourcePKCols() error {
sourceTable.PrimaryKeyColumns = append(sourceTable.PrimaryKeyColumns, td.table.Columns...)
}
}

sourcePKColumns := make(map[string]struct{}, len(sourceTable.PrimaryKeyColumns))
td.tablePlan.sourcePkCols = make([]int, 0, len(sourceTable.PrimaryKeyColumns))
for _, pkc := range sourceTable.PrimaryKeyColumns {
Expand All @@ -971,6 +978,7 @@ func (td *tableDiffer) getSourcePKCols() error {
td.tablePlan.sourcePkCols = append(td.tablePlan.sourcePkCols, i)
}
}

return nil
}

Expand Down
4 changes: 3 additions & 1 deletion go/vt/vttablet/tabletmanager/vdiff/table_differ_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,9 @@ func TestUpdateTableProgress(t *testing.T) {
TableDiffRowCounts: stats.NewCountersWithSingleLabel("", "", "Rows"),
},
opts: &tabletmanagerdatapb.VDiffOptions{
CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{},
CoreOptions: &tabletmanagerdatapb.VDiffCoreOptions{
MaxDiffSeconds: 100,
},
},
},
table: &tabletmanagerdatapb.TableDefinition{
Expand Down
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/table_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,8 @@ type tablePlan struct {
pkCols []int
// sourcePkCols has the indices of PK cols in the select
// list, but from the source keyspace. This is needed to
// properly store the lastpk for the source.
// properly store the lastpk for the source when the source
// and target have different PK columns.
sourcePkCols []int

// selectPks is the list of pk columns as they appear in the select clause for the diff.
Expand Down Expand Up @@ -211,7 +212,6 @@ func (tp *tablePlan) findPKs(dbClient binlogplayer.DBClient, targetSelect *sqlpa
if len(tp.table.PrimaryKeyColumns) == 0 {
return nil
}

var orderby sqlparser.OrderBy
for _, pk := range tp.table.PrimaryKeyColumns {
found := false
Expand Down
4 changes: 1 addition & 3 deletions go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,6 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl
}

td := newTableDiffer(wd, table, sourceQuery)

lastPK, err := wd.getTableLastPK(dbClient, table.Name)
if err != nil {
return err
Expand All @@ -384,7 +383,6 @@ func (wd *workflowDiffer) buildPlan(dbClient binlogplayer.DBClient, filter *binl
if _, err := td.buildTablePlan(dbClient, wd.ct.vde.dbName, wd.collationEnv); err != nil {
return err
}

// We get the PK columns from the source schema as well as they can differ
// and they determine the proper position to use when saving our progress.
if err := td.getSourcePKCols(); err != nil {
Expand Down Expand Up @@ -420,7 +418,7 @@ func (wd *workflowDiffer) getTableLastPK(dbClient binlogplayer.DBClient, tableNa
if len(lastpk) != 0 {
lastPK := &tabletmanagerdatapb.VDiffTableLastPK{}
if err := prototext.Unmarshal(lastpk, lastPK); err != nil {
return nil, vterrors.Wrapf(err, "failed to unmarshal lastpk value of %s for table %s",
return nil, vterrors.Wrapf(err, "failed to unmarshal lastpk value of %s for the %s table",
string(lastpk), tableName)
}
if lastPK.Source == nil { // Then it's the same as the target
Expand Down

0 comments on commit a8253ca

Please sign in to comment.