Skip to content

Commit

Permalink
feat: change healthcheck to only reload specific keyspace shard tablets
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Feb 26, 2025
1 parent b05df12 commit 278bd13
Show file tree
Hide file tree
Showing 6 changed files with 172 additions and 115 deletions.
2 changes: 1 addition & 1 deletion go/vt/discovery/fake_healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ func (fhc *FakeHealthCheck) Unsubscribe(c chan *TabletHealth) {
}

// GetLoadTabletsTrigger is not implemented.
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan struct{} {
func (fhc *FakeHealthCheck) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return nil
}

Expand Down
23 changes: 13 additions & 10 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,7 @@ type HealthCheck interface {
Unsubscribe(c chan *TabletHealth)

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
GetLoadTabletsTrigger() chan struct{}
GetLoadTabletsTrigger() chan topo.KeyspaceShard
}

var _ HealthCheck = (*HealthCheckImpl)(nil)
Expand Down Expand Up @@ -297,8 +297,8 @@ type HealthCheckImpl struct {
subMu sync.Mutex
// subscribers
subscribers map[chan *TabletHealth]struct{}
// loadTablets trigger is used to immediately load a new primary tablet when the current one has been demoted
loadTabletsTrigger chan struct{}
// loadTabletsTrigger is used to immediately load information about tablets of a specific shard.
loadTabletsTrigger chan topo.KeyspaceShard
}

// NewVTGateHealthCheckFilters returns healthcheck filters for vtgate.
Expand Down Expand Up @@ -363,7 +363,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
healthy: make(map[KeyspaceShardTabletType][]*TabletHealth),
subscribers: make(map[chan *TabletHealth]struct{}),
cellAliases: make(map[string]string),
loadTabletsTrigger: make(chan struct{}, 1),
loadTabletsTrigger: make(chan topo.KeyspaceShard, 1024),
}
var topoWatchers []*TopologyWatcher
cells := strings.Split(cellsToWatch, ",")
Expand Down Expand Up @@ -535,18 +535,21 @@ func (hc *HealthCheckImpl) updateHealth(th *TabletHealth, prevTarget *query.Targ
}

// If the previous tablet type was primary, we need to check if the next new primary has already been assigned.
// If no new primary has been assigned, we will trigger a `loadTablets` call to immediately redirect traffic to the new primary.
// If no new primary has been assigned, we will trigger loading of tablets for this keyspace shard to immediately redirect traffic to the new primary.
//
// This is to avoid a situation where a newly primary tablet for a shard has just been started and the tableRefreshInterval has not yet passed,
// causing an interruption where no primary is assigned to the shard.
if prevTarget.TabletType == topodata.TabletType_PRIMARY {
if primaries := hc.healthData[oldTargetKey]; len(primaries) == 0 {
log.Infof("We will have no health data for the next new primary tablet after demoting the tablet: %v, so start loading tablets now", topotools.TabletIdent(th.Tablet))
// We want to trigger a loadTablets call, but if the channel is not empty
// then a trigger is already scheduled, we don't need to trigger another one.
// This also prevents the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// We want to trigger a call to load tablets for this keyspace-shard,
// but we want this to be non-blocking to prevent the code from deadlocking as described in https://github.com/vitessio/vitess/issues/16994.
// If the buffer is exhausted, then we'll just receive the update when all the tablets are loaded on the ticker.
select {
case hc.loadTabletsTrigger <- struct{}{}:
case hc.loadTabletsTrigger <- topo.KeyspaceShard{
Keyspace: th.Target.Keyspace,
Shard: th.Target.Shard,
}:
default:
}
}
Expand Down Expand Up @@ -662,7 +665,7 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
}

// GetLoadTabletsTrigger returns a channel that is used to inform when to load tablets.
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan struct{} {
func (hc *HealthCheckImpl) GetLoadTabletsTrigger() chan topo.KeyspaceShard {
return hc.loadTabletsTrigger
}

Expand Down
65 changes: 50 additions & 15 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,23 +110,45 @@ func (tw *TopologyWatcher) getTablets() ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByCell(tw.ctx, tw.cell, nil)
}

func (tw *TopologyWatcher) getTabletsByShard(keyspace string, shard string) ([]*topo.TabletInfo, error) {
return tw.topoServer.GetTabletsByShardCell(tw.ctx, keyspace, shard, []string{tw.cell})
}

// Start starts the topology watcher.
func (tw *TopologyWatcher) Start() {
tw.wg.Add(1)
tw.wg.Add(2)
// Goroutine to refresh the tablets list periodically.
go func(t *TopologyWatcher) {
defer t.wg.Done()
ticker := time.NewTicker(t.refreshInterval)
defer ticker.Stop()
for {
t.loadTablets()
// Since we are going to load all the tablets,
// we can clear out the entire list for reloading
// specific keyspace shards.
for range tw.healthcheck.GetLoadTabletsTrigger() {
}
t.loadTablets("", "")
select {
case <-t.ctx.Done():
return
case <-tw.healthcheck.GetLoadTabletsTrigger():
case <-ticker.C:
}
}
}(tw)
// Go routine to refresh tablets for a specific
// keyspace shard.
go func(t *TopologyWatcher) {
defer t.wg.Done()
for {
select {
case <-t.ctx.Done():
return
case kss := <-tw.healthcheck.GetLoadTabletsTrigger():
t.loadTablets(kss.Keyspace, kss.Shard)
}
}
}(tw)
}

// Stop stops the watcher. It does not clean up the tablets added to HealthCheck.
Expand All @@ -136,23 +158,36 @@ func (tw *TopologyWatcher) Stop() {
tw.wg.Wait()
}

func (tw *TopologyWatcher) loadTablets() {
func (tw *TopologyWatcher) loadTablets(keyspace string, shard string) {
newTablets := make(map[string]*tabletInfo)
var partialResult bool

// First get the list of all tablets.
tabletInfos, err := tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
partialResult = true
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
var tabletInfos []*topo.TabletInfo
var err error
if keyspace != "" && shard != "" {
tabletInfos, err = tw.getTabletsByShard(keyspace, shard)
if err != nil {
log.Errorf("error getting tablets for keyspace-shard: %v:%v: %v", keyspace, shard, err)
return
}
// Since we are only reading tablets for a keyspace shard,
// this is by default a partial result.
partialResult = true
} else {
// First get the list of all tablets.
tabletInfos, err = tw.getTablets()
topologyWatcherOperations.Add(topologyWatcherOpListTablets, 1)
if err != nil {
topologyWatcherErrors.Add(topologyWatcherOpListTablets, 1)
// If we get a partial result error, we just log it and process the tablets that we did manage to fetch.
if topo.IsErrType(err, topo.PartialResult) {
log.Errorf("received partial result from getTablets for cell %v: %v", tw.cell, err)
partialResult = true
} else { // For all other errors, just return.
log.Errorf("error getting tablets for cell: %v: %v", tw.cell, err)
return
}
}
}

// Accumulate a list of all known alias strings to use later
Expand Down
38 changes: 19 additions & 19 deletions go/vt/discovery/topology_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet), "CreateTablet failed for %v", tablet.Alias)

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

Expand All @@ -172,7 +172,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet2), "CreateTablet failed for %v", tablet2.Alias)
tw.loadTablets()
tw.loadTablets("", "")

// Confirm second tablet triggers ListTablets + AddTablet calls.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
Expand All @@ -192,7 +192,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
Shard: "shard",
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet3), "CreateTablet failed for %v", tablet3.Alias)
tw.loadTablets()
tw.loadTablets("", "")

// Confirm filtered tablet did not trigger an AddTablet call.
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 0})
Expand Down Expand Up @@ -220,7 +220,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
})
require.Nil(t, err, "UpdateTabletFields failed")

tw.loadTablets()
tw.loadTablets("", "")
allTablets = fhc.GetAllTablets()
key = TabletToMapKey(tablet)

Expand Down Expand Up @@ -259,7 +259,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
return nil
})
require.Nil(t, err, "UpdateTabletFields failed")
tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
allTablets = fhc.GetAllTablets()
key2 := TabletToMapKey(tablet2)
Expand All @@ -281,7 +281,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
})
require.Nil(t, err, "UpdateTabletFields failed")

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "ReplaceTablet": 2})
}

Expand All @@ -290,7 +290,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {

_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 852159264)

Expand All @@ -309,7 +309,7 @@ func checkWatcher(t *testing.T, refreshKnownTablets bool) {
require.NoError(t, ts.DeleteTablet(context.Background(), tablet3.Alias))
_, err = topo.FixShardReplication(context.Background(), ts, logger, "aa", "keyspace", "shard")
require.Nil(t, err, "FixShardReplication failed")
tw.loadTablets()
tw.loadTablets("", "")
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 0)

Expand Down Expand Up @@ -444,7 +444,7 @@ func TestFilterByKeyspace(t *testing.T) {
// Make this fatal because there is no point continuing if CreateTablet fails
require.NoError(t, ts.CreateTablet(context.Background(), tablet))

tw.loadTablets()
tw.loadTablets("", "")
key := TabletToMapKey(tablet)
allTablets := hc.GetAllTablets()

Expand All @@ -471,7 +471,7 @@ func TestFilterByKeyspace(t *testing.T) {
assert.Equal(t, test.expected, f.IsIncluded(tabletReplacement))
require.NoError(t, ts.CreateTablet(context.Background(), tabletReplacement))

tw.loadTablets()
tw.loadTablets("", "")
key = TabletToMapKey(tabletReplacement)
allTablets = hc.GetAllTablets()

Expand Down Expand Up @@ -522,7 +522,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet))

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

Expand All @@ -547,7 +547,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
}
require.NoError(t, ts.CreateTablet(context.Background(), tablet2))

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 2762153755)

Expand All @@ -559,7 +559,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {

// Load the tablets again to show that when refreshKnownTablets is disabled,
// only the list is read from the topo and the checksum doesn't change
tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 2762153755)

Expand All @@ -571,7 +571,7 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
})
require.NoError(t, err)

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 2762153755)

Expand All @@ -587,15 +587,15 @@ func TestFilterByKeyspaceSkipsIgnoredTablets(t *testing.T) {
// Remove the tracked tablet from the topo and check that it is detected as being gone.
require.NoError(t, ts.DeleteTablet(context.Background(), tablet.Alias))

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0, "RemoveTablet": 1})
checkChecksum(t, tw, 789108290)
assert.Empty(t, fhc.GetAllTablets())

// Remove ignored tablet and check that we didn't try to remove it from the health check
require.NoError(t, ts.DeleteTablet(context.Background(), tablet2.Alias))

tw.loadTablets()
tw.loadTablets("", "")
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "GetTablet": 0})
checkChecksum(t, tw, 0)
assert.Empty(t, fhc.GetAllTablets())
Expand Down Expand Up @@ -663,7 +663,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
}
require.NoError(t, ts.CreateTablet(ctx, tablet1), "CreateTablet failed for %v", tablet1.Alias)

tw.loadTablets()
tw.loadTablets("", "")
counts = checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 3238442862)

Expand Down Expand Up @@ -697,7 +697,7 @@ func TestGetTabletErrorDoesNotRemoveFromHealthcheck(t *testing.T) {
require.ErrorContains(t, err, "partial result")

// Now force the error during loadTablets.
tw.loadTablets()
tw.loadTablets("", "")
checkOpCounts(t, counts, map[string]int64{"ListTablets": 1, "AddTablet": 1})
checkChecksum(t, tw, 2762153755)

Expand Down Expand Up @@ -739,7 +739,7 @@ func TestDeadlockBetweenTopologyWatcherAndHealthCheck(t *testing.T) {
}
err := ts.CreateTablet(ctx, tablet1)
// Run the first loadTablets call to ensure the tablet is present in the topology watcher.
hc.topoWatchers[0].loadTablets()
hc.topoWatchers[0].loadTablets("", "")
require.NoError(t, err)

// We want to run updateHealth with arguments that always
Expand Down
Loading

0 comments on commit 278bd13

Please sign in to comment.