Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clear demotePrimaryStalled field after the function ends #17823

Merged
merged 3 commits into from
Feb 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,6 +535,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
return nil, err
}
defer tm.unlock()
defer tm.QueryServiceControl.SetDemotePrimaryStalled(false)

finishCtx, cancel := context.WithCancel(context.Background())
defer cancel()
Expand All @@ -546,10 +547,16 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure
// We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done.
// Collect more information and signal demote primary is indefinitely stalled.
log.Errorf("DemotePrimary seems to be stalled. Collecting more information.")
tm.QueryServiceControl.SetDemotePrimaryStalled()
tm.QueryServiceControl.SetDemotePrimaryStalled(true)
buf := make([]byte, 1<<16) // 64 KB buffer size
stackSize := runtime.Stack(buf, true)
log.Errorf("Stack trace:\n%s", string(buf[:stackSize]))
// This condition check is only to handle the race, where we start to set the demote primary stalled
// but then the function finishes. So, after we set demote primary stalled, we check if the
// function has finished and if it has, we clear the demote primary stalled.
if finishCtx.Err() != nil {
tm.QueryServiceControl.SetDemotePrimaryStalled(false)
}
}
}()

Expand Down
28 changes: 20 additions & 8 deletions go/vt/vttablet/tabletmanager/rpc_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,16 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) {

type demotePrimaryStallQS struct {
tabletserver.Controller
waitTime time.Duration
qsWaitChan chan any
primaryStalled atomic.Bool
}

func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() {
d.primaryStalled.Store(true)
func (d *demotePrimaryStallQS) SetDemotePrimaryStalled(val bool) {
d.primaryStalled.Store(val)
}

func (d *demotePrimaryStallQS) IsServing() bool {
time.Sleep(d.waitTime)
<-d.qsWaitChan
return false
}

Expand All @@ -74,7 +74,7 @@ func TestDemotePrimaryStalled(t *testing.T) {

// Create a fake query service control to intercept calls from DemotePrimary function.
qsc := &demotePrimaryStallQS{
waitTime: 2 * time.Second,
qsWaitChan: make(chan any),
}
// Create a tablet manager with a replica type tablet.
tm := &TabletManager{
Expand All @@ -88,8 +88,20 @@ func TestDemotePrimaryStalled(t *testing.T) {
QueryServiceControl: qsc,
}

// We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout.
go func() {
tm.demotePrimary(context.Background(), false)
}()
// We make IsServing stall by making it wait on a channel.
// This should cause the demote primary operation to be stalled.
tm.demotePrimary(context.Background(), false)
require.True(t, qsc.primaryStalled.Load())
require.Eventually(t, func() bool {
return qsc.primaryStalled.Load()
}, 5*time.Second, 100*time.Millisecond)

// Unblock the DemotePrimary call by closing the channel.
close(qsc.qsWaitChan)

// Eventually demote primary will succeed, and we want the stalled field to be cleared.
require.Eventually(t, func() bool {
return !qsc.primaryStalled.Load()
}, 5*time.Second, 100*time.Millisecond)
}
4 changes: 2 additions & 2 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,8 @@ type Controller interface {
// WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved.
WaitForPreparedTwoPCTransactions(ctx context.Context) error

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
SetDemotePrimaryStalled()
// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager.
SetDemotePrimaryStalled(val bool)

// IsDiskStalled returns if the disk is stalled.
IsDiskStalled() bool
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -761,10 +761,10 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e
}
}

// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager.
func (tsv *TabletServer) SetDemotePrimaryStalled() {
// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager.
func (tsv *TabletServer) SetDemotePrimaryStalled(val bool) {
tsv.sm.mu.Lock()
tsv.sm.demotePrimaryStalled = true
tsv.sm.demotePrimaryStalled = val
tsv.sm.mu.Unlock()
tsv.BroadcastHealth()
}
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletservermock/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error
}

// SetDemotePrimaryStalled is part of the tabletserver.Controller interface
func (tqsc *Controller) SetDemotePrimaryStalled() {
func (tqsc *Controller) SetDemotePrimaryStalled(bool) {
tqsc.MethodCalled["SetDemotePrimaryStalled"] = true
}

Expand Down
Loading