Skip to content

Commit

Permalink
Use a new flag
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Jan 18, 2024
1 parent fe2bdd8 commit d55c5c2
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 6 deletions.
4 changes: 3 additions & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ var (
refreshKnownTablets = flag.Bool("tablet_refresh_known_tablets", true, "tablet refresh reloads the tablet address/port map from topo in case it changes")
// topoReadConcurrency tells us how many topo reads are allowed in parallel
topoReadConcurrency = flag.Int("topo_read_concurrency", 32, "concurrent topo reads")
// tabletDiscoveryConcurrency tells us how many tablets can be discovered in parallel
tabletDiscoveryConcurrency = flag.Int("tablet_discovery_concurrency", 1024, "concurrent tablet discoveries")
)

// See the documentation for NewHealthCheck below for an explanation of these parameters.
Expand Down Expand Up @@ -324,7 +326,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
} else if len(KeyspacesToWatch) > 0 {
filter = NewFilterByKeyspace(KeyspacesToWatch)
}
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency))
topoWatchers = append(topoWatchers, NewCellTabletsWatcher(ctx, topoServer, hc, filter, c, *refreshInterval, *refreshKnownTablets, *topoReadConcurrency, *tabletDiscoveryConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
12 changes: 7 additions & 5 deletions go/vt/discovery/topology_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ type TopologyWatcher struct {
refreshKnownTablets bool
getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)
sem chan int
tabletDiscoverySem chan int
ctx context.Context
cancelFunc context.CancelFunc
// wg keeps track of all launched Go routines.
Expand All @@ -94,7 +95,7 @@ type TopologyWatcher struct {

// NewTopologyWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, filter TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int, getTablets func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error)) *TopologyWatcher {
tw := &TopologyWatcher{
topoServer: topoServer,
tabletRecorder: tr,
Expand All @@ -104,6 +105,7 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR
refreshKnownTablets: refreshKnownTablets,
getTablets: getTablets,
sem: make(chan int, topoReadConcurrency),
tabletDiscoverySem: make(chan int, tabletDiscoveryConcurrency),
tablets: make(map[string]*tabletInfo),
}
tw.firstLoadChan = make(chan struct{})
Expand All @@ -116,8 +118,8 @@ func NewTopologyWatcher(ctx context.Context, topoServer *topo.Server, tr TabletR

// NewCellTabletsWatcher returns a TopologyWatcher that monitors all
// the tablets in a cell, and starts refreshing.
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
func NewCellTabletsWatcher(ctx context.Context, topoServer *topo.Server, tr TabletRecorder, f TabletFilter, cell string, refreshInterval time.Duration, refreshKnownTablets bool, topoReadConcurrency, tabletDiscoveryConcurrency int) *TopologyWatcher {
return NewTopologyWatcher(ctx, topoServer, tr, f, cell, refreshInterval, refreshKnownTablets, topoReadConcurrency, tabletDiscoveryConcurrency, func(tw *TopologyWatcher) ([]*topodata.TabletAlias, error) {
return tw.topoServer.GetTabletAliasesByCell(ctx, tw.cell)
})
}
Expand Down Expand Up @@ -230,11 +232,11 @@ func (tw *TopologyWatcher) loadTablets() {
topologyWatcherOperations.Add(topologyWatcherOpReplaceTablet, 1)
}
} else {
tw.sem <- 1 // Wait for active queue to drain.
tw.tabletDiscoverySem <- 1 // Wait for active queue to drain.
// This is a new tablet record, let's add it to the healthcheck
tw.tabletRecorder.AddTablet(newVal.tablet)
topologyWatcherOperations.Add(topologyWatcherOpAddTablet, 1)
<-tw.sem // Done; enable next request to run
<-tw.tabletDiscoverySem // Done; enable next request to run
}
}

Expand Down

0 comments on commit d55c5c2

Please sign in to comment.