From 22988df39562f0b4a7ce218d52252e4de9d604d3 Mon Sep 17 00:00:00 2001 From: Matt Lord Date: Thu, 20 Feb 2025 16:31:44 -0500 Subject: [PATCH 1/4] VReplication: Support excluding lagging tablets and use this in vstream manager (#17835) Signed-off-by: Matt Lord --- go/vt/discovery/tablet_picker.go | 17 ++++---- go/vt/discovery/tablet_picker_test.go | 60 +++++++++++++++++++++++++++ go/vt/vtgate/vstream_manager.go | 4 ++ 3 files changed, 74 insertions(+), 7 deletions(-) diff --git a/go/vt/discovery/tablet_picker.go b/go/vt/discovery/tablet_picker.go index cfe1ba2e964..34c7178c350 100644 --- a/go/vt/discovery/tablet_picker.go +++ b/go/vt/discovery/tablet_picker.go @@ -100,9 +100,10 @@ func SetTabletPickerRetryDelay(delay time.Duration) { } type TabletPickerOptions struct { - CellPreference string - TabletOrder string - IncludeNonServingTablets bool + CellPreference string + TabletOrder string + IncludeNonServingTablets bool + ExcludeTabletsWithMaxReplicationLag time.Duration } func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) { @@ -356,8 +357,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table if len(candidates) == 0 { // If no viable candidates were found, sleep and try again. tp.incNoTabletFoundStat() - log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.", - tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) + log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, maxReplicationLag: %v, sleeping for %.3f seconds.", + tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, tp.options.ExcludeTabletsWithMaxReplicationLag, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0) timer := time.NewTimer(GetTabletPickerRetryDelay()) select { case <-ctx.Done(): @@ -471,8 +472,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error { if shr != nil && (shr.Serving || tp.options.IncludeNonServingTablets) && - shr.RealtimeStats != nil && - shr.RealtimeStats.HealthError == "" { + (shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" && + (tabletInfo.Tablet.Type == topodatapb.TabletType_PRIMARY /* lag is not relevant */ || + (tp.options.ExcludeTabletsWithMaxReplicationLag == 0 /* not set */ || + shr.RealtimeStats.ReplicationLagSeconds <= uint32(tp.options.ExcludeTabletsWithMaxReplicationLag.Seconds())))) { return io.EOF // End the stream } return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving") diff --git a/go/vt/discovery/tablet_picker_test.go b/go/vt/discovery/tablet_picker_test.go index 27c4d8bf7b1..b44ae9adbd1 100644 --- a/go/vt/discovery/tablet_picker_test.go +++ b/go/vt/discovery/tablet_picker_test.go @@ -685,6 +685,61 @@ func TestPickNonServingTablets(t *testing.T) { assert.True(t, picked3) } +// TestPickNonLaggingTablets validates that lagging tablets are excluded when the +// ExcludeTabletsWithMaxReplicationLag option is set. +func TestPickNonLaggingTablets(t *testing.T) { + ctx := utils.LeakCheckContext(t) + cells := []string{"cell1"} + defaultCell := cells[0] + tabletTypes := "replica" + options := TabletPickerOptions{ + ExcludeTabletsWithMaxReplicationLag: lowReplicationLag.Default(), + } + replLag := options.ExcludeTabletsWithMaxReplicationLag + (5 * time.Second) + te := newPickerTestEnv(t, ctx, cells) + + // Tablet should not be selected as we only want replicas. + primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, defaultCell, true, true) + defer deleteTablet(t, te, primaryTablet) + + // Tablet should not be selected as it is lagging. + laggingReplicaTablet := addTabletWithLag(ctx, te, 200, topodatapb.TabletType_REPLICA, defaultCell, true, true, uint32(replLag.Seconds())) + defer deleteTablet(t, te, laggingReplicaTablet) + + // Tablet should be selected because it's a non-lagging replica. + nonLaggingReplicaTablet := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, defaultCell, true, true) + defer deleteTablet(t, te, nonLaggingReplicaTablet) + + _, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error { + si.PrimaryAlias = primaryTablet.Alias + return nil + }) + require.NoError(t, err) + + tp, err := NewTabletPicker(ctx, te.topoServ, cells, defaultCell, te.keyspace, te.shard, tabletTypes, options) + require.NoError(t, err) + ctx, cancel := context.WithTimeout(ctx, contextTimeout) + defer cancel() + + var pickedPrimary, pickedLaggingReplica, pickedNonLaggingReplica int + for i := 0; i < numTestIterations; i++ { + tablet, err := tp.PickForStreaming(ctx) + require.NoError(t, err) + if proto.Equal(tablet, primaryTablet) { + pickedPrimary++ + } + if proto.Equal(tablet, laggingReplicaTablet) { + pickedLaggingReplica++ + } + if proto.Equal(tablet, nonLaggingReplicaTablet) { + pickedNonLaggingReplica++ + } + } + require.Zero(t, pickedPrimary) + require.Zero(t, pickedLaggingReplica) + require.Equal(t, numTestIterations, pickedNonLaggingReplica) +} + type pickerTestEnv struct { t *testing.T keyspace string @@ -720,6 +775,10 @@ func newPickerTestEnv(t *testing.T, ctx context.Context, cells []string, extraCe } func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet { + return addTabletWithLag(ctx, te, id, tabletType, cell, serving, healthy, 0) +} + +func addTabletWithLag(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool, replLagSecs uint32) *topodatapb.Tablet { tablet := &topodatapb.Tablet{ Alias: &topodatapb.TabletAlias{ Cell: cell, @@ -748,6 +807,7 @@ func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topoda if healthy { shr.RealtimeStats.HealthError = "" } + shr.RealtimeStats.ReplicationLagSeconds = replLagSecs _ = createFixedHealthConn(tablet, shr) diff --git a/go/vt/vtgate/vstream_manager.go b/go/vt/vtgate/vstream_manager.go index 2a48fa9fc6d..ada1ddb131c 100644 --- a/go/vt/vtgate/vstream_manager.go +++ b/go/vt/vtgate/vstream_manager.go @@ -194,6 +194,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta tabletPickerOptions: discovery.TabletPickerOptions{ CellPreference: flags.GetCellPreference(), TabletOrder: flags.GetTabletOrder(), + // This is NOT configurable via the API because we check the + // discovery.GetLowReplicationLag().Seconds() value in the tablet + // health stream. + ExcludeTabletsWithMaxReplicationLag: discovery.GetLowReplicationLag(), }, flags: flags, } From c9c227db69b8f30cc90a43fa023e0d545964713b Mon Sep 17 00:00:00 2001 From: Tim Vaillancourt Date: Thu, 20 Feb 2025 23:24:11 +0100 Subject: [PATCH 2/4] Allow build git envs to be set in `docker/lite` (#17827) Signed-off-by: Tim Vaillancourt --- docker/lite/Dockerfile | 9 +++++++++ docker/lite/Dockerfile.mysql84 | 9 +++++++++ docker/lite/Dockerfile.percona80 | 9 +++++++++ 3 files changed, 27 insertions(+) diff --git a/docker/lite/Dockerfile b/docker/lite/Dockerfile index 70d4787686c..beeb0cfef8c 100644 --- a/docker/lite/Dockerfile +++ b/docker/lite/Dockerfile @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER +# Allows docker builds to set the BUILD_GIT_BRANCH +ARG BUILD_GIT_BRANCH + +# Allows docker builds to set the BUILD_GIT_REV +ARG BUILD_GIT_REV + +# Allows docker builds to set the BUILD_TIME +ARG BUILD_TIME + WORKDIR /vt/src/vitess.io/vitess # Create vitess user diff --git a/docker/lite/Dockerfile.mysql84 b/docker/lite/Dockerfile.mysql84 index 047e3d1f90c..f47758754ae 100644 --- a/docker/lite/Dockerfile.mysql84 +++ b/docker/lite/Dockerfile.mysql84 @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER +# Allows docker builds to set the BUILD_GIT_BRANCH +ARG BUILD_GIT_BRANCH + +# Allows docker builds to set the BUILD_GIT_REV +ARG BUILD_GIT_REV + +# Allows docker builds to set the BUILD_TIME +ARG BUILD_TIME + WORKDIR /vt/src/vitess.io/vitess # Create vitess user diff --git a/docker/lite/Dockerfile.percona80 b/docker/lite/Dockerfile.percona80 index 011f20c4022..9a7626244ec 100644 --- a/docker/lite/Dockerfile.percona80 +++ b/docker/lite/Dockerfile.percona80 @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder # Allows docker builds to set the BUILD_NUMBER ARG BUILD_NUMBER +# Allows docker builds to set the BUILD_GIT_BRANCH +ARG BUILD_GIT_BRANCH + +# Allows docker builds to set the BUILD_GIT_REV +ARG BUILD_GIT_REV + +# Allows docker builds to set the BUILD_TIME +ARG BUILD_TIME + WORKDIR /vt/src/vitess.io/vitess # Create vitess user From db77d81d7ca5f919dd48c1a3b923aa0adb65e9c6 Mon Sep 17 00:00:00 2001 From: Manan Gupta <35839558+GuptaManan100@users.noreply.github.com> Date: Fri, 21 Feb 2025 11:55:59 +0530 Subject: [PATCH 3/4] Clear `demotePrimaryStalled` field after the function ends (#17823) Signed-off-by: Manan Gupta --- .../vttablet/tabletmanager/rpc_replication.go | 9 +++++- .../tabletmanager/rpc_replication_test.go | 28 +++++++++++++------ go/vt/vttablet/tabletserver/controller.go | 4 +-- go/vt/vttablet/tabletserver/tabletserver.go | 6 ++-- go/vt/vttablet/tabletservermock/controller.go | 2 +- 5 files changed, 34 insertions(+), 15 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 7ac37515b67..070eab9a38a 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -535,6 +535,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure return nil, err } defer tm.unlock() + defer tm.QueryServiceControl.SetDemotePrimaryStalled(false) finishCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -546,10 +547,16 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done. // Collect more information and signal demote primary is indefinitely stalled. log.Errorf("DemotePrimary seems to be stalled. Collecting more information.") - tm.QueryServiceControl.SetDemotePrimaryStalled() + tm.QueryServiceControl.SetDemotePrimaryStalled(true) buf := make([]byte, 1<<16) // 64 KB buffer size stackSize := runtime.Stack(buf, true) log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) + // This condition check is only to handle the race, where we start to set the demote primary stalled + // but then the function finishes. So, after we set demote primary stalled, we check if the + // function has finished and if it has, we clear the demote primary stalled. + if finishCtx.Err() != nil { + tm.QueryServiceControl.SetDemotePrimaryStalled(false) + } } }() diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index b388235811b..4efb7b13081 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -50,16 +50,16 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) { type demotePrimaryStallQS struct { tabletserver.Controller - waitTime time.Duration + qsWaitChan chan any primaryStalled atomic.Bool } -func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { - d.primaryStalled.Store(true) +func (d *demotePrimaryStallQS) SetDemotePrimaryStalled(val bool) { + d.primaryStalled.Store(val) } func (d *demotePrimaryStallQS) IsServing() bool { - time.Sleep(d.waitTime) + <-d.qsWaitChan return false } @@ -74,7 +74,7 @@ func TestDemotePrimaryStalled(t *testing.T) { // Create a fake query service control to intercept calls from DemotePrimary function. qsc := &demotePrimaryStallQS{ - waitTime: 2 * time.Second, + qsWaitChan: make(chan any), } // Create a tablet manager with a replica type tablet. tm := &TabletManager{ @@ -88,8 +88,20 @@ func TestDemotePrimaryStalled(t *testing.T) { QueryServiceControl: qsc, } - // We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout. + go func() { + tm.demotePrimary(context.Background(), false) + }() + // We make IsServing stall by making it wait on a channel. // This should cause the demote primary operation to be stalled. - tm.demotePrimary(context.Background(), false) - require.True(t, qsc.primaryStalled.Load()) + require.Eventually(t, func() bool { + return qsc.primaryStalled.Load() + }, 5*time.Second, 100*time.Millisecond) + + // Unblock the DemotePrimary call by closing the channel. + close(qsc.qsWaitChan) + + // Eventually demote primary will succeed, and we want the stalled field to be cleared. + require.Eventually(t, func() bool { + return !qsc.primaryStalled.Load() + }, 5*time.Second, 100*time.Millisecond) } diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index ab2875ae27b..94bffd7d84d 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -120,8 +120,8 @@ type Controller interface { // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error - // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. - SetDemotePrimaryStalled() + // SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager. + SetDemotePrimaryStalled(val bool) // IsDiskStalled returns if the disk is stalled. IsDiskStalled() bool diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e09e04a9679..b12a3588db2 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -761,10 +761,10 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e } } -// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. -func (tsv *TabletServer) SetDemotePrimaryStalled() { +// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager. +func (tsv *TabletServer) SetDemotePrimaryStalled(val bool) { tsv.sm.mu.Lock() - tsv.sm.demotePrimaryStalled = true + tsv.sm.demotePrimaryStalled = val tsv.sm.mu.Unlock() tsv.BroadcastHealth() } diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 21b38755302..0d35d8e280f 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -275,7 +275,7 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error } // SetDemotePrimaryStalled is part of the tabletserver.Controller interface -func (tqsc *Controller) SetDemotePrimaryStalled() { +func (tqsc *Controller) SetDemotePrimaryStalled(bool) { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } From 0a6f982bca73269042f5809ed7ee53c7acc836eb Mon Sep 17 00:00:00 2001 From: Kyle Johnson Date: Fri, 21 Feb 2025 03:54:24 -0700 Subject: [PATCH 4/4] Fix vtclient vtgate missing flags (specifically --grpc_*) (#17800) Signed-off-by: Kyle Johnson Co-authored-by: Kyle Johnson --- go/cmd/vtclient/cli/vtclient.go | 2 +- go/cmd/vtclient/cli/vtclient_test.go | 2 ++ go/cmd/vtclient/vtclient.go | 1 + go/vt/servenv/servenv.go | 4 ++-- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/go/cmd/vtclient/cli/vtclient.go b/go/cmd/vtclient/cli/vtclient.go index e8bcd9b7ff2..4f3ac6ede27 100644 --- a/go/cmd/vtclient/cli/vtclient.go +++ b/go/cmd/vtclient/cli/vtclient.go @@ -82,7 +82,7 @@ var ( seqChan = make(chan int, 10) ) -func init() { +func InitializeFlags() { servenv.MoveFlagsToCobraCommand(Main) Main.Flags().StringVar(&server, "server", server, "vtgate server to connect to") diff --git a/go/cmd/vtclient/cli/vtclient_test.go b/go/cmd/vtclient/cli/vtclient_test.go index bf0c1206167..83a9d2a3339 100644 --- a/go/cmd/vtclient/cli/vtclient_test.go +++ b/go/cmd/vtclient/cli/vtclient_test.go @@ -121,6 +121,8 @@ func TestVtclient(t *testing.T) { }, } + // initialize the vtclient flags before running any commands + InitializeFlags() for _, q := range queries { // Run main function directly and not as external process. To achieve this, // overwrite os.Args which is used by pflag.Parse(). diff --git a/go/cmd/vtclient/vtclient.go b/go/cmd/vtclient/vtclient.go index 4201d25c882..ccfd31a0ac3 100644 --- a/go/cmd/vtclient/vtclient.go +++ b/go/cmd/vtclient/vtclient.go @@ -22,6 +22,7 @@ import ( ) func main() { + cli.InitializeFlags() if err := cli.Main.Execute(); err != nil { log.Exit(err) } diff --git a/go/vt/servenv/servenv.go b/go/vt/servenv/servenv.go index 22bf3523dfc..42ce4a9cf12 100644 --- a/go/vt/servenv/servenv.go +++ b/go/vt/servenv/servenv.go @@ -336,7 +336,7 @@ func ParseFlagsForTests(cmd string) { // the given cobra command, then copies over the glog flags that otherwise // require manual transferring. func MoveFlagsToCobraCommand(cmd *cobra.Command) { - moveFlags(cmd.Use, cmd.Flags()) + moveFlags(cmd.Name(), cmd.Flags()) } // MovePersistentFlagsToCobraCommand functions exactly like MoveFlagsToCobraCommand, @@ -347,7 +347,7 @@ func MoveFlagsToCobraCommand(cmd *cobra.Command) { // Useful for transferring flags to a parent command whose subcommands should // inherit the servenv-registered flags. func MovePersistentFlagsToCobraCommand(cmd *cobra.Command) { - moveFlags(cmd.Use, cmd.PersistentFlags()) + moveFlags(cmd.Name(), cmd.PersistentFlags()) } func moveFlags(name string, fs *pflag.FlagSet) {