From e45331fe4ac2f879f3f46d43c74e3997e09684af Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 17:22:10 -0500 Subject: [PATCH 01/15] Scaffolding and error count total Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/action.go | 19 ++++-- go/vt/vttablet/tabletmanager/vdiff/stats.go | 72 ++++++++++++++++++++ 2 files changed, 84 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index ded232bf3c7..ca5fa976a51 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -63,7 +63,12 @@ var ( } ) -func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) { +func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (resp *tabletmanagerdatapb.VDiffResponse, err error) { + defer func() { + if err != nil { + globalStats.Errors.Add(1) + } + }() if !vde.isOpen { return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed") } @@ -71,13 +76,13 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is still trying to open") } - resp := &tabletmanagerdatapb.VDiffResponse{ + resp = &tabletmanagerdatapb.VDiffResponse{ Id: 0, Output: nil, } // We use the db_filtered user for vreplication related work. dbClient := vde.dbClientFactoryFiltered() - if err := dbClient.Connect(); err != nil { + if err = dbClient.Connect(); err != nil { return nil, err } defer dbClient.Close() @@ -85,19 +90,19 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat action := VDiffAction(req.Action) switch action { case CreateAction, ResumeAction: - if err := vde.handleCreateResumeAction(ctx, dbClient, action, req, resp); err != nil { + if err = vde.handleCreateResumeAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case ShowAction: - if err := vde.handleShowAction(ctx, dbClient, action, req, resp); err != nil { + if err = vde.handleShowAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case StopAction: - if err := vde.handleStopAction(ctx, dbClient, action, req, resp); err != nil { + if err = vde.handleStopAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case DeleteAction: - if err := vde.handleDeleteAction(ctx, dbClient, action, req, resp); err != nil { + if err = vde.handleDeleteAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } default: diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 3d984366bc9..28da2ceff73 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -18,6 +18,7 @@ package vdiff import ( "sync" + "time" "vitess.io/vitess/go/stats" ) @@ -27,6 +28,9 @@ var ( ) func init() { + globalStats.Timings = stats.NewTimings("", "", "") + globalStats.Rates = stats.NewRates("", globalStats.Timings, 15*60/5, 5*time.Second) + globalStats.PhaseTimings = stats.NewTimings("", "", "Phase") globalStats.register() } @@ -36,11 +40,70 @@ func init() { type vdiffStats struct { mu sync.Mutex + Created *stats.Counter + CreatedByWorkflow *stats.CountersWithSingleLabel + Errors *stats.Counter + ErrorsByWorkflow *stats.CountersWithSingleLabel RestartedTableDiffs *stats.CountersWithSingleLabel + + Timings *stats.Timings // How long a VDiff run takes to complete. + PhaseTimings *stats.Timings // How long we spend in phases such as table diff initialization. + Rates *stats.Rates + + RowsDiffedPerSecond *stats.CountersWithMultiLabels } func (st *vdiffStats) register() { + globalStats.Created = stats.NewCounter("", "") + globalStats.CreatedByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") + globalStats.Errors = stats.NewCounter("", "") + globalStats.ErrorsByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") + globalStats.Timings = stats.NewTimings("", "", "") + globalStats.Rates = stats.NewRates("", globalStats.Timings, 15*60/5, 5*time.Second) // Pers second avg with 15 second samples + globalStats.PhaseTimings = stats.NewTimings("", "", "Phase") + + /* + bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.CopyRowCount = stats.NewCounter("", "") + bps.CopyLoopCount = stats.NewCounter("", "") + bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) + bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") + bps.VReplicationLags = stats.NewTimings("", "", "") + bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second) + bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "") + bps.TableCopyTimings = stats.NewTimings("", "", "Table") + bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) + bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) + */ + + stats.NewCounterFunc( + "VDiffErrorsTotal", + "number of errors encountered across all vdiffs", + func() int64 { + st.mu.Lock() + defer st.mu.Unlock() + return globalStats.Errors.Get() + }) + + stats.NewGaugesFuncWithMultiLabels( + "VDiffErrorsByWorkflow", + "number of errors encountered for vdiffs by vreplication workflow name", + []string{"table_name"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64) + for label, count := range globalStats.ErrorsByWorkflow.Counts() { + if label == "" { + continue + } + result[label] = count + } + return result + }, + ) stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", @@ -59,4 +122,13 @@ func (st *vdiffStats) register() { return result }, ) + + stats.NewRateFunc( + "VDiffRowsComparedPerSecond", + "number of rows diffed per second all vdiffs", + func() map[string][]float64 { + st.mu.Lock() + defer st.mu.Unlock() + return globalStats.Rates.Get() + }) } From c964bc5ea671acad9b9201fe3c7437ba6a41198b Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 7 Feb 2024 17:41:36 -0500 Subject: [PATCH 02/15] Add additional stats Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/action.go | 6 + go/vt/vttablet/tabletmanager/vdiff/engine.go | 16 +++ go/vt/vttablet/tabletmanager/vdiff/stats.go | 114 ++++++++++++------ .../tabletmanager/vdiff/table_differ.go | 5 + 4 files changed, 103 insertions(+), 38 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index ca5fa976a51..6033b760a2f 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -67,8 +67,14 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat defer func() { if err != nil { globalStats.Errors.Add(1) + if req != nil { + globalStats.ErrorsByWorkflow.Add(req.Workflow, 1) + } } }() + if req == nil { + return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request") + } if !vde.isOpen { return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed") } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 16e8a89d90e..f74ae55ea1b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -159,6 +159,7 @@ func (vde *Engine) openLocked(ctx context.Context) error { if err := vde.initControllers(rows); err != nil { return err } + vde.updateStats() // At this point we've fully and successfully opened so begin // retrying error'd VDiffs until the engine is closed. @@ -218,6 +219,9 @@ func (vde *Engine) addController(row sqltypes.RowNamedValues, options *tabletman row, vde.thisTablet.Alias) } vde.controllers[ct.id] = ct + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + globalStats.controllers[ct.id] = ct return nil } @@ -392,4 +396,16 @@ func (vde *Engine) resetControllers() { ct.Stop() } vde.controllers = make(map[int64]*controller) + vde.updateStats() +} + +// UpdateStats must be called with lock held. +func (vre *Engine) updateStats() { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + globalStats.controllers = make(map[int64]*controller, len(vre.controllers)) + for id, ct := range vre.controllers { + globalStats.controllers[id] = ct + } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 28da2ceff73..b1ccb942e22 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -17,10 +17,16 @@ limitations under the License. package vdiff import ( + "fmt" "sync" "time" "vitess.io/vitess/go/stats" + "vitess.io/vitess/go/vt/topo/topoproto" +) + +const ( +// Keys for the rates/timings stats map. ) var ( @@ -28,9 +34,6 @@ var ( ) func init() { - globalStats.Timings = stats.NewTimings("", "", "") - globalStats.Rates = stats.NewRates("", globalStats.Timings, 15*60/5, 5*time.Second) - globalStats.PhaseTimings = stats.NewTimings("", "", "Phase") globalStats.register() } @@ -38,45 +41,33 @@ func init() { // vdiffStats exports the stats for Engine. It's a separate structure to // prevent potential deadlocks with the mutex in Engine. type vdiffStats struct { - mu sync.Mutex - - Created *stats.Counter - CreatedByWorkflow *stats.CountersWithSingleLabel - Errors *stats.Counter - ErrorsByWorkflow *stats.CountersWithSingleLabel - RestartedTableDiffs *stats.CountersWithSingleLabel - - Timings *stats.Timings // How long a VDiff run takes to complete. + mu sync.Mutex + controllers map[int64]*controller + + Count *stats.Gauge + Errors *stats.Counter + ErrorsByWorkflow *stats.CountersWithSingleLabel + RestartedTableDiffs *stats.CountersWithSingleLabel + RowsDiffed *stats.Counter + RowsDiffedByWorkflow *stats.CountersWithSingleLabel + + DiffTimings *stats.Timings // How long a VDiff run takes to complete. + DiffRates *stats.Rates // How many things we're doing per second. PhaseTimings *stats.Timings // How long we spend in phases such as table diff initialization. - Rates *stats.Rates - - RowsDiffedPerSecond *stats.CountersWithMultiLabels } func (st *vdiffStats) register() { - globalStats.Created = stats.NewCounter("", "") - globalStats.CreatedByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") + globalStats.Count = stats.NewGauge("", "") globalStats.Errors = stats.NewCounter("", "") globalStats.ErrorsByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") - globalStats.Timings = stats.NewTimings("", "", "") - globalStats.Rates = stats.NewRates("", globalStats.Timings, 15*60/5, 5*time.Second) // Pers second avg with 15 second samples - globalStats.PhaseTimings = stats.NewTimings("", "", "Phase") - - /* - bps.BulkQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") - bps.TrxQueryBatchCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") - bps.CopyRowCount = stats.NewCounter("", "") - bps.CopyLoopCount = stats.NewCounter("", "") - bps.ErrorCounts = stats.NewCountersWithMultiLabels("", "", []string{"type"}) - bps.NoopQueryCount = stats.NewCountersWithSingleLabel("", "", "Statement", "") - bps.VReplicationLags = stats.NewTimings("", "", "") - bps.VReplicationLagRates = stats.NewRates("", bps.VReplicationLags, 15*60/5, 5*time.Second) - bps.TableCopyRowCounts = stats.NewCountersWithSingleLabel("", "", "Table", "") - bps.TableCopyTimings = stats.NewTimings("", "", "Table") - bps.PartialQueryCacheSize = stats.NewCountersWithMultiLabels("", "", []string{"type"}) - bps.PartialQueryCount = stats.NewCountersWithMultiLabels("", "", []string{"type"}) - */ + globalStats.RowsDiffed = stats.NewCounter("", "") + globalStats.RowsDiffedByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") + globalStats.DiffTimings = stats.NewTimings("", "", "") + globalStats.DiffRates = stats.NewRates("", globalStats.DiffTimings, 15*60/5, 5*time.Second) // Pers second avg with 15 second samples + globalStats.PhaseTimings = stats.NewTimings("VDiffPhaseTimings", "vdiff per phase timings", "Phase") + + stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", st.numControllers) stats.NewCounterFunc( "VDiffErrorsTotal", @@ -123,12 +114,59 @@ func (st *vdiffStats) register() { }, ) + stats.NewCounterFunc( + "VDiffRowsCompared", + "number of rows compared across all vdiffs", + func() int64 { + st.mu.Lock() + defer st.mu.Unlock() + return globalStats.RowsDiffed.Get() + }) + + stats.NewGaugesFuncWithMultiLabels( + "VDiffRowsComparedByWorkflow", + "number of rows compared by vreplication workflow name", + []string{"table_name"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64) + for label, count := range globalStats.RowsDiffedByWorkflow.Counts() { + if label == "" { + continue + } + result[label] = count + } + return result + }, + ) + stats.NewRateFunc( - "VDiffRowsComparedPerSecond", - "number of rows diffed per second all vdiffs", + "VDiffActionsPerSecond", + "number of actions per second across all vdiffs", func() map[string][]float64 { st.mu.Lock() defer st.mu.Unlock() - return globalStats.Rates.Get() + return globalStats.DiffRates.Get() }) + + stats.Publish("VDiffStreamingTablets", stats.StringMapFunc(func() map[string]string { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]string, len(st.controllers)) + for _, ct := range st.controllers { + tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) + for _, s := range ct.sources { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = + fmt.Sprintf("source:%s;target:%s", topoproto.TabletAliasString(s.tablet.Alias), tt) + } + } + return result + })) +} + +func (st *vdiffStats) numControllers() int64 { + st.mu.Lock() + defer st.mu.Unlock() + return int64(len(st.controllers)) } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index d658fea2a25..98e0a9809d5 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -90,6 +90,7 @@ func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefiniti // initialize func (td *tableDiffer) initialize(ctx context.Context) error { + defer globalStats.PhaseTimings.Record("initialize", time.Now()) vdiffEngine := td.wd.ct.vde vdiffEngine.snapshotMu.Lock() defer vdiffEngine.snapshotMu.Unlock() @@ -467,6 +468,7 @@ func (td *tableDiffer) setupRowSorters() { } func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onlyPks bool, maxExtraRowsToCompare int64, maxReportSampleRows int64, stop <-chan time.Time) (*DiffReport, error) { + globalStats.PhaseTimings.Record("diff", time.Now()) dbClient := td.wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return nil, err @@ -664,6 +666,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl } func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []compareColInfo, compareOnlyNonPKs bool) (int, error) { + globalStats.DiffTimings.Record("RowDiff", time.Now()) for _, col := range cols { if col.isPK && compareOnlyNonPKs { continue @@ -738,6 +741,8 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D return err } } + globalStats.RowsDiffed.Add(dr.ProcessedRows) + globalStats.RowsDiffedByWorkflow.Add(td.wd.ct.workflow, dr.ProcessedRows) if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } From f378b4bae517185360c1f901d804be9e4f37be4a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 8 Feb 2024 14:44:13 -0500 Subject: [PATCH 03/15] Additional stats and arrangement Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/action.go | 18 +-- .../tabletmanager/vdiff/controller.go | 29 ++-- go/vt/vttablet/tabletmanager/vdiff/stats.go | 131 +++++++++--------- .../tabletmanager/vdiff/table_differ.go | 17 ++- .../tabletmanager/vdiff/workflow_differ.go | 7 +- 5 files changed, 108 insertions(+), 94 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/action.go b/go/vt/vttablet/tabletmanager/vdiff/action.go index 6033b760a2f..0b9dd6f45ed 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action.go @@ -66,10 +66,7 @@ var ( func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (resp *tabletmanagerdatapb.VDiffResponse, err error) { defer func() { if err != nil { - globalStats.Errors.Add(1) - if req != nil { - globalStats.ErrorsByWorkflow.Add(req.Workflow, 1) - } + globalStats.ErrorCount.Add(1) } }() if req == nil { @@ -88,7 +85,7 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat } // We use the db_filtered user for vreplication related work. dbClient := vde.dbClientFactoryFiltered() - if err = dbClient.Connect(); err != nil { + if err := dbClient.Connect(); err != nil { return nil, err } defer dbClient.Close() @@ -96,19 +93,19 @@ func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdat action := VDiffAction(req.Action) switch action { case CreateAction, ResumeAction: - if err = vde.handleCreateResumeAction(ctx, dbClient, action, req, resp); err != nil { + if err := vde.handleCreateResumeAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case ShowAction: - if err = vde.handleShowAction(ctx, dbClient, action, req, resp); err != nil { + if err := vde.handleShowAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case StopAction: - if err = vde.handleStopAction(ctx, dbClient, action, req, resp); err != nil { + if err := vde.handleStopAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } case DeleteAction: - if err = vde.handleDeleteAction(ctx, dbClient, action, req, resp); err != nil { + if err := vde.handleDeleteAction(ctx, dbClient, action, req, resp); err != nil { return nil, err } default: @@ -379,6 +376,9 @@ func (vde *Engine) handleDeleteAction(ctx context.Context, dbClient binlogplayer } controller.Stop() delete(vde.controllers, controller.id) + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + delete(globalStats.controllers, controller.id) } switch req.ActionArg { diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index 1c50c0597ef..cfe449fd265 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -27,6 +27,7 @@ import ( "vitess.io/vitess/go/mysql/replication" "vitess.io/vitess/go/sqltypes" + "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/proto/tabletmanagerdata" @@ -75,6 +76,11 @@ type controller struct { sourceTimeZone, targetTimeZone string // Named time zones if conversions are necessary for datetime values externalCluster string // For Mount+Migrate + + // Information used in vdiff stats/metrics. + ErrorCounts *stats.CountersWithMultiLabels + TableDiffRowCounts *stats.CountersWithMultiLabels + TableDiffPhaseTimings *stats.Timings } func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient, @@ -84,16 +90,19 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac id, _ := row["id"].ToInt64() ct := &controller{ - id: id, - uuid: row["vdiff_uuid"].ToString(), - workflow: row["workflow"].ToString(), - dbClientFactory: dbClientFactory, - ts: ts, - vde: vde, - done: make(chan struct{}), - tmc: vde.tmClientFactory(), - sources: make(map[string]*migrationSource), - options: options, + id: id, + uuid: row["vdiff_uuid"].ToString(), + workflow: row["workflow"].ToString(), + dbClientFactory: dbClientFactory, + ts: ts, + vde: vde, + done: make(chan struct{}), + tmc: vde.tmClientFactory(), + sources: make(map[string]*migrationSource), + options: options, + ErrorCounts: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), + TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } ctx, ct.cancel = context.WithCancel(ctx) go ct.run(ctx) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index b1ccb942e22..8d5c3de8505 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -19,16 +19,11 @@ package vdiff import ( "fmt" "sync" - "time" "vitess.io/vitess/go/stats" "vitess.io/vitess/go/vt/topo/topoproto" ) -const ( -// Keys for the rates/timings stats map. -) - var ( globalStats = &vdiffStats{} ) @@ -44,58 +39,29 @@ type vdiffStats struct { mu sync.Mutex controllers map[int64]*controller - Count *stats.Gauge - Errors *stats.Counter - ErrorsByWorkflow *stats.CountersWithSingleLabel - RestartedTableDiffs *stats.CountersWithSingleLabel - RowsDiffed *stats.Counter - RowsDiffedByWorkflow *stats.CountersWithSingleLabel - - DiffTimings *stats.Timings // How long a VDiff run takes to complete. - DiffRates *stats.Rates // How many things we're doing per second. - PhaseTimings *stats.Timings // How long we spend in phases such as table diff initialization. + Count *stats.Gauge + ErrorCount *stats.Counter + RestartedTableDiffs *stats.CountersWithSingleLabel + RowsDiffedCount *stats.Counter } func (st *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") - globalStats.Errors = stats.NewCounter("", "") - globalStats.ErrorsByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") + globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") - globalStats.RowsDiffed = stats.NewCounter("", "") - globalStats.RowsDiffedByWorkflow = stats.NewCountersWithSingleLabel("", "", "Workflow", "") - globalStats.DiffTimings = stats.NewTimings("", "", "") - globalStats.DiffRates = stats.NewRates("", globalStats.DiffTimings, 15*60/5, 5*time.Second) // Pers second avg with 15 second samples - globalStats.PhaseTimings = stats.NewTimings("VDiffPhaseTimings", "vdiff per phase timings", "Phase") + globalStats.RowsDiffedCount = stats.NewCounter("", "") stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", st.numControllers) stats.NewCounterFunc( - "VDiffErrorsTotal", + "VDiffErrorCountTotal", "number of errors encountered across all vdiffs", func() int64 { st.mu.Lock() defer st.mu.Unlock() - return globalStats.Errors.Get() + return globalStats.ErrorCount.Get() }) - stats.NewGaugesFuncWithMultiLabels( - "VDiffErrorsByWorkflow", - "number of errors encountered for vdiffs by vreplication workflow name", - []string{"table_name"}, - func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]int64) - for label, count := range globalStats.ErrorsByWorkflow.Counts() { - if label == "" { - continue - } - result[label] = count - } - return result - }, - ) - stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", "vdiff table diffs restarted due to max-diff-duration counts per table", @@ -115,54 +81,81 @@ func (st *vdiffStats) register() { ) stats.NewCounterFunc( - "VDiffRowsCompared", + "VDiffRowsComparedTotal", "number of rows compared across all vdiffs", func() int64 { st.mu.Lock() defer st.mu.Unlock() - return globalStats.RowsDiffed.Get() + return globalStats.RowsDiffedCount.Get() }) stats.NewGaugesFuncWithMultiLabels( - "VDiffRowsComparedByWorkflow", - "number of rows compared by vreplication workflow name", - []string{"table_name"}, + "VDiffRowsCompared", + "live number of rows compared per vdiff by table", + []string{"workflow", "uuid", "table"}, func() map[string]int64 { st.mu.Lock() defer st.mu.Unlock() - result := make(map[string]int64) - for label, count := range globalStats.RowsDiffedByWorkflow.Counts() { - if label == "" { - continue + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for key, val := range ct.TableDiffRowCounts.Counts() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val } - result[label] = count } return result }, ) - stats.NewRateFunc( - "VDiffActionsPerSecond", - "number of actions per second across all vdiffs", - func() map[string][]float64 { + stats.NewStringMapFuncWithMultiLabels( + "VDiffStreamingTablets", + "latest tablets used on the source and target for streaming table data", + []string{"workflow", "uuid", "target_shard"}, + "tablets", + func() map[string]string { st.mu.Lock() defer st.mu.Unlock() - return globalStats.DiffRates.Get() + result := make(map[string]string, len(st.controllers)) + for _, ct := range st.controllers { + tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) + for _, s := range ct.sources { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = + fmt.Sprintf("source:%s,target:%s", topoproto.TabletAliasString(s.tablet.Alias), tt) + } + } + return result }) - stats.Publish("VDiffStreamingTablets", stats.StringMapFunc(func() map[string]string { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]string, len(st.controllers)) - for _, ct := range st.controllers { - tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) - for _, s := range ct.sources { - result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = - fmt.Sprintf("source:%s;target:%s", topoproto.TabletAliasString(s.tablet.Alias), tt) + stats.NewCountersFuncWithMultiLabels( + "VDiffErrors", + "count of specific errors seen when performing a vdiff", + []string{"workflow", "uuid", "error"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for key, val := range ct.ErrorCounts.Counts() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val + } } - } - return result - })) + return result + }) + + stats.NewGaugesFuncWithMultiLabels( + "VDiffPhaseTimings", + "vdiff phase timings", + []string{"workflow", "uuid", "table", "phase"}, + func() map[string]int64 { + st.mu.Lock() + defer st.mu.Unlock() + result := make(map[string]int64, len(st.controllers)) + for _, ct := range st.controllers { + for tablePhase, h := range ct.TableDiffPhaseTimings.Histograms() { + result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, tablePhase)] = h.Total() + } + } + return result + }) } func (st *vdiffStats) numControllers() int64 { diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 98e0a9809d5..a9690053c1b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -90,7 +90,7 @@ func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefiniti // initialize func (td *tableDiffer) initialize(ctx context.Context) error { - defer globalStats.PhaseTimings.Record("initialize", time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.initializing", td.table.Name), time.Now()) vdiffEngine := td.wd.ct.vde vdiffEngine.snapshotMu.Lock() defer vdiffEngine.snapshotMu.Unlock() @@ -213,6 +213,7 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err } func (td *tableDiffer) selectTablets(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.picking_streaming_tablets", td.table.Name), time.Now()) var ( wg sync.WaitGroup sourceErr, targetErr error @@ -288,6 +289,7 @@ func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells [] } func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_source_streams", td.table.Name), time.Now()) // source can be replica, wait for them to at least reach max gtid of all target streams ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) @@ -306,6 +308,7 @@ func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { } func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_target_streams", td.table.Name), time.Now()) ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) defer cancel() @@ -328,6 +331,7 @@ func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { } func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_target_data_streams", td.table.Name), time.Now()) ct := td.wd.ct gtidch := make(chan string, 1) ct.targetShardStreamer.result = make(chan *sqltypes.Result, 1) @@ -342,6 +346,7 @@ func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { } func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_source_data_streams", td.table.Name), time.Now()) if err := td.forEachSource(func(source *migrationSource) error { gtidch := make(chan string, 1) source.result = make(chan *sqltypes.Result, 1) @@ -360,6 +365,7 @@ func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { } func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) error { + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.restarting_vreplication_streams", td.table.Name), time.Now()) ct := td.wd.ct query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(ct.vde.dbName), encodeString(ct.workflow)) @@ -468,7 +474,7 @@ func (td *tableDiffer) setupRowSorters() { } func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onlyPks bool, maxExtraRowsToCompare int64, maxReportSampleRows int64, stop <-chan time.Time) (*DiffReport, error) { - globalStats.PhaseTimings.Record("diff", time.Now()) + td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.diffing_table", td.table.Name), time.Now()) dbClient := td.wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return nil, err @@ -517,6 +523,7 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl if err := td.updateTableProgress(dbClient, dr, lastProcessedRow); err != nil { log.Errorf("Failed to update vdiff progress on %s table: %v", td.table.Name, err) } + globalStats.RowsDiffedCount.Add(dr.ProcessedRows) }() for { @@ -666,7 +673,6 @@ func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onl } func (td *tableDiffer) compare(sourceRow, targetRow []sqltypes.Value, cols []compareColInfo, compareOnlyNonPKs bool) (int, error) { - globalStats.DiffTimings.Record("RowDiff", time.Now()) for _, col := range cols { if col.isPK && compareOnlyNonPKs { continue @@ -741,11 +747,12 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D return err } } - globalStats.RowsDiffed.Add(dr.ProcessedRows) - globalStats.RowsDiffedByWorkflow.Add(td.wd.ct.workflow, dr.ProcessedRows) if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } + // We have to reset first as the value is not incremental. + td.wd.ct.TableDiffRowCounts.Reset([]string{td.table.Name}) + td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows) return nil } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index f477e88406e..118b9b74e88 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -235,7 +235,12 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D return nil } -func (wd *workflowDiffer) diff(ctx context.Context) error { +func (wd *workflowDiffer) diff(ctx context.Context) (err error) { + defer func() { + if err != nil { + wd.ct.ErrorCounts.Add([]string{err.Error()}, 1) + } + }() dbClient := wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return err From 4b0c8908d123dcc99fa5913eb4ea2831783dc270 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 8 Feb 2024 19:41:22 -0500 Subject: [PATCH 04/15] Fix unit test Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/action_test.go | 5 ++ go/vt/vttablet/tabletmanager/vdiff/stats.go | 70 +++++++++---------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index 1049bc8607d..e1db797f8ab 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -56,8 +56,13 @@ func TestPerformVDiffAction(t *testing.T) { expectQueries []queryAndResult wantErr error }{ + { + name: "nil request", + wantErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request"), + }, { name: "engine not open", + req: &tabletmanagerdatapb.VDiffRequest{}, vde: &Engine{isOpen: false}, wantErr: vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed"), }, diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 8d5c3de8505..87870463014 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -45,30 +45,30 @@ type vdiffStats struct { RowsDiffedCount *stats.Counter } -func (st *vdiffStats) register() { +func (vds *vdiffStats) register() { globalStats.Count = stats.NewGauge("", "") globalStats.ErrorCount = stats.NewCounter("", "") globalStats.RestartedTableDiffs = stats.NewCountersWithSingleLabel("", "", "Table", "") globalStats.RowsDiffedCount = stats.NewCounter("", "") - stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", st.numControllers) + stats.NewGaugeFunc("VDiffCount", "Number of current vdiffs", vds.numControllers) stats.NewCounterFunc( "VDiffErrorCountTotal", - "number of errors encountered across all vdiffs", + "Number of errors encountered across all vdiffs", func() int64 { - st.mu.Lock() - defer st.mu.Unlock() + vds.mu.Lock() + defer vds.mu.Unlock() return globalStats.ErrorCount.Get() }) stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", - "vdiff table diffs restarted due to max-diff-duration counts per table", + "Table diffs restarted due to max-diff-duration counts per table", []string{"table_name"}, func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() + vds.mu.Lock() + defer vds.mu.Unlock() result := make(map[string]int64) for label, count := range globalStats.RestartedTableDiffs.Counts() { if label == "" { @@ -82,22 +82,22 @@ func (st *vdiffStats) register() { stats.NewCounterFunc( "VDiffRowsComparedTotal", - "number of rows compared across all vdiffs", + "Number of rows compared across all vdiffs", func() int64 { - st.mu.Lock() - defer st.mu.Unlock() + vds.mu.Lock() + defer vds.mu.Unlock() return globalStats.RowsDiffedCount.Get() }) stats.NewGaugesFuncWithMultiLabels( "VDiffRowsCompared", - "live number of rows compared per vdiff by table", + "Live number of rows compared per vdiff by table", []string{"workflow", "uuid", "table"}, func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]int64, len(st.controllers)) - for _, ct := range st.controllers { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { for key, val := range ct.TableDiffRowCounts.Counts() { result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val } @@ -108,14 +108,14 @@ func (st *vdiffStats) register() { stats.NewStringMapFuncWithMultiLabels( "VDiffStreamingTablets", - "latest tablets used on the source and target for streaming table data", + "Latest tablets used on the source and target for streaming table data", []string{"workflow", "uuid", "target_shard"}, "tablets", func() map[string]string { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]string, len(st.controllers)) - for _, ct := range st.controllers { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]string, len(vds.controllers)) + for _, ct := range vds.controllers { tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) for _, s := range ct.sources { result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = @@ -127,13 +127,13 @@ func (st *vdiffStats) register() { stats.NewCountersFuncWithMultiLabels( "VDiffErrors", - "count of specific errors seen when performing a vdiff", + "Count of specific errors seen when performing a vdiff", []string{"workflow", "uuid", "error"}, func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]int64, len(st.controllers)) - for _, ct := range st.controllers { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { for key, val := range ct.ErrorCounts.Counts() { result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val } @@ -143,13 +143,13 @@ func (st *vdiffStats) register() { stats.NewGaugesFuncWithMultiLabels( "VDiffPhaseTimings", - "vdiff phase timings", + "VDiff phase timings", []string{"workflow", "uuid", "table", "phase"}, func() map[string]int64 { - st.mu.Lock() - defer st.mu.Unlock() - result := make(map[string]int64, len(st.controllers)) - for _, ct := range st.controllers { + vds.mu.Lock() + defer vds.mu.Unlock() + result := make(map[string]int64, len(vds.controllers)) + for _, ct := range vds.controllers { for tablePhase, h := range ct.TableDiffPhaseTimings.Histograms() { result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, tablePhase)] = h.Total() } @@ -158,8 +158,8 @@ func (st *vdiffStats) register() { }) } -func (st *vdiffStats) numControllers() int64 { - st.mu.Lock() - defer st.mu.Unlock() - return int64(len(st.controllers)) +func (vds *vdiffStats) numControllers() int64 { + vds.mu.Lock() + defer vds.mu.Unlock() + return int64(len(vds.controllers)) } From 876fc1134ad86c5e3234dffd521e92350736e71e Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 8 Feb 2024 23:36:35 -0500 Subject: [PATCH 05/15] Minor changes and fixups Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff2_test.go | 4 ++++ go/vt/vttablet/tabletmanager/vdiff/controller.go | 4 ++-- go/vt/vttablet/tabletmanager/vdiff/engine.go | 13 ++++++------- go/vt/vttablet/tabletmanager/vdiff/stats.go | 4 ++-- .../vttablet/tabletmanager/vdiff/workflow_differ.go | 3 ++- 5 files changed, 16 insertions(+), 12 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 7f881c1f21b..4313b13ef5c 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -358,6 +358,10 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell // Confirm that the options were passed through and saved correctly. query := sqlparser.BuildParsedQuery("select options from %s.vdiff where vdiff_uuid = %s", sidecarDBIdentifier, encodeString(vduuid.String())).Query + // TODO (mlord): figure out why for a brief moment (N us) the debug/vars value + // produced for TabletStateName becomes NOT_SERVING after the vdiff is created + // due to the tabletmanager returning "Not connected to mysql". + time.Sleep(100 * time.Millisecond) tablets := vc.getVttabletsInKeyspace(t, cell, targetKs, "PRIMARY") require.Greater(t, len(tablets), 0, "no primary tablets found in keyspace %s", targetKs) tablet := maps.Values(tablets)[0] diff --git a/go/vt/vttablet/tabletmanager/vdiff/controller.go b/go/vt/vttablet/tabletmanager/vdiff/controller.go index cfe449fd265..0265e8a0a35 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/controller.go +++ b/go/vt/vttablet/tabletmanager/vdiff/controller.go @@ -78,7 +78,7 @@ type controller struct { externalCluster string // For Mount+Migrate // Information used in vdiff stats/metrics. - ErrorCounts *stats.CountersWithMultiLabels + Errors *stats.CountersWithMultiLabels TableDiffRowCounts *stats.CountersWithMultiLabels TableDiffPhaseTimings *stats.Timings } @@ -100,7 +100,7 @@ func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFac tmc: vde.tmClientFactory(), sources: make(map[string]*migrationSource), options: options, - ErrorCounts: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), + Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), } diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index f74ae55ea1b..7db8cc6aa9a 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -26,17 +26,16 @@ import ( "vitess.io/vitess/go/mysql/collations" "vitess.io/vitess/go/mysql/sqlerror" - "vitess.io/vitess/go/vt/proto/tabletmanagerdata" - "vitess.io/vitess/go/vt/proto/topodata" - "vitess.io/vitess/go/vt/sqlparser" - "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" - "vitess.io/vitess/go/vt/vttablet/tmclient" - "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/vt/binlog/binlogplayer" "vitess.io/vitess/go/vt/dbconfigs" "vitess.io/vitess/go/vt/log" + "vitess.io/vitess/go/vt/proto/tabletmanagerdata" + "vitess.io/vitess/go/vt/proto/topodata" + "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/topo" + "vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication" + "vitess.io/vitess/go/vt/vttablet/tmclient" ) type Engine struct { @@ -399,7 +398,7 @@ func (vde *Engine) resetControllers() { vde.updateStats() } -// UpdateStats must be called with lock held. +// UpdateStats must be called while holding the engine lock. func (vre *Engine) updateStats() { globalStats.mu.Lock() defer globalStats.mu.Unlock() diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 87870463014..1c358db0c74 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -55,7 +55,7 @@ func (vds *vdiffStats) register() { stats.NewCounterFunc( "VDiffErrorCountTotal", - "Number of errors encountered across all vdiffs", + "Number of errors encountered across all vdiff actions", func() int64 { vds.mu.Lock() defer vds.mu.Unlock() @@ -134,7 +134,7 @@ func (vds *vdiffStats) register() { defer vds.mu.Unlock() result := make(map[string]int64, len(vds.controllers)) for _, ct := range vds.controllers { - for key, val := range ct.ErrorCounts.Counts() { + for key, val := range ct.Errors.Counts() { result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, key)] = val } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go index 118b9b74e88..97d2bd387cb 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/workflow_differ.go @@ -238,7 +238,8 @@ func (wd *workflowDiffer) diffTable(ctx context.Context, dbClient binlogplayer.D func (wd *workflowDiffer) diff(ctx context.Context) (err error) { defer func() { if err != nil { - wd.ct.ErrorCounts.Add([]string{err.Error()}, 1) + globalStats.ErrorCount.Add(1) + wd.ct.Errors.Add([]string{err.Error()}, 1) } }() dbClient := wd.ct.dbClientFactory() From 15a051ea2471c34fd9d6bd4171bddae6d84b92d8 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Feb 2024 15:16:23 -0500 Subject: [PATCH 06/15] Fix stat bug Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff2_test.go | 4 ---- go/vt/vttablet/tabletmanager/vdiff/stats.go | 8 ++++++++ 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 4313b13ef5c..7f881c1f21b 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -358,10 +358,6 @@ func testCLIFlagHandling(t *testing.T, targetKs, workflowName string, cell *Cell // Confirm that the options were passed through and saved correctly. query := sqlparser.BuildParsedQuery("select options from %s.vdiff where vdiff_uuid = %s", sidecarDBIdentifier, encodeString(vduuid.String())).Query - // TODO (mlord): figure out why for a brief moment (N us) the debug/vars value - // produced for TabletStateName becomes NOT_SERVING after the vdiff is created - // due to the tabletmanager returning "Not connected to mysql". - time.Sleep(100 * time.Millisecond) tablets := vc.getVttabletsInKeyspace(t, cell, targetKs, "PRIMARY") require.Greater(t, len(tablets), 0, "no primary tablets found in keyspace %s", targetKs) tablet := maps.Values(tablets)[0] diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 1c358db0c74..6bfcdd49ab2 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -116,8 +116,16 @@ func (vds *vdiffStats) register() { defer vds.mu.Unlock() result := make(map[string]string, len(vds.controllers)) for _, ct := range vds.controllers { + if ct.targetShardStreamer == nil || ct.targetShardStreamer.tablet == nil { + // We haven't yet chosen the target shard streamer, so skip it. + continue + } tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) for _, s := range ct.sources { + if s == nil || s.tablet == nil { + // We haven't yet chosen the source shard streamer, so skip it. + continue + } result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = fmt.Sprintf("source:%s,target:%s", topoproto.TabletAliasString(s.tablet.Alias), tt) } From 94b047f423b95f02a88c7b8cb546610617bbf494 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Feb 2024 15:33:02 -0500 Subject: [PATCH 07/15] Remove VDiffStreamingTablets and add unit test Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/stats.go | 28 ------- .../tabletmanager/vdiff/stats_test.go | 76 +++++++++++++++++++ 2 files changed, 76 insertions(+), 28 deletions(-) create mode 100644 go/vt/vttablet/tabletmanager/vdiff/stats_test.go diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 6bfcdd49ab2..4b2dbc551bd 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -21,7 +21,6 @@ import ( "sync" "vitess.io/vitess/go/stats" - "vitess.io/vitess/go/vt/topo/topoproto" ) var ( @@ -106,33 +105,6 @@ func (vds *vdiffStats) register() { }, ) - stats.NewStringMapFuncWithMultiLabels( - "VDiffStreamingTablets", - "Latest tablets used on the source and target for streaming table data", - []string{"workflow", "uuid", "target_shard"}, - "tablets", - func() map[string]string { - vds.mu.Lock() - defer vds.mu.Unlock() - result := make(map[string]string, len(vds.controllers)) - for _, ct := range vds.controllers { - if ct.targetShardStreamer == nil || ct.targetShardStreamer.tablet == nil { - // We haven't yet chosen the target shard streamer, so skip it. - continue - } - tt := topoproto.TabletAliasString(ct.targetShardStreamer.tablet.Alias) - for _, s := range ct.sources { - if s == nil || s.tablet == nil { - // We haven't yet chosen the source shard streamer, so skip it. - continue - } - result[fmt.Sprintf("%s.%s.%s", ct.workflow, ct.uuid, s.shard)] = - fmt.Sprintf("source:%s,target:%s", topoproto.TabletAliasString(s.tablet.Alias), tt) - } - } - return result - }) - stats.NewCountersFuncWithMultiLabels( "VDiffErrors", "Count of specific errors seen when performing a vdiff", diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go new file mode 100644 index 00000000000..c11897ed7ac --- /dev/null +++ b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go @@ -0,0 +1,76 @@ +/* +Copyright 2024 The Vitess Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vdiff + +import ( + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/require" + + "vitess.io/vitess/go/stats" + + binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" +) + +func TestVDiffStats(t *testing.T) { + testStats := &vdiffStats{ + ErrorCount: stats.NewCounter("", ""), + RestartedTableDiffs: stats.NewCountersWithSingleLabel("", "", "Table", ""), + RowsDiffedCount: stats.NewCounter("", ""), + } + id := int64(1) + testStats.controllers = map[int64]*controller{ + id: { + id: id, + workflow: "testwf", + workflowType: binlogdatapb.VReplicationWorkflowType_MoveTables, + uuid: uuid.New().String(), + Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}), + TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}), + TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"), + }, + } + + require.Equal(t, int64(1), testStats.numControllers()) + + sleepTime := 1 * time.Millisecond + record := func(phase string) { + defer testStats.controllers[id].TableDiffPhaseTimings.Record(phase, time.Now()) + time.Sleep(sleepTime) + } + want := int64(1.2 * float64(sleepTime)) // Allow 20% overhead for recording timing + record("initialize") + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["initialize"].Total()) + record("selecting_tablets") + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["selecting_tablets"].Total()) + record("diff") + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["diff"].Total()) + + testStats.ErrorCount.Set(11) + require.Equal(t, int64(11), testStats.ErrorCount.Get()) + + testStats.controllers[id].Errors.Add([]string{"test error"}, int64(12)) + require.Equal(t, int64(12), testStats.controllers[id].Errors.Counts()["test error"]) + + testStats.RestartedTableDiffs.Add("t1", int64(5)) + require.Equal(t, int64(5), testStats.RestartedTableDiffs.Counts()["t1"]) + + testStats.RowsDiffedCount.Add(512) + require.Equal(t, int64(512), testStats.RowsDiffedCount.Get()) +} From e5c330ba7494eddbe26d96dddbb612ff9f17e35f Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Feb 2024 20:42:07 -0500 Subject: [PATCH 08/15] Add some e2e tests Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff2_test.go | 27 ++++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 7f881c1f21b..0d83d72641a 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math" "runtime" + "strconv" "strings" "testing" "time" @@ -31,6 +32,7 @@ import ( "google.golang.org/protobuf/encoding/protojson" "google.golang.org/protobuf/proto" + "vitess.io/vitess/go/test/endtoend/cluster" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/sqlparser" "vitess.io/vitess/go/vt/vttablet" @@ -56,6 +58,7 @@ type testCase struct { testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once) testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end extraVDiffFlags map[string]string + vdiffCount int64 // Keep track of the number of vdiffs created to test the stats } const ( @@ -119,6 +122,14 @@ var testCases = []*testCase{ }, } +func checkVDiffCountStat(t *testing.T, tablet *cluster.VttabletProcess, expectedCount int64) { + countStr, err := getDebugVar(t, tablet.Port, []string{"VDiffCount"}) + require.NoError(t, err, "failed to get VDiffCount stat from %s-%d tablet: %v", tablet.Cell, tablet.TabletUID, err) + count, err := strconv.Atoi(countStr) + require.NoError(t, err, "failed to convert VDiffCount stat string to int: %v", err) + require.Equal(t, expectedCount, int64(count), "expected VDiffCount stat to be %d but got %d", expectedCount, count) +} + func TestVDiff2(t *testing.T) { cellNames := "zone5,zone1,zone2,zone3,zone4" sourceKs := "product" @@ -251,9 +262,12 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, } // Wait for the workflow to catch up again on the deletes. waitForShardsToCatchup() + tc.vdiffCount++ // We only did vtctldclient vdiff create } else { vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) + tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create } + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) if tc.autoRetryError { testAutoRetryError(t, tc, allCellNames) @@ -263,26 +277,37 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, testResume(t, tc, allCellNames) } + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + // These are done here so that we have a valid workflow to test the commands against. if tc.stop { testStop(t, ksWorkflow, allCellNames) + tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create } if tc.testCLICreateWait { testCLICreateWait(t, ksWorkflow, allCellNames) + tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create } if tc.testCLIErrors { testCLIErrors(t, ksWorkflow, allCellNames) } if tc.testCLIFlagHandling { testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0]) + tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create } + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + testDelete(t, ksWorkflow, allCellNames) + tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) // Create another VDiff record to confirm it gets deleted when the workflow is completed. ts := time.Now() uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts) + tc.vdiffCount++ + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow) require.NoError(t, err) @@ -291,6 +316,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // Confirm the VDiff data is deleted for the workflow. testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards) + tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check + checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) } func testCLIErrors(t *testing.T, ksWorkflow, cells string) { From 1a804b6d2a38bd457838baf8cb5d696db715cea3 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Feb 2024 22:13:18 -0500 Subject: [PATCH 09/15] Add more unit test and adjust codecov Signed-off-by: Matt Lord --- codecov.yml | 5 ++++- go/vt/vttablet/tabletmanager/vdiff/action_test.go | 6 ++++++ 2 files changed, 10 insertions(+), 1 deletion(-) diff --git a/codecov.yml b/codecov.yml index 61232ae9197..978fc499df1 100644 --- a/codecov.yml +++ b/codecov.yml @@ -44,7 +44,10 @@ comment: # https://docs.codecov.com/docs/pull-request-comments coverage: status: # https://docs.codecov.com/docs/commit-status + patch: + default: + informational: true # Don't ever fail the codecov/patch test project: default: - informational: true # Don't ever fail the codecov/project or codecov/patch tests + informational: true # Don't ever fail the codecov/project test diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index e1db797f8ab..825cf7a0cff 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -213,6 +213,7 @@ func TestPerformVDiffAction(t *testing.T) { }, }, } + errCount := int64(0) for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { if tt.preFunc != nil { @@ -229,6 +230,9 @@ func TestPerformVDiffAction(t *testing.T) { vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil) } got, err := tt.vde.PerformVDiffAction(ctx, tt.req) + if err != nil { + errCount++ + } vdiffenv.dbClient.Wait() if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) { t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr) @@ -244,6 +248,8 @@ func TestPerformVDiffAction(t *testing.T) { // No VDiffs should be running anymore. require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d", len(vdiffenv.vde.controllers)) + require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers to be running, but found %d") }) + require.Equal(t, errCount, globalStats.ErrorCount.Get()) } } From bf78beec130d89d30eda003ad4a8a6b3f04cebe1 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Fri, 9 Feb 2024 22:33:22 -0500 Subject: [PATCH 10/15] Changes/improvements after self review Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff2_test.go | 21 +++++++++++++++++--- go/vt/vttablet/tabletmanager/vdiff/engine.go | 2 +- 2 files changed, 19 insertions(+), 4 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index 0d83d72641a..b2d011a1125 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -183,6 +183,21 @@ func TestVDiff2(t *testing.T) { testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1}) }) } + + statsTablet := vc.getPrimaryTablet(t, targetKs, targetShards[0]) + + // We diffed X rows so confirm that the global total is > 0. + countStr, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsComparedTotal"}) + require.NoError(t, err, "failed to get VDiffRowsComparedTotal stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + count, err := strconv.Atoi(countStr) + require.NoError(t, err, "failed to convert VDiffRowsComparedTotal stat string to int: %v", err) + require.Greater(t, count, 0, "expected VDiffRowsComparedTotal stat to be greater than 0 but got %d", count) + + // The VDiffs should all be cleaned up so the VDiffRowsCompared value, which + // is produced from controller info, should be empty. + vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"}) + require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + require.Equal(t, "{}", vdrc, "expected VDiffRowsCompared stat to be empty but got %s", vdrc) } func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) { @@ -282,18 +297,18 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // These are done here so that we have a valid workflow to test the commands against. if tc.stop { testStop(t, ksWorkflow, allCellNames) - tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } if tc.testCLICreateWait { testCLICreateWait(t, ksWorkflow, allCellNames) - tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } if tc.testCLIErrors { testCLIErrors(t, ksWorkflow, allCellNames) } if tc.testCLIFlagHandling { testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0]) - tc.vdiffCount++ // Does either vtctlclient OR vtctldclient vdiff create + tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) diff --git a/go/vt/vttablet/tabletmanager/vdiff/engine.go b/go/vt/vttablet/tabletmanager/vdiff/engine.go index 7db8cc6aa9a..b2285a070fa 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/engine.go +++ b/go/vt/vttablet/tabletmanager/vdiff/engine.go @@ -398,7 +398,7 @@ func (vde *Engine) resetControllers() { vde.updateStats() } -// UpdateStats must be called while holding the engine lock. +// updateStats must only be called while holding the engine lock. func (vre *Engine) updateStats() { globalStats.mu.Lock() defer globalStats.mu.Unlock() From a032dc71eb627a7feb6c5a7113543f52fde0c956 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Sat, 10 Feb 2024 11:50:53 -0500 Subject: [PATCH 11/15] Add more testing Signed-off-by: Matt Lord --- go/test/endtoend/vreplication/vdiff2_test.go | 25 +++++++++++++------ .../tabletmanager/vdiff/action_test.go | 4 +-- .../tabletmanager/vdiff/table_differ.go | 2 -- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/go/test/endtoend/vreplication/vdiff2_test.go b/go/test/endtoend/vreplication/vdiff2_test.go index b2d011a1125..08f5bb8926d 100644 --- a/go/test/endtoend/vreplication/vdiff2_test.go +++ b/go/test/endtoend/vreplication/vdiff2_test.go @@ -209,6 +209,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, } ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow) + statsShard := arrTargetShards[0] + statsTablet := vc.getPrimaryTablet(t, tc.targetKs, statsShard) var args []string args = append(args, tc.typ, "--") args = append(args, "--source", tc.sourceKs) @@ -282,7 +284,16 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil) tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create } - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) + + // Confirm that the VDiffRowsCompared stat -- which is a running count of the rows + // compared by vdiff per table at the controller level -- works as expected. + vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"}) + require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err) + uuid, jsout := performVDiff2Action(t, false, ksWorkflow, allCellNames, "show", "last", false, "--verbose") + expect := gjson.Get(jsout, fmt.Sprintf("Reports.customer.%s", statsShard)).Int() + got := gjson.Get(vdrc, fmt.Sprintf("%s.%s.%s", tc.workflow, uuid, "customer")).Int() + require.Equal(t, expect, got, "expected VDiffRowsCompared stat to be %d, but got %d", expect, got) if tc.autoRetryError { testAutoRetryError(t, tc, allCellNames) @@ -292,7 +303,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, testResume(t, tc, allCellNames) } - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) // These are done here so that we have a valid workflow to test the commands against. if tc.stop { @@ -311,18 +322,18 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create } - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) testDelete(t, ksWorkflow, allCellNames) tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) // Create another VDiff record to confirm it gets deleted when the workflow is completed. ts := time.Now() - uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) + uuid, _ = performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false) waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts) tc.vdiffCount++ - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow) require.NoError(t, err) @@ -332,7 +343,7 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, // Confirm the VDiff data is deleted for the workflow. testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards) tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check - checkVDiffCountStat(t, vc.getPrimaryTablet(t, tc.targetKs, arrTargetShards[0]), tc.vdiffCount) + checkVDiffCountStat(t, statsTablet, tc.vdiffCount) } func testCLIErrors(t *testing.T, ksWorkflow, cells string) { diff --git a/go/vt/vttablet/tabletmanager/vdiff/action_test.go b/go/vt/vttablet/tabletmanager/vdiff/action_test.go index 825cf7a0cff..4676238cf69 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/action_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/action_test.go @@ -248,8 +248,8 @@ func TestPerformVDiffAction(t *testing.T) { // No VDiffs should be running anymore. require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d", len(vdiffenv.vde.controllers)) - require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers to be running, but found %d") + require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers, but found %d") }) - require.Equal(t, errCount, globalStats.ErrorCount.Get()) + require.Equal(t, errCount, globalStats.ErrorCount.Get(), "expected error count %d, got %d", errCount, globalStats.ErrorCount.Get()) } } diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 88a4e24e80f..1549fbd3faf 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -750,8 +750,6 @@ func (td *tableDiffer) updateTableProgress(dbClient binlogplayer.DBClient, dr *D if _, err := dbClient.ExecuteFetch(query, 1); err != nil { return err } - // We have to reset first as the value is not incremental. - td.wd.ct.TableDiffRowCounts.Reset([]string{td.table.Name}) td.wd.ct.TableDiffRowCounts.Add([]string{td.table.Name}, dr.ProcessedRows) return nil } From d5b90d7b78c0bb585e445a202ca1eb4f032700eb Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Mon, 12 Feb 2024 12:54:57 -0500 Subject: [PATCH 12/15] Minor description tweaks after working on the docs Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/stats.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 4b2dbc551bd..1febd5a8997 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -63,7 +63,7 @@ func (vds *vdiffStats) register() { stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", - "Table diffs restarted due to max-diff-duration counts per table", + "Table diffs restarted due to --max-diff-duration counts by table", []string{"table_name"}, func() map[string]int64 { vds.mu.Lock() @@ -107,7 +107,7 @@ func (vds *vdiffStats) register() { stats.NewCountersFuncWithMultiLabels( "VDiffErrors", - "Count of specific errors seen when performing a vdiff", + "Count of specific errors seen during the lifetime of a vdiff", []string{"workflow", "uuid", "error"}, func() map[string]int64 { vds.mu.Lock() From 3cf8b33287bc5e8ec3c24a5c2069448141e677b7 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 13 Feb 2024 14:46:20 -0500 Subject: [PATCH 13/15] Nitting myself to death while waiting for reviews Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/stats.go | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats.go b/go/vt/vttablet/tabletmanager/vdiff/stats.go index 1febd5a8997..b68e1f86556 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats.go @@ -59,7 +59,8 @@ func (vds *vdiffStats) register() { vds.mu.Lock() defer vds.mu.Unlock() return globalStats.ErrorCount.Get() - }) + }, + ) stats.NewGaugesFuncWithMultiLabels( "VDiffRestartedTableDiffsCount", @@ -86,7 +87,8 @@ func (vds *vdiffStats) register() { vds.mu.Lock() defer vds.mu.Unlock() return globalStats.RowsDiffedCount.Get() - }) + }, + ) stats.NewGaugesFuncWithMultiLabels( "VDiffRowsCompared", @@ -119,7 +121,8 @@ func (vds *vdiffStats) register() { } } return result - }) + }, + ) stats.NewGaugesFuncWithMultiLabels( "VDiffPhaseTimings", @@ -135,7 +138,8 @@ func (vds *vdiffStats) register() { } } return result - }) + }, + ) } func (vds *vdiffStats) numControllers() int64 { From 0487b00af28118c42a9d63bcc314096ffb3c563a Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Tue, 13 Feb 2024 17:34:01 -0500 Subject: [PATCH 14/15] All Recordings should be done in a defer Because we're not deferring a new function, the parameters passed to Record are calculated when we register the defer and thus Record is able to compare the current time to the time we passed in when registering the defer (time.Now()). Signed-off-by: Matt Lord --- go/vt/vttablet/tabletmanager/vdiff/table_differ.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index 1549fbd3faf..c22df478fa0 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -213,7 +213,7 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err } func (td *tableDiffer) selectTablets(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.picking_streaming_tablets", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.picking_streaming_tablets", td.table.Name), time.Now()) var ( wg sync.WaitGroup sourceErr, targetErr error @@ -289,7 +289,7 @@ func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells [] } func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_source_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_source_streams", td.table.Name), time.Now()) // source can be replica, wait for them to at least reach max gtid of all target streams ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) @@ -308,7 +308,7 @@ func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { } func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_target_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_target_streams", td.table.Name), time.Now()) ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) defer cancel() @@ -331,7 +331,7 @@ func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { } func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_target_data_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_target_data_streams", td.table.Name), time.Now()) ct := td.wd.ct gtidch := make(chan string, 1) ct.targetShardStreamer.result = make(chan *sqltypes.Result, 1) @@ -346,7 +346,7 @@ func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { } func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_source_data_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_source_data_streams", td.table.Name), time.Now()) if err := td.forEachSource(func(source *migrationSource) error { gtidch := make(chan string, 1) source.result = make(chan *sqltypes.Result, 1) @@ -365,7 +365,7 @@ func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { } func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) error { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.restarting_vreplication_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.restarting_vreplication_streams", td.table.Name), time.Now()) ct := td.wd.ct query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(ct.vde.dbName), encodeString(ct.workflow)) @@ -474,7 +474,7 @@ func (td *tableDiffer) setupRowSorters() { } func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onlyPks bool, maxExtraRowsToCompare int64, maxReportSampleRows int64, stop <-chan time.Time) (*DiffReport, error) { - td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.diffing_table", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.diffing_table", td.table.Name), time.Now()) dbClient := td.wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return nil, err From 14d3f73a97b85dfd1b7b7b1dd8388fc68352bad9 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Wed, 14 Feb 2024 11:49:00 -0500 Subject: [PATCH 15/15] Address review comment Signed-off-by: Matt Lord --- .../tabletmanager/vdiff/stats_test.go | 12 ++++---- .../tabletmanager/vdiff/table_differ.go | 29 ++++++++++++++----- 2 files changed, 27 insertions(+), 14 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go index c11897ed7ac..8b1a174466f 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/stats_test.go +++ b/go/vt/vttablet/tabletmanager/vdiff/stats_test.go @@ -55,12 +55,12 @@ func TestVDiffStats(t *testing.T) { time.Sleep(sleepTime) } want := int64(1.2 * float64(sleepTime)) // Allow 20% overhead for recording timing - record("initialize") - require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["initialize"].Total()) - record("selecting_tablets") - require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["selecting_tablets"].Total()) - record("diff") - require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()["diff"].Total()) + record(string(initializing)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(initializing)].Total()) + record(string(pickingTablets)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(pickingTablets)].Total()) + record(string(diffingTable)) + require.Greater(t, want, testStats.controllers[id].TableDiffPhaseTimings.Histograms()[string(diffingTable)].Total()) testStats.ErrorCount.Set(11) require.Equal(t, int64(11), testStats.ErrorCount.Get()) diff --git a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go index c22df478fa0..7ad83a3ad4b 100644 --- a/go/vt/vttablet/tabletmanager/vdiff/table_differ.go +++ b/go/vt/vttablet/tabletmanager/vdiff/table_differ.go @@ -49,6 +49,19 @@ import ( vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" ) +type tableDiffPhase string + +const ( + initializing = tableDiffPhase("initializing") + pickingTablets = tableDiffPhase("picking_streaming_tablets") + syncingSources = tableDiffPhase("syncing_source_streams") + syncingTargets = tableDiffPhase("syncing_target_streams") + startingSources = tableDiffPhase("starting_source_data_streams") + startingTargets = tableDiffPhase("starting_target_data_streams") + restartingVreplication = tableDiffPhase("restarting_vreplication_streams") + diffingTable = tableDiffPhase("diffing_table") +) + // how long to wait for background operations to complete var BackgroundOperationTimeout = topo.RemoteOperationTimeout * 4 @@ -90,7 +103,7 @@ func newTableDiffer(wd *workflowDiffer, table *tabletmanagerdatapb.TableDefiniti // initialize func (td *tableDiffer) initialize(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.initializing", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, initializing), time.Now()) vdiffEngine := td.wd.ct.vde vdiffEngine.snapshotMu.Lock() defer vdiffEngine.snapshotMu.Unlock() @@ -213,7 +226,7 @@ func (td *tableDiffer) forEachSource(cb func(source *migrationSource) error) err } func (td *tableDiffer) selectTablets(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.picking_streaming_tablets", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, pickingTablets), time.Now()) var ( wg sync.WaitGroup sourceErr, targetErr error @@ -289,7 +302,7 @@ func (td *tableDiffer) pickTablet(ctx context.Context, ts *topo.Server, cells [] } func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_source_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, syncingSources), time.Now()) // source can be replica, wait for them to at least reach max gtid of all target streams ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) @@ -308,7 +321,7 @@ func (td *tableDiffer) syncSourceStreams(ctx context.Context) error { } func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.syncing_target_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, syncingTargets), time.Now()) ct := td.wd.ct waitCtx, cancel := context.WithTimeout(ctx, time.Duration(ct.options.CoreOptions.TimeoutSeconds*int64(time.Second))) defer cancel() @@ -331,7 +344,7 @@ func (td *tableDiffer) syncTargetStreams(ctx context.Context) error { } func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_target_data_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, startingTargets), time.Now()) ct := td.wd.ct gtidch := make(chan string, 1) ct.targetShardStreamer.result = make(chan *sqltypes.Result, 1) @@ -346,7 +359,7 @@ func (td *tableDiffer) startTargetDataStream(ctx context.Context) error { } func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.starting_source_data_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, startingSources), time.Now()) if err := td.forEachSource(func(source *migrationSource) error { gtidch := make(chan string, 1) source.result = make(chan *sqltypes.Result, 1) @@ -365,7 +378,7 @@ func (td *tableDiffer) startSourceDataStreams(ctx context.Context) error { } func (td *tableDiffer) restartTargetVReplicationStreams(ctx context.Context) error { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.restarting_vreplication_streams", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, restartingVreplication), time.Now()) ct := td.wd.ct query := fmt.Sprintf("update _vt.vreplication set state='Running', message='', stop_pos='' where db_name=%s and workflow=%s", encodeString(ct.vde.dbName), encodeString(ct.workflow)) @@ -474,7 +487,7 @@ func (td *tableDiffer) setupRowSorters() { } func (td *tableDiffer) diff(ctx context.Context, rowsToCompare int64, debug, onlyPks bool, maxExtraRowsToCompare int64, maxReportSampleRows int64, stop <-chan time.Time) (*DiffReport, error) { - defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.diffing_table", td.table.Name), time.Now()) + defer td.wd.ct.TableDiffPhaseTimings.Record(fmt.Sprintf("%s.%s", td.table.Name, diffingTable), time.Now()) dbClient := td.wd.ct.dbClientFactory() if err := dbClient.Connect(); err != nil { return nil, err