Skip to content

Commit

Permalink
feat: address review comments
Browse files Browse the repository at this point in the history
Signed-off-by: Manan Gupta <manan@planetscale.com>
  • Loading branch information
GuptaManan100 committed Feb 21, 2025
1 parent f5fdb7c commit 4ccfe5d
Show file tree
Hide file tree
Showing 2 changed files with 5 additions and 58 deletions.
8 changes: 5 additions & 3 deletions go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ var (
hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"})
healthcheckOnce sync.Once

hcMessagesDropped = stats.NewCounter("HealthCheckMessagesDropped", "Number of messages dropped by the healthcheck because the subscriber buffer was full")
// counter that tells us how many healthcheck messages have been dropped
hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full")

// TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers.
TabletURLTemplateString = "http://{{.GetTabletHostPort}}"
Expand Down Expand Up @@ -653,8 +654,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) {
select {
case c <- th:
default:
log.Errorf("HealthCheckImpl.broadcast: subscriber buffer full, dropping message")
hcMessagesDropped.Add(1)
// If the channel is full, we drop the message.
hcChannelFullCounter.Add(1)
log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet))
}
}
}
Expand Down
55 changes: 0 additions & 55 deletions go/vt/discovery/healthcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1515,61 +1515,6 @@ func TestConcurrentUpdates(t *testing.T) {
}, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed")
}

// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer.
func BenchmarkAccess_FastConsumer(b *testing.B) {
ctx := context.Background()
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

for i := 0; i < b.N; i++ {
// Subscribe to the healthcheck with a fast consumer.
ch := hc.Subscribe()
go func() {
for range ch {
}
}()

for id := 0; id < 1000; id++ {
hc.broadcast(&TabletHealth{})
}
hc.Unsubscribe(ch)
waitForEmptyMessageQueue(ch)
}
}

// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer.
func BenchmarkAccess_SlowConsumer(b *testing.B) {
ctx := context.Background()
// reset error counters
hcErrorCounters.ResetAll()
ts := memorytopo.NewServer(ctx, "cell")
defer ts.Close()
hc := createTestHc(ctx, ts)
// close healthcheck
defer hc.Close()

for i := 0; i < b.N; i++ {
// Subscribe to the healthcheck with a slow consumer.
ch := hc.Subscribe()
go func() {
for range ch {
time.Sleep(50 * time.Millisecond)
}
}()

for id := 0; id < 100; id++ {
hc.broadcast(&TabletHealth{})
}
hc.Unsubscribe(ch)
waitForEmptyMessageQueue(ch)
}
}

func waitForEmptyMessageQueue(queue chan *TabletHealth) {
for {
if len(queue) == 0 {
Expand Down

0 comments on commit 4ccfe5d

Please sign in to comment.