Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into acl-empty
Browse files Browse the repository at this point in the history
Signed-off-by: Mohamed Hamza <mhamza15@github.com>
  • Loading branch information
mhamza15 committed Feb 21, 2025
2 parents cf85e42 + 0a6f982 commit 799b74d
Show file tree
Hide file tree
Showing 15 changed files with 141 additions and 25 deletions.
9 changes: 9 additions & 0 deletions docker/lite/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder
# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER

# Allows docker builds to set the BUILD_GIT_BRANCH
ARG BUILD_GIT_BRANCH

# Allows docker builds to set the BUILD_GIT_REV
ARG BUILD_GIT_REV

# Allows docker builds to set the BUILD_TIME
ARG BUILD_TIME

WORKDIR /vt/src/vitess.io/vitess

# Create vitess user
Expand Down
9 changes: 9 additions & 0 deletions docker/lite/Dockerfile.mysql84
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder
# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER

# Allows docker builds to set the BUILD_GIT_BRANCH
ARG BUILD_GIT_BRANCH

# Allows docker builds to set the BUILD_GIT_REV
ARG BUILD_GIT_REV

# Allows docker builds to set the BUILD_TIME
ARG BUILD_TIME

WORKDIR /vt/src/vitess.io/vitess

# Create vitess user
Expand Down
9 changes: 9 additions & 0 deletions docker/lite/Dockerfile.percona80
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,15 @@ FROM --platform=linux/amd64 golang:1.24.0-bookworm AS builder
# Allows docker builds to set the BUILD_NUMBER
ARG BUILD_NUMBER

# Allows docker builds to set the BUILD_GIT_BRANCH
ARG BUILD_GIT_BRANCH

# Allows docker builds to set the BUILD_GIT_REV
ARG BUILD_GIT_REV

# Allows docker builds to set the BUILD_TIME
ARG BUILD_TIME

WORKDIR /vt/src/vitess.io/vitess

# Create vitess user
Expand Down
2 changes: 1 addition & 1 deletion go/cmd/vtclient/cli/vtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ var (
seqChan = make(chan int, 10)
)

func init() {
func InitializeFlags() {
servenv.MoveFlagsToCobraCommand(Main)

Main.Flags().StringVar(&server, "server", server, "vtgate server to connect to")
Expand Down
2 changes: 2 additions & 0 deletions go/cmd/vtclient/cli/vtclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func TestVtclient(t *testing.T) {
},
}

// initialize the vtclient flags before running any commands
InitializeFlags()
for _, q := range queries {
// Run main function directly and not as external process. To achieve this,
// overwrite os.Args which is used by pflag.Parse().
Expand Down
1 change: 1 addition & 0 deletions go/cmd/vtclient/vtclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

func main() {
cli.InitializeFlags()
if err := cli.Main.Execute(); err != nil {
log.Exit(err)
}
Expand Down
17 changes: 10 additions & 7 deletions go/vt/discovery/tablet_picker.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,9 +100,10 @@ func SetTabletPickerRetryDelay(delay time.Duration) {
}

type TabletPickerOptions struct {
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
CellPreference string
TabletOrder string
IncludeNonServingTablets bool
ExcludeTabletsWithMaxReplicationLag time.Duration
}

func parseTabletPickerCellPreferenceString(str string) (TabletPickerCellPreference, error) {
Expand Down Expand Up @@ -356,8 +357,8 @@ func (tp *TabletPicker) PickForStreaming(ctx context.Context) (*topodatapb.Table
if len(candidates) == 0 {
// If no viable candidates were found, sleep and try again.
tp.incNoTabletFoundStat()
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
log.Infof("No healthy serving tablet found for streaming, shard %s.%s, cells %v, tabletTypes %v, maxReplicationLag: %v, sleeping for %.3f seconds.",
tp.keyspace, tp.shard, tp.cells, tp.tabletTypes, tp.options.ExcludeTabletsWithMaxReplicationLag, float64(GetTabletPickerRetryDelay().Milliseconds())/1000.0)
timer := time.NewTimer(GetTabletPickerRetryDelay())
select {
case <-ctx.Done():
Expand Down Expand Up @@ -471,8 +472,10 @@ func (tp *TabletPicker) GetMatchingTablets(ctx context.Context) []*topo.TabletIn
if err := conn.StreamHealth(shortCtx, func(shr *querypb.StreamHealthResponse) error {
if shr != nil &&
(shr.Serving || tp.options.IncludeNonServingTablets) &&
shr.RealtimeStats != nil &&
shr.RealtimeStats.HealthError == "" {
(shr.RealtimeStats != nil && shr.RealtimeStats.HealthError == "" &&
(tabletInfo.Tablet.Type == topodatapb.TabletType_PRIMARY /* lag is not relevant */ ||
(tp.options.ExcludeTabletsWithMaxReplicationLag == 0 /* not set */ ||
shr.RealtimeStats.ReplicationLagSeconds <= uint32(tp.options.ExcludeTabletsWithMaxReplicationLag.Seconds())))) {
return io.EOF // End the stream
}
return vterrors.New(vtrpcpb.Code_INTERNAL, "tablet is not healthy and serving")
Expand Down
60 changes: 60 additions & 0 deletions go/vt/discovery/tablet_picker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,61 @@ func TestPickNonServingTablets(t *testing.T) {
assert.True(t, picked3)
}

// TestPickNonLaggingTablets validates that lagging tablets are excluded when the
// ExcludeTabletsWithMaxReplicationLag option is set.
func TestPickNonLaggingTablets(t *testing.T) {
ctx := utils.LeakCheckContext(t)
cells := []string{"cell1"}
defaultCell := cells[0]
tabletTypes := "replica"
options := TabletPickerOptions{
ExcludeTabletsWithMaxReplicationLag: lowReplicationLag.Default(),
}
replLag := options.ExcludeTabletsWithMaxReplicationLag + (5 * time.Second)
te := newPickerTestEnv(t, ctx, cells)

// Tablet should not be selected as we only want replicas.
primaryTablet := addTablet(ctx, te, 100, topodatapb.TabletType_PRIMARY, defaultCell, true, true)
defer deleteTablet(t, te, primaryTablet)

// Tablet should not be selected as it is lagging.
laggingReplicaTablet := addTabletWithLag(ctx, te, 200, topodatapb.TabletType_REPLICA, defaultCell, true, true, uint32(replLag.Seconds()))
defer deleteTablet(t, te, laggingReplicaTablet)

// Tablet should be selected because it's a non-lagging replica.
nonLaggingReplicaTablet := addTablet(ctx, te, 300, topodatapb.TabletType_REPLICA, defaultCell, true, true)
defer deleteTablet(t, te, nonLaggingReplicaTablet)

_, err := te.topoServ.UpdateShardFields(ctx, te.keyspace, te.shard, func(si *topo.ShardInfo) error {
si.PrimaryAlias = primaryTablet.Alias
return nil
})
require.NoError(t, err)

tp, err := NewTabletPicker(ctx, te.topoServ, cells, defaultCell, te.keyspace, te.shard, tabletTypes, options)
require.NoError(t, err)
ctx, cancel := context.WithTimeout(ctx, contextTimeout)
defer cancel()

var pickedPrimary, pickedLaggingReplica, pickedNonLaggingReplica int
for i := 0; i < numTestIterations; i++ {
tablet, err := tp.PickForStreaming(ctx)
require.NoError(t, err)
if proto.Equal(tablet, primaryTablet) {
pickedPrimary++
}
if proto.Equal(tablet, laggingReplicaTablet) {
pickedLaggingReplica++
}
if proto.Equal(tablet, nonLaggingReplicaTablet) {
pickedNonLaggingReplica++
}
}
require.Zero(t, pickedPrimary)
require.Zero(t, pickedLaggingReplica)
require.Equal(t, numTestIterations, pickedNonLaggingReplica)
}

type pickerTestEnv struct {
t *testing.T
keyspace string
Expand Down Expand Up @@ -720,6 +775,10 @@ func newPickerTestEnv(t *testing.T, ctx context.Context, cells []string, extraCe
}

func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool) *topodatapb.Tablet {
return addTabletWithLag(ctx, te, id, tabletType, cell, serving, healthy, 0)
}

func addTabletWithLag(ctx context.Context, te *pickerTestEnv, id int, tabletType topodatapb.TabletType, cell string, serving, healthy bool, replLagSecs uint32) *topodatapb.Tablet {
tablet := &topodatapb.Tablet{
Alias: &topodatapb.TabletAlias{
Cell: cell,
Expand Down Expand Up @@ -748,6 +807,7 @@ func addTablet(ctx context.Context, te *pickerTestEnv, id int, tabletType topoda
if healthy {
shr.RealtimeStats.HealthError = ""
}
shr.RealtimeStats.ReplicationLagSeconds = replLagSecs

_ = createFixedHealthConn(tablet, shr)

Expand Down
4 changes: 2 additions & 2 deletions go/vt/servenv/servenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func ParseFlagsForTests(cmd string) {
// the given cobra command, then copies over the glog flags that otherwise
// require manual transferring.
func MoveFlagsToCobraCommand(cmd *cobra.Command) {
moveFlags(cmd.Use, cmd.Flags())
moveFlags(cmd.Name(), cmd.Flags())
}

// MovePersistentFlagsToCobraCommand functions exactly like MoveFlagsToCobraCommand,
Expand All @@ -347,7 +347,7 @@ func MoveFlagsToCobraCommand(cmd *cobra.Command) {
// Useful for transferring flags to a parent command whose subcommands should
// inherit the servenv-registered flags.
func MovePersistentFlagsToCobraCommand(cmd *cobra.Command) {
moveFlags(cmd.Use, cmd.PersistentFlags())
moveFlags(cmd.Name(), cmd.PersistentFlags())
}

func moveFlags(name string, fs *pflag.FlagSet) {
Expand Down
4 changes: 4 additions & 0 deletions go/vt/vtgate/vstream_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (vsm *vstreamManager) VStream(ctx context.Context, tabletType topodatapb.Ta
tabletPickerOptions: discovery.TabletPickerOptions{
CellPreference: flags.GetCellPreference(),
TabletOrder: flags.GetTabletOrder(),
// This is NOT configurable via the API because we check the
// discovery.GetLowReplicationLag().Seconds() value in the tablet
// health stream.
ExcludeTabletsWithMaxReplicationLag: discovery.GetLowReplicationLag(),
},
flags: flags,
}
Expand Down
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
return nil, err
}
defer tm.unlock()
defer tm.QueryServiceControl.SetDemotePrimaryStalled(false)

finishCtx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -546,10 +547,16 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
// We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done.
// Collect more information and signal demote primary is indefinitely stalled.
log.Errorf("DemotePrimary seems to be stalled. Collecting more information.")
tm.QueryServiceControl.SetDemotePrimaryStalled()
tm.QueryServiceControl.SetDemotePrimaryStalled(true)
buf := make([]byte, 1<<16) // 64 KB buffer size
stackSize := runtime.Stack(buf, true)
log.Errorf("Stack trace:\n%s", string(buf[:stackSize]))
// This condition check is only to handle the race, where we start to set the demote primary stalled
// but then the function finishes. So, after we set demote primary stalled, we check if the
// function has finished and if it has, we clear the demote primary stalled.
if finishCtx.Err() != nil {
tm.QueryServiceControl.SetDemotePrimaryStalled(false)
}
}
}()

Expand Down
28 changes: 20 additions & 8 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) {

type demotePrimaryStallQS struct {
tabletserver.Controller
waitTime time.Duration
qsWaitChan chan any
primaryStalled atomic.Bool
}

func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() {
d.primaryStalled.Store(true)
func (d *demotePrimaryStallQS) SetDemotePrimaryStalled(val bool) {
d.primaryStalled.Store(val)
}

func (d *demotePrimaryStallQS) IsServing() bool {
time.Sleep(d.waitTime)
<-d.qsWaitChan
return false
}

Expand All @@ -74,7 +74,7 @@ func TestDemotePrimaryStalled(t *testing.T) {

// Create a fake query service control to intercept calls from DemotePrimary function.
qsc := &demotePrimaryStallQS{
waitTime: 2 * time.Second,
qsWaitChan: make(chan any),
}
// Create a tablet manager with a replica type tablet.
tm := &TabletManager{
Expand All @@ -88,8 +88,20 @@ func TestDemotePrimaryStalled(t *testing.T) {
QueryServiceControl: qsc,
}

// We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout.
go func() {
tm.demotePrimary(context.Background(), false)
}()
// We make IsServing stall by making it wait on a channel.
// This should cause the demote primary operation to be stalled.
tm.demotePrimary(context.Background(), false)
require.True(t, qsc.primaryStalled.Load())
require.Eventually(t, func() bool {
return qsc.primaryStalled.Load()
}, 5*time.Second, 100*time.Millisecond)

// Unblock the DemotePrimary call by closing the channel.
close(qsc.qsWaitChan)

// Eventually demote primary will succeed, and we want the stalled field to be cleared.
require.Eventually(t, func() bool {
return !qsc.primaryStalled.Load()
}, 5*time.Second, 100*time.Millisecond)
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ type Controller interface {
// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
SetDemotePrimaryStalled()
// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager.
SetDemotePrimaryStalled(val bool)

// IsDiskStalled returns if the disk is stalled.
IsDiskStalled() bool
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e
}
}

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
func (tsv *TabletServer) SetDemotePrimaryStalled() {
// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager.
func (tsv *TabletServer) SetDemotePrimaryStalled(val bool) {
tsv.sm.mu.Lock()
tsv.sm.demotePrimaryStalled = true
tsv.sm.demotePrimaryStalled = val
tsv.sm.mu.Unlock()
tsv.BroadcastHealth()
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error
}

// SetDemotePrimaryStalled is part of the tabletserver.Controller interface
func (tqsc *Controller) SetDemotePrimaryStalled() {
func (tqsc *Controller) SetDemotePrimaryStalled(bool) {
tqsc.MethodCalled["SetDemotePrimaryStalled"] = true
}

Expand Down

0 comments on commit 799b74d

Please sign in to comment.