diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index f4d75d2bb07..5734749b167 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -69,7 +69,8 @@ var ( hcPrimaryPromotedCounters = stats.NewCountersWithMultiLabels("HealthcheckPrimaryPromoted", "Primary promoted in keyspace/shard name because of health check errors", []string{"Keyspace", "ShardName"}) healthcheckOnce sync.Once - hcMessagesDropped = stats.NewCounter("HealthCheckMessagesDropped", "Number of messages dropped by the healthcheck because the subscriber buffer was full") + // counter that tells us how many healthcheck messages have been dropped + hcChannelFullCounter = stats.NewCounter("HealthCheckChannelFullErrors", "Number of times the healthcheck broadcast channel was full") // TabletURLTemplateString is a flag to generate URLs for the tablets that vtgate discovers. TabletURLTemplateString = "http://{{.GetTabletHostPort}}" @@ -653,8 +654,9 @@ func (hc *HealthCheckImpl) broadcast(th *TabletHealth) { select { case c <- th: default: - log.Errorf("HealthCheckImpl.broadcast: subscriber buffer full, dropping message") - hcMessagesDropped.Add(1) + // If the channel is full, we drop the message. + hcChannelFullCounter.Add(1) + log.Warningf("HealthCheck broadcast channel is full, dropping message for %s", topotools.TabletIdent(th.Tablet)) } } } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index 83df1718884..b78450d4843 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -1515,61 +1515,6 @@ func TestConcurrentUpdates(t *testing.T) { }, 5*time.Second, 100*time.Millisecond, "expected all updates to be processed") } -// BenchmarkAccess_FastConsumer benchmarks the access time of the healthcheck for a fast consumer. -func BenchmarkAccess_FastConsumer(b *testing.B) { - ctx := context.Background() - // reset error counters - hcErrorCounters.ResetAll() - ts := memorytopo.NewServer(ctx, "cell") - defer ts.Close() - hc := createTestHc(ctx, ts) - // close healthcheck - defer hc.Close() - - for i := 0; i < b.N; i++ { - // Subscribe to the healthcheck with a fast consumer. - ch := hc.Subscribe() - go func() { - for range ch { - } - }() - - for id := 0; id < 1000; id++ { - hc.broadcast(&TabletHealth{}) - } - hc.Unsubscribe(ch) - waitForEmptyMessageQueue(ch) - } -} - -// BenchmarkAccess_SlowConsumer benchmarks the access time of the healthcheck for a slow consumer. -func BenchmarkAccess_SlowConsumer(b *testing.B) { - ctx := context.Background() - // reset error counters - hcErrorCounters.ResetAll() - ts := memorytopo.NewServer(ctx, "cell") - defer ts.Close() - hc := createTestHc(ctx, ts) - // close healthcheck - defer hc.Close() - - for i := 0; i < b.N; i++ { - // Subscribe to the healthcheck with a slow consumer. - ch := hc.Subscribe() - go func() { - for range ch { - time.Sleep(50 * time.Millisecond) - } - }() - - for id := 0; id < 100; id++ { - hc.broadcast(&TabletHealth{}) - } - hc.Unsubscribe(ch) - waitForEmptyMessageQueue(ch) - } -} - func waitForEmptyMessageQueue(queue chan *TabletHealth) { for { if len(queue) == 0 {