Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VDiff: Add some stats #15175

Merged
merged 16 commits into from
Feb 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ comment: # https://docs.codecov.com/docs/pull-request-comments

coverage:
status: # https://docs.codecov.com/docs/commit-status
patch:
default:
informational: true # Don't ever fail the codecov/patch test
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant to do this in: #14967

project:
default:
informational: true # Don't ever fail the codecov/project or codecov/patch tests
informational: true # Don't ever fail the codecov/project test

55 changes: 54 additions & 1 deletion go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"math"
"runtime"
"strconv"
"strings"
"testing"
"time"
Expand All @@ -31,6 +32,7 @@ import (
"google.golang.org/protobuf/encoding/protojson"
"google.golang.org/protobuf/proto"

"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet"
Expand All @@ -56,6 +58,7 @@ type testCase struct {
testCLICreateWait bool // test CLI create and wait until done against this workflow (only needs to be done once)
testCLIFlagHandling bool // test vtctldclient flag handling from end-to-end
extraVDiffFlags map[string]string
vdiffCount int64 // Keep track of the number of vdiffs created to test the stats
}

const (
Expand Down Expand Up @@ -119,6 +122,14 @@ var testCases = []*testCase{
},
}

func checkVDiffCountStat(t *testing.T, tablet *cluster.VttabletProcess, expectedCount int64) {
countStr, err := getDebugVar(t, tablet.Port, []string{"VDiffCount"})
require.NoError(t, err, "failed to get VDiffCount stat from %s-%d tablet: %v", tablet.Cell, tablet.TabletUID, err)
count, err := strconv.Atoi(countStr)
require.NoError(t, err, "failed to convert VDiffCount stat string to int: %v", err)
require.Equal(t, expectedCount, int64(count), "expected VDiffCount stat to be %d but got %d", expectedCount, count)
}

func TestVDiff2(t *testing.T) {
cellNames := "zone5,zone1,zone2,zone3,zone4"
sourceKs := "product"
Expand Down Expand Up @@ -172,6 +183,21 @@ func TestVDiff2(t *testing.T) {
testWorkflow(t, vc, tc, tks, []*Cell{zone3, zone2, zone1})
})
}

statsTablet := vc.getPrimaryTablet(t, targetKs, targetShards[0])

// We diffed X rows so confirm that the global total is > 0.
countStr, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsComparedTotal"})
require.NoError(t, err, "failed to get VDiffRowsComparedTotal stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
count, err := strconv.Atoi(countStr)
require.NoError(t, err, "failed to convert VDiffRowsComparedTotal stat string to int: %v", err)
require.Greater(t, count, 0, "expected VDiffRowsComparedTotal stat to be greater than 0 but got %d", count)

// The VDiffs should all be cleaned up so the VDiffRowsCompared value, which
// is produced from controller info, should be empty.
vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"})
require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
require.Equal(t, "{}", vdrc, "expected VDiffRowsCompared stat to be empty but got %s", vdrc)
}

func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace, cells []*Cell) {
Expand All @@ -183,6 +209,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

}
ksWorkflow := fmt.Sprintf("%s.%s", tc.targetKs, tc.workflow)
statsShard := arrTargetShards[0]
statsTablet := vc.getPrimaryTablet(t, tc.targetKs, statsShard)
var args []string
args = append(args, tc.typ, "--")
args = append(args, "--source", tc.sourceKs)
Expand Down Expand Up @@ -251,9 +279,21 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
}
// Wait for the workflow to catch up again on the deletes.
waitForShardsToCatchup()
tc.vdiffCount++ // We only did vtctldclient vdiff create
} else {
vdiff(t, tc.targetKs, tc.workflow, allCellNames, true, true, nil)
tc.vdiffCount += 2 // We did vtctlclient AND vtctldclient vdiff create
}
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// Confirm that the VDiffRowsCompared stat -- which is a running count of the rows
// compared by vdiff per table at the controller level -- works as expected.
vdrc, err := getDebugVar(t, statsTablet.Port, []string{"VDiffRowsCompared"})
require.NoError(t, err, "failed to get VDiffRowsCompared stat from %s-%d tablet: %v", statsTablet.Cell, statsTablet.TabletUID, err)
uuid, jsout := performVDiff2Action(t, false, ksWorkflow, allCellNames, "show", "last", false, "--verbose")
expect := gjson.Get(jsout, fmt.Sprintf("Reports.customer.%s", statsShard)).Int()
got := gjson.Get(vdrc, fmt.Sprintf("%s.%s.%s", tc.workflow, uuid, "customer")).Int()
require.Equal(t, expect, got, "expected VDiffRowsCompared stat to be %d, but got %d", expect, got)

if tc.autoRetryError {
testAutoRetryError(t, tc, allCellNames)
Expand All @@ -263,26 +303,37 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,
testResume(t, tc, allCellNames)
}

checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// These are done here so that we have a valid workflow to test the commands against.
if tc.stop {
testStop(t, ksWorkflow, allCellNames)
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}
if tc.testCLICreateWait {
testCLICreateWait(t, ksWorkflow, allCellNames)
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}
if tc.testCLIErrors {
testCLIErrors(t, ksWorkflow, allCellNames)
}
if tc.testCLIFlagHandling {
testCLIFlagHandling(t, tc.targetKs, tc.workflow, cells[0])
tc.vdiffCount++ // We did either vtctlclient OR vtctldclient vdiff create
}

checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

testDelete(t, ksWorkflow, allCellNames)
tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

// Create another VDiff record to confirm it gets deleted when the workflow is completed.
ts := time.Now()
uuid, _ := performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
uuid, _ = performVDiff2Action(t, false, ksWorkflow, allCellNames, "create", "", false)
waitForVDiff2ToComplete(t, false, ksWorkflow, allCellNames, uuid, ts)
tc.vdiffCount++
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)

err = vc.VtctlClient.ExecuteCommand(tc.typ, "--", "SwitchTraffic", ksWorkflow)
require.NoError(t, err)
Expand All @@ -291,6 +342,8 @@ func testWorkflow(t *testing.T, vc *VitessCluster, tc *testCase, tks *Keyspace,

// Confirm the VDiff data is deleted for the workflow.
testNoOrphanedData(t, tc.targetKs, tc.workflow, arrTargetShards)
tc.vdiffCount = 0 // All vdiffs are deleted, so reset the count and check
checkVDiffCountStat(t, statsTablet, tc.vdiffCount)
}

func testCLIErrors(t *testing.T, ksWorkflow, cells string) {
Expand Down
15 changes: 13 additions & 2 deletions go/vt/vttablet/tabletmanager/vdiff/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,23 @@
}
)

func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (*tabletmanagerdatapb.VDiffResponse, error) {
func (vde *Engine) PerformVDiffAction(ctx context.Context, req *tabletmanagerdatapb.VDiffRequest) (resp *tabletmanagerdatapb.VDiffResponse, err error) {
defer func() {
if err != nil {
globalStats.ErrorCount.Add(1)
}
}()
if req == nil {
return nil, vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request")

Check warning on line 73 in go/vt/vttablet/tabletmanager/vdiff/action.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/action.go#L73

Added line #L73 was not covered by tests
}
if !vde.isOpen {
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed")
}
if vde.cancelRetry != nil {
return nil, vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is still trying to open")
}

resp := &tabletmanagerdatapb.VDiffResponse{
resp = &tabletmanagerdatapb.VDiffResponse{

Check warning on line 82 in go/vt/vttablet/tabletmanager/vdiff/action.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/action.go#L82

Added line #L82 was not covered by tests
Id: 0,
Output: nil,
}
Expand Down Expand Up @@ -368,6 +376,9 @@
}
controller.Stop()
delete(vde.controllers, controller.id)
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
delete(globalStats.controllers, controller.id)

Check warning on line 381 in go/vt/vttablet/tabletmanager/vdiff/action.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/action.go#L379-L381

Added lines #L379 - L381 were not covered by tests
}

switch req.ActionArg {
Expand Down
11 changes: 11 additions & 0 deletions go/vt/vttablet/tabletmanager/vdiff/action_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,13 @@ func TestPerformVDiffAction(t *testing.T) {
expectQueries []queryAndResult
wantErr error
}{
{
name: "nil request",
wantErr: vterrors.New(vtrpcpb.Code_INVALID_ARGUMENT, "nil vdiff request"),
},
{
name: "engine not open",
req: &tabletmanagerdatapb.VDiffRequest{},
vde: &Engine{isOpen: false},
wantErr: vterrors.New(vtrpcpb.Code_UNAVAILABLE, "vdiff engine is closed"),
},
Expand Down Expand Up @@ -208,6 +213,7 @@ func TestPerformVDiffAction(t *testing.T) {
},
},
}
errCount := int64(0)
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if tt.preFunc != nil {
Expand All @@ -224,6 +230,9 @@ func TestPerformVDiffAction(t *testing.T) {
vdiffenv.dbClient.ExpectRequest(queryResult.query, queryResult.result, nil)
}
got, err := tt.vde.PerformVDiffAction(ctx, tt.req)
if err != nil {
errCount++
}
vdiffenv.dbClient.Wait()
if tt.wantErr != nil && !vterrors.Equals(err, tt.wantErr) {
t.Errorf("Engine.PerformVDiffAction() error = %v, wantErr %v", err, tt.wantErr)
Expand All @@ -239,6 +248,8 @@ func TestPerformVDiffAction(t *testing.T) {
// No VDiffs should be running anymore.
require.Equal(t, 0, len(vdiffenv.vde.controllers), "expected no controllers to be running, but found %d",
len(vdiffenv.vde.controllers))
require.Equal(t, int64(0), globalStats.numControllers(), "expected no controllers, but found %d")
})
require.Equal(t, errCount, globalStats.ErrorCount.Get(), "expected error count %d, got %d", errCount, globalStats.ErrorCount.Get())
}
}
29 changes: 19 additions & 10 deletions go/vt/vttablet/tabletmanager/vdiff/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

"vitess.io/vitess/go/mysql/replication"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -75,6 +76,11 @@
sourceTimeZone, targetTimeZone string // Named time zones if conversions are necessary for datetime values

externalCluster string // For Mount+Migrate

// Information used in vdiff stats/metrics.
Errors *stats.CountersWithMultiLabels
TableDiffRowCounts *stats.CountersWithMultiLabels
TableDiffPhaseTimings *stats.Timings
}

func newController(ctx context.Context, row sqltypes.RowNamedValues, dbClientFactory func() binlogplayer.DBClient,
Expand All @@ -84,16 +90,19 @@
id, _ := row["id"].ToInt64()

ct := &controller{
id: id,
uuid: row["vdiff_uuid"].ToString(),
workflow: row["workflow"].ToString(),
dbClientFactory: dbClientFactory,
ts: ts,
vde: vde,
done: make(chan struct{}),
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
id: id,
uuid: row["vdiff_uuid"].ToString(),
workflow: row["workflow"].ToString(),
dbClientFactory: dbClientFactory,
ts: ts,
vde: vde,
done: make(chan struct{}),
tmc: vde.tmClientFactory(),
sources: make(map[string]*migrationSource),
options: options,
Errors: stats.NewCountersWithMultiLabels("", "", []string{"Error"}),
TableDiffRowCounts: stats.NewCountersWithMultiLabels("", "", []string{"Rows"}),
TableDiffPhaseTimings: stats.NewTimings("", "", "", "TablePhase"),

Check warning on line 105 in go/vt/vttablet/tabletmanager/vdiff/controller.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/controller.go#L93-L105

Added lines #L93 - L105 were not covered by tests
}
ctx, ct.cancel = context.WithCancel(ctx)
go ct.run(ctx)
Expand Down
27 changes: 21 additions & 6 deletions go/vt/vttablet/tabletmanager/vdiff/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,16 @@

"vitess.io/vitess/go/mysql/collations"
"vitess.io/vitess/go/mysql/sqlerror"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/dbconfigs"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/topo"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tmclient"
)

type Engine struct {
Expand Down Expand Up @@ -159,6 +158,7 @@
if err := vde.initControllers(rows); err != nil {
return err
}
vde.updateStats()

Check warning on line 161 in go/vt/vttablet/tabletmanager/vdiff/engine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/engine.go#L161

Added line #L161 was not covered by tests

// At this point we've fully and successfully opened so begin
// retrying error'd VDiffs until the engine is closed.
Expand Down Expand Up @@ -218,6 +218,9 @@
row, vde.thisTablet.Alias)
}
vde.controllers[ct.id] = ct
globalStats.mu.Lock()
defer globalStats.mu.Unlock()
globalStats.controllers[ct.id] = ct

Check warning on line 223 in go/vt/vttablet/tabletmanager/vdiff/engine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/engine.go#L221-L223

Added lines #L221 - L223 were not covered by tests
return nil
}

Expand Down Expand Up @@ -392,4 +395,16 @@
ct.Stop()
}
vde.controllers = make(map[int64]*controller)
vde.updateStats()

Check warning on line 398 in go/vt/vttablet/tabletmanager/vdiff/engine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/engine.go#L398

Added line #L398 was not covered by tests
}

// updateStats must only be called while holding the engine lock.
func (vre *Engine) updateStats() {
globalStats.mu.Lock()
defer globalStats.mu.Unlock()

Check warning on line 404 in go/vt/vttablet/tabletmanager/vdiff/engine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/engine.go#L402-L404

Added lines #L402 - L404 were not covered by tests

globalStats.controllers = make(map[int64]*controller, len(vre.controllers))
for id, ct := range vre.controllers {
globalStats.controllers[id] = ct

Check warning on line 408 in go/vt/vttablet/tabletmanager/vdiff/engine.go

View check run for this annotation

Codecov / codecov/patch

go/vt/vttablet/tabletmanager/vdiff/engine.go#L406-L408

Added lines #L406 - L408 were not covered by tests
}
}
Loading
Loading