From ef3b80a087d695d8f74c866888a3209e0062557f Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 19 Feb 2025 16:48:12 +0530 Subject: [PATCH 1/3] feat: clear demote primary after the function ends Signed-off-by: Manan Gupta --- .../vttablet/tabletmanager/rpc_replication.go | 9 ++++++ .../tabletmanager/rpc_replication_test.go | 28 +++++++++++++++---- go/vt/vttablet/tabletserver/controller.go | 3 ++ go/vt/vttablet/tabletserver/tabletserver.go | 8 ++++++ go/vt/vttablet/tabletservermock/controller.go | 5 ++++ 5 files changed, 47 insertions(+), 6 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index 7ac37515b67..a1ab78369d9 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -535,6 +535,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure return nil, err } defer tm.unlock() + defer tm.QueryServiceControl.ClearDemotePrimaryStalled() finishCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -550,6 +551,14 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure buf := make([]byte, 1<<16) // 64 KB buffer size stackSize := runtime.Stack(buf, true) log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) + // This select is only to handle the race, where we start to set the demote primary stalled + // but then the function finishes. So, after we set demote primary stalled, we check if the + // function has finished and if it has, we clear the demote primary stalled. + select { + case <-finishCtx.Done(): + tm.QueryServiceControl.ClearDemotePrimaryStalled() + default: + } } }() diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index b388235811b..4f33a9b0fd3 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -50,7 +50,7 @@ func TestWaitForGrantsToHaveApplied(t *testing.T) { type demotePrimaryStallQS struct { tabletserver.Controller - waitTime time.Duration + qsWaitChan chan any primaryStalled atomic.Bool } @@ -58,8 +58,12 @@ func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { d.primaryStalled.Store(true) } +func (d *demotePrimaryStallQS) ClearDemotePrimaryStalled() { + d.primaryStalled.Store(false) +} + func (d *demotePrimaryStallQS) IsServing() bool { - time.Sleep(d.waitTime) + <-d.qsWaitChan return false } @@ -74,7 +78,7 @@ func TestDemotePrimaryStalled(t *testing.T) { // Create a fake query service control to intercept calls from DemotePrimary function. qsc := &demotePrimaryStallQS{ - waitTime: 2 * time.Second, + qsWaitChan: make(chan any), } // Create a tablet manager with a replica type tablet. tm := &TabletManager{ @@ -88,8 +92,20 @@ func TestDemotePrimaryStalled(t *testing.T) { QueryServiceControl: qsc, } - // We make IsServing stall for over 2 seconds, which is longer than 10 * remote operation timeout. + go func() { + tm.demotePrimary(context.Background(), false) + }() + // We make IsServing stall by making it wait on a channel. // This should cause the demote primary operation to be stalled. - tm.demotePrimary(context.Background(), false) - require.True(t, qsc.primaryStalled.Load()) + require.Eventually(t, func() bool { + return qsc.primaryStalled.Load() + }, 5*time.Second, 100*time.Millisecond) + + // Unblock the DemotePrimary call by closing the channel. + close(qsc.qsWaitChan) + + // Eventually demote primary will succeed, and we want the stalled field to be cleared. + require.Eventually(t, func() bool { + return !qsc.primaryStalled.Load() + }, 5*time.Second, 100*time.Millisecond) } diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index ab2875ae27b..b1aeda7b1fc 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -123,6 +123,9 @@ type Controller interface { // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. SetDemotePrimaryStalled() + // ClearDemotePrimaryStalled clears the demote primary stalled field in the state manager. + ClearDemotePrimaryStalled() + // IsDiskStalled returns if the disk is stalled. IsDiskStalled() bool } diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index e09e04a9679..0c1f3877287 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -769,6 +769,14 @@ func (tsv *TabletServer) SetDemotePrimaryStalled() { tsv.BroadcastHealth() } +// ClearDemotePrimaryStalled clears the demote primary stalled field in the state manager. +func (tsv *TabletServer) ClearDemotePrimaryStalled() { + tsv.sm.mu.Lock() + tsv.sm.demotePrimaryStalled = false + tsv.sm.mu.Unlock() + tsv.BroadcastHealth() +} + // IsDiskStalled returns if the disk is stalled or not. func (tsv *TabletServer) IsDiskStalled() bool { return tsv.sm.diskHealthMonitor.IsDiskStalled() diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 21b38755302..34c021d0226 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -279,6 +279,11 @@ func (tqsc *Controller) SetDemotePrimaryStalled() { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } +// ClearDemotePrimaryStalled is part of the tabletserver.Controller interface +func (tqsc *Controller) ClearDemotePrimaryStalled() { + tqsc.MethodCalled["ClearDemotePrimaryStalled"] = true +} + // IsDiskStalled is part of the tabletserver.Controller interface func (tqsc *Controller) IsDiskStalled() bool { tqsc.MethodCalled["IsDiskStalled"] = true From 57fce9b881170db8b5f449e55d9ec523734746ce Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Wed, 19 Feb 2025 19:02:26 +0530 Subject: [PATCH 2/3] feat: simplify code Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletmanager/rpc_replication.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index a1ab78369d9..e5834617423 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -551,13 +551,11 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure buf := make([]byte, 1<<16) // 64 KB buffer size stackSize := runtime.Stack(buf, true) log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) - // This select is only to handle the race, where we start to set the demote primary stalled + // This condition check is only to handle the race, where we start to set the demote primary stalled // but then the function finishes. So, after we set demote primary stalled, we check if the // function has finished and if it has, we clear the demote primary stalled. - select { - case <-finishCtx.Done(): + if finishCtx.Err() != nil { tm.QueryServiceControl.ClearDemotePrimaryStalled() - default: } } }() From 26acc0715eae0c193f38897ded06cdea3886c0e8 Mon Sep 17 00:00:00 2001 From: Manan Gupta Date: Thu, 20 Feb 2025 10:57:38 +0530 Subject: [PATCH 3/3] feat: simplify the interface Signed-off-by: Manan Gupta --- go/vt/vttablet/tabletmanager/rpc_replication.go | 6 +++--- .../vttablet/tabletmanager/rpc_replication_test.go | 8 ++------ go/vt/vttablet/tabletserver/controller.go | 7 ++----- go/vt/vttablet/tabletserver/tabletserver.go | 14 +++----------- go/vt/vttablet/tabletservermock/controller.go | 7 +------ 5 files changed, 11 insertions(+), 31 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/rpc_replication.go b/go/vt/vttablet/tabletmanager/rpc_replication.go index e5834617423..070eab9a38a 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication.go @@ -535,7 +535,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure return nil, err } defer tm.unlock() - defer tm.QueryServiceControl.ClearDemotePrimaryStalled() + defer tm.QueryServiceControl.SetDemotePrimaryStalled(false) finishCtx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -547,7 +547,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // We waited for over 10 times of remote operation timeout, but DemotePrimary is still not done. // Collect more information and signal demote primary is indefinitely stalled. log.Errorf("DemotePrimary seems to be stalled. Collecting more information.") - tm.QueryServiceControl.SetDemotePrimaryStalled() + tm.QueryServiceControl.SetDemotePrimaryStalled(true) buf := make([]byte, 1<<16) // 64 KB buffer size stackSize := runtime.Stack(buf, true) log.Errorf("Stack trace:\n%s", string(buf[:stackSize])) @@ -555,7 +555,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure // but then the function finishes. So, after we set demote primary stalled, we check if the // function has finished and if it has, we clear the demote primary stalled. if finishCtx.Err() != nil { - tm.QueryServiceControl.ClearDemotePrimaryStalled() + tm.QueryServiceControl.SetDemotePrimaryStalled(false) } } }() diff --git a/go/vt/vttablet/tabletmanager/rpc_replication_test.go b/go/vt/vttablet/tabletmanager/rpc_replication_test.go index 4f33a9b0fd3..4efb7b13081 100644 --- a/go/vt/vttablet/tabletmanager/rpc_replication_test.go +++ b/go/vt/vttablet/tabletmanager/rpc_replication_test.go @@ -54,12 +54,8 @@ type demotePrimaryStallQS struct { primaryStalled atomic.Bool } -func (d *demotePrimaryStallQS) SetDemotePrimaryStalled() { - d.primaryStalled.Store(true) -} - -func (d *demotePrimaryStallQS) ClearDemotePrimaryStalled() { - d.primaryStalled.Store(false) +func (d *demotePrimaryStallQS) SetDemotePrimaryStalled(val bool) { + d.primaryStalled.Store(val) } func (d *demotePrimaryStallQS) IsServing() bool { diff --git a/go/vt/vttablet/tabletserver/controller.go b/go/vt/vttablet/tabletserver/controller.go index b1aeda7b1fc..94bffd7d84d 100644 --- a/go/vt/vttablet/tabletserver/controller.go +++ b/go/vt/vttablet/tabletserver/controller.go @@ -120,11 +120,8 @@ type Controller interface { // WaitForPreparedTwoPCTransactions waits for all prepared transactions to be resolved. WaitForPreparedTwoPCTransactions(ctx context.Context) error - // SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. - SetDemotePrimaryStalled() - - // ClearDemotePrimaryStalled clears the demote primary stalled field in the state manager. - ClearDemotePrimaryStalled() + // SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager. + SetDemotePrimaryStalled(val bool) // IsDiskStalled returns if the disk is stalled. IsDiskStalled() bool diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 0c1f3877287..b12a3588db2 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -761,18 +761,10 @@ func (tsv *TabletServer) WaitForPreparedTwoPCTransactions(ctx context.Context) e } } -// SetDemotePrimaryStalled marks that demote primary is stalled in the state manager. -func (tsv *TabletServer) SetDemotePrimaryStalled() { +// SetDemotePrimaryStalled sets the demote primary stalled field to the provided value in the state manager. +func (tsv *TabletServer) SetDemotePrimaryStalled(val bool) { tsv.sm.mu.Lock() - tsv.sm.demotePrimaryStalled = true - tsv.sm.mu.Unlock() - tsv.BroadcastHealth() -} - -// ClearDemotePrimaryStalled clears the demote primary stalled field in the state manager. -func (tsv *TabletServer) ClearDemotePrimaryStalled() { - tsv.sm.mu.Lock() - tsv.sm.demotePrimaryStalled = false + tsv.sm.demotePrimaryStalled = val tsv.sm.mu.Unlock() tsv.BroadcastHealth() } diff --git a/go/vt/vttablet/tabletservermock/controller.go b/go/vt/vttablet/tabletservermock/controller.go index 34c021d0226..0d35d8e280f 100644 --- a/go/vt/vttablet/tabletservermock/controller.go +++ b/go/vt/vttablet/tabletservermock/controller.go @@ -275,15 +275,10 @@ func (tqsc *Controller) WaitForPreparedTwoPCTransactions(context.Context) error } // SetDemotePrimaryStalled is part of the tabletserver.Controller interface -func (tqsc *Controller) SetDemotePrimaryStalled() { +func (tqsc *Controller) SetDemotePrimaryStalled(bool) { tqsc.MethodCalled["SetDemotePrimaryStalled"] = true } -// ClearDemotePrimaryStalled is part of the tabletserver.Controller interface -func (tqsc *Controller) ClearDemotePrimaryStalled() { - tqsc.MethodCalled["ClearDemotePrimaryStalled"] = true -} - // IsDiskStalled is part of the tabletserver.Controller interface func (tqsc *Controller) IsDiskStalled() bool { tqsc.MethodCalled["IsDiskStalled"] = true