Skip to content

Commit

Permalink
feat: move the semi-sync monitor from the tabletserver to tablet mana…
Browse files Browse the repository at this point in the history
…ger to only start it when required

Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Feb 17, 2025
1 parent 7286419 commit 4725dd9
Show file tree
Hide file tree
Showing 9 changed files with 93 additions and 111 deletions.
2 changes: 2 additions & 0 deletions go/cmd/vttablet/cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vttablet/tabletmanager"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -168,6 +169,7 @@ func run(cmd *cobra.Command, args []string) error {
QueryServiceControl: qsc,
UpdateStream: binlog.NewUpdateStream(ts, tablet.Keyspace, tabletAlias.Cell, qsc.SchemaEngine(), env.Parser()),
VREngine: vreplication.NewEngine(env, config, ts, tabletAlias.Cell, mysqld, qsc.LagThrottler()),
SemiSyncMonitor: semisyncmonitor.NewMonitor(config, qsc.Exporter()),
VDiffEngine: vdiff.NewEngine(ts, tablet, env.CollationEnv(), env.Parser()),
}
if err := tm.Start(tablet, config); err != nil {
Expand Down
12 changes: 10 additions & 2 deletions go/vt/vttablet/tabletmanager/rpc_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (tm *TabletManager) FullStatus(ctx context.Context) (*replicationdatapb.Ful
SemiSyncPrimaryClients: semiSyncClients,
SemiSyncPrimaryTimeout: semiSyncTimeout,
SemiSyncWaitForReplicaCount: semiSyncNumReplicas,
SemiSyncMonitorBlocked: tm.QueryServiceControl.SemiSyncMonitorBlocked(),
SemiSyncMonitorBlocked: tm.SemiSyncMonitor.AllWritesBlocked(),
SuperReadOnly: superReadOnly,
ReplicationConfiguration: replConfiguration,
}, nil
Expand Down Expand Up @@ -588,7 +588,7 @@ func (tm *TabletManager) demotePrimary(ctx context.Context, revertPartialFailure

// Now we know no writes are in-flight and no new writes can occur.
// We just need to wait for no write being blocked on semi-sync ACKs.
err = tm.QueryServiceControl.WaitUntilSemiSyncBeingUnblocked(ctx)
err = tm.SemiSyncMonitor.WaitUntilSemiSyncUnblocked(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1052,10 +1052,18 @@ func (tm *TabletManager) fixSemiSync(ctx context.Context, tabletType topodatapb.
case SemiSyncActionNone:
return nil
case SemiSyncActionSet:
// We want to enable the semi-sync monitor only if the tablet is going to start
// expecting semi-sync ACKs.
if tabletType == topodatapb.TabletType_PRIMARY {
tm.SemiSyncMonitor.Open()
} else {
tm.SemiSyncMonitor.Close()
}
// Always enable replica-side since it doesn't hurt to keep it on for a primary.
// The primary-side needs to be off for a replica, or else it will get stuck.
return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, tabletType == topodatapb.TabletType_PRIMARY, true)
case SemiSyncActionUnset:
tm.SemiSyncMonitor.Close()
return tm.MysqlDaemon.SetSemiSyncEnabled(ctx, false, false)
default:
return vterrors.Errorf(vtrpc.Code_INTERNAL, "Unknown SemiSyncAction - %v", semiSync)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"vitess.io/vitess/go/vt/dbconnpool"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
)
Expand All @@ -47,8 +48,8 @@ const (
// blocking PRS. The monitor looks for this situation and manufactures a write
// periodically to unblock the primary.
type Monitor struct {
// env is used to get the connection parameters.
env tabletenv.Env
// config is used to get the connection parameters.
config *tabletenv.TabletConfig
// ticks is the ticker on which we'll check
// if the primary is blocked on semi-sync ACKs or not.
ticks *timer.Timer
Expand Down Expand Up @@ -77,15 +78,15 @@ type Monitor struct {
}

// NewMonitor creates a new Monitor.
func NewMonitor(env tabletenv.Env) *Monitor {
func NewMonitor(config *tabletenv.TabletConfig, exporter *servenv.Exporter) *Monitor {
return &Monitor{
env: env,
ticks: timer.NewTimer(env.Config().SemiSyncMonitor.Interval),
config: config,
ticks: timer.NewTimer(config.SemiSyncMonitor.Interval),
// We clear the data every day. We can make it configurable in the future,
// but this seams fine for now.
clearTicks: timer.NewTimer(clearTimerDuration),
writesBlockedGauge: env.Exporter().NewGauge("SemiSyncMonitorWritesBlocked", "Number of writes blocked in the semi-sync monitor"),
appPool: dbconnpool.NewConnectionPool("SemiSyncMonitorAppPool", env.Exporter(), 20, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
writesBlockedGauge: exporter.NewGauge("SemiSyncMonitorWritesBlocked", "Number of writes blocked in the semi-sync monitor"),
appPool: dbconnpool.NewConnectionPool("SemiSyncMonitorAppPool", exporter, 20, mysqlctl.DbaIdleTimeout, 0, mysqlctl.PoolDynamicHostnameResolution),
waiters: make([]chan any, 0),
}
}
Expand All @@ -105,7 +106,7 @@ func (m *Monitor) Open() {
// This function could be running from within a unit test scope, in which case we use
// mock pools that are already open. This is why we test for the pool being open.
if !m.appPool.IsOpen() {
m.appPool.Open(m.env.Config().DB.AppWithDB())
m.appPool.Open(m.config.DB.AppWithDB())
}
m.clearTicks.Start(m.clearAllData)
m.ticks.Start(m.checkAndFixSemiSyncBlocked)
Expand Down Expand Up @@ -179,9 +180,21 @@ func (m *Monitor) isSemiSyncBlocked(ctx context.Context) (bool, error) {
return value != 0, err
}

// isClosed returns if the monitor is currently closed or not.
func (m *Monitor) isClosed() bool {
m.mu.Lock()
defer m.mu.Lock()
return !m.isOpen
}

// WaitUntilSemiSyncUnblocked waits until the primary is not blocked
// on semi-sync or until the context expires.
func (m *Monitor) WaitUntilSemiSyncUnblocked(ctx context.Context) error {
// SemiSyncMonitor is closed, which means semi-sync is not enabled.
// We don't have anything to wait for.
if m.isClosed() {
return nil
}
// run one iteration of checking if semi-sync is blocked or not.
m.checkAndFixSemiSyncBlocked()
if !m.stillBlocked() {
Expand Down
2 changes: 2 additions & 0 deletions go/vt/vttablet/tabletmanager/tm_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ import (
"vitess.io/vitess/go/vt/vtctl/reparentutil/policy"
"vitess.io/vitess/go/vt/vtenv"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/semisyncmonitor"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vdiff"
"vitess.io/vitess/go/vt/vttablet/tabletmanager/vreplication"
"vitess.io/vitess/go/vt/vttablet/tabletserver"
Expand Down Expand Up @@ -162,6 +163,7 @@ type TabletManager struct {
QueryServiceControl tabletserver.Controller
UpdateStream binlog.UpdateStreamControl
VREngine *vreplication.Engine
SemiSyncMonitor *semisyncmonitor.Monitor
VDiffEngine *vdiff.Engine
Env *vtenv.Environment

Expand Down
6 changes: 0 additions & 6 deletions go/vt/vttablet/tabletserver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,6 @@ type Controller interface {

// IsDiskStalled returns if the disk is stalled.
IsDiskStalled() bool

// WaitUntilSemiSyncBeingUnblocked waits until the primary is not blocked on semi-sync.
WaitUntilSemiSyncBeingUnblocked(ctx context.Context) error

// SemiSyncMonitorBlocked returns whether the semi-sync monitor has all its writes blocked.
SemiSyncMonitorBlocked() bool
}

// Ensure TabletServer satisfies Controller interface.
Expand Down
31 changes: 13 additions & 18 deletions go/vt/vttablet/tabletserver/state_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,20 +112,19 @@ type stateManager struct {
// Open must be done in forward order.
// Close must be done in reverse order.
// All Close functions must be called before Open.
hs *healthStreamer
se schemaEngine
rt replTracker
vstreamer subComponent
tracker subComponent
watcher subComponent
semiSyncMonitor subComponent
qe queryEngine
txThrottler txThrottler
te txEngine
messager subComponent
ddle onlineDDLExecutor
throttler lagThrottler
tableGC tableGarbageCollector
hs *healthStreamer
se schemaEngine
rt replTracker
vstreamer subComponent
tracker subComponent
watcher subComponent
qe queryEngine
txThrottler txThrottler
te txEngine
messager subComponent
ddle onlineDDLExecutor
throttler lagThrottler
tableGC tableGarbageCollector

// hcticks starts on initialization and runs forever.
hcticks *timer.Timer
Expand Down Expand Up @@ -470,7 +469,6 @@ func (sm *stateManager) servePrimary() error {
sm.throttler.Open()
sm.tableGC.Open()
sm.ddle.Open()
sm.semiSyncMonitor.Open()
sm.setState(topodatapb.TabletType_PRIMARY, StateServing)
return nil
}
Expand All @@ -487,7 +485,6 @@ func (sm *stateManager) unservePrimary() error {
sm.se.MakePrimary(false)
sm.hs.MakePrimary(false)
sm.rt.MakePrimary()
sm.semiSyncMonitor.Open()
sm.setState(topodatapb.TabletType_PRIMARY, StateNotServing)
return nil
}
Expand All @@ -498,7 +495,6 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er
cancel := sm.terminateAllQueries(nil)
defer cancel()

sm.semiSyncMonitor.Close()
sm.ddle.Close()
sm.tableGC.Close()
sm.messager.Close()
Expand All @@ -521,7 +517,6 @@ func (sm *stateManager) serveNonPrimary(wantTabletType topodatapb.TabletType) er
func (sm *stateManager) unserveNonPrimary(wantTabletType topodatapb.TabletType) error {
sm.unserveCommon()

sm.semiSyncMonitor.Close()
sm.se.MakeNonPrimary()
sm.hs.MakeNonPrimary()

Expand Down
66 changes: 30 additions & 36 deletions go/vt/vttablet/tabletserver/state_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,6 @@ func TestStateManagerServePrimary(t *testing.T) {
verifySubcomponent(t, 10, sm.throttler, testStateOpen)
verifySubcomponent(t, 11, sm.tableGC, testStateOpen)
verifySubcomponent(t, 12, sm.ddle, testStateOpen)
verifySubcomponent(t, 13, sm.semiSyncMonitor, testStateOpen)

assert.False(t, sm.se.(*testSchemaEngine).nonPrimary)
assert.True(t, sm.se.(*testSchemaEngine).ensureCalled)
Expand All @@ -105,21 +104,20 @@ func TestStateManagerServeNonPrimary(t *testing.T) {
err := sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

verifySubcomponent(t, 1, sm.semiSyncMonitor, testStateClosed)
verifySubcomponent(t, 2, sm.ddle, testStateClosed)
verifySubcomponent(t, 3, sm.tableGC, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonPrimary)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonPrimary)
verifySubcomponent(t, 11, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 13, sm.throttler, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonPrimary)
verifySubcomponent(t, 10, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand All @@ -145,7 +143,6 @@ func TestStateManagerUnservePrimary(t *testing.T) {
verifySubcomponent(t, 11, sm.txThrottler, testStateOpen)

verifySubcomponent(t, 12, sm.rt, testStatePrimary)
verifySubcomponent(t, 13, sm.semiSyncMonitor, testStateOpen)

assert.Equal(t, topodatapb.TabletType_PRIMARY, sm.target.TabletType)
assert.Equal(t, StateNotServing, sm.state)
Expand Down Expand Up @@ -198,14 +195,13 @@ func TestStateManagerUnserveNonPrimary(t *testing.T) {
verifySubcomponent(t, 6, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonPrimary)

verifySubcomponent(t, 7, sm.semiSyncMonitor, testStateClosed)
verifySubcomponent(t, 8, sm.se, testStateOpen)
verifySubcomponent(t, 9, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 10, sm.qe, testStateOpen)
verifySubcomponent(t, 11, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 7, sm.se, testStateOpen)
verifySubcomponent(t, 8, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 9, sm.qe, testStateOpen)
verifySubcomponent(t, 10, sm.txThrottler, testStateOpen)

verifySubcomponent(t, 12, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 13, sm.watcher, testStateOpen)
verifySubcomponent(t, 11, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)

assert.Equal(t, topodatapb.TabletType_RDONLY, sm.target.TabletType)
assert.Equal(t, StateNotServing, sm.state)
Expand Down Expand Up @@ -331,21 +327,20 @@ func TestStateManagerSetServingTypeNoChange(t *testing.T) {
err = sm.SetServingType(topodatapb.TabletType_REPLICA, testNow, StateServing, "")
require.NoError(t, err)

verifySubcomponent(t, 1, sm.semiSyncMonitor, testStateClosed)
verifySubcomponent(t, 2, sm.ddle, testStateClosed)
verifySubcomponent(t, 3, sm.tableGC, testStateClosed)
verifySubcomponent(t, 4, sm.messager, testStateClosed)
verifySubcomponent(t, 5, sm.tracker, testStateClosed)
verifySubcomponent(t, 1, sm.ddle, testStateClosed)
verifySubcomponent(t, 2, sm.tableGC, testStateClosed)
verifySubcomponent(t, 3, sm.messager, testStateClosed)
verifySubcomponent(t, 4, sm.tracker, testStateClosed)
assert.True(t, sm.se.(*testSchemaEngine).nonPrimary)

verifySubcomponent(t, 6, sm.se, testStateOpen)
verifySubcomponent(t, 7, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 8, sm.qe, testStateOpen)
verifySubcomponent(t, 9, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 10, sm.te, testStateNonPrimary)
verifySubcomponent(t, 11, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 12, sm.watcher, testStateOpen)
verifySubcomponent(t, 13, sm.throttler, testStateOpen)
verifySubcomponent(t, 5, sm.se, testStateOpen)
verifySubcomponent(t, 6, sm.vstreamer, testStateOpen)
verifySubcomponent(t, 7, sm.qe, testStateOpen)
verifySubcomponent(t, 8, sm.txThrottler, testStateOpen)
verifySubcomponent(t, 9, sm.te, testStateNonPrimary)
verifySubcomponent(t, 10, sm.rt, testStateNonPrimary)
verifySubcomponent(t, 11, sm.watcher, testStateOpen)
verifySubcomponent(t, 12, sm.throttler, testStateOpen)

assert.Equal(t, topodatapb.TabletType_REPLICA, sm.target.TabletType)
assert.Equal(t, StateServing, sm.state)
Expand Down Expand Up @@ -825,7 +820,6 @@ func newTestStateManager() *stateManager {
vstreamer: &testSubcomponent{},
tracker: &testSubcomponent{},
watcher: &testSubcomponent{},
semiSyncMonitor: &testSubcomponent{},
qe: &testQueryEngine{},
txThrottler: &testTxThrottler{},
te: &testTxEngine{},
Expand Down
Loading

0 comments on commit 4725dd9

Please sign in to comment.