diff --git a/go/test/endtoend/tabletgateway/vtgate_test.go b/go/test/endtoend/tabletgateway/vtgate_test.go index 1f4f8758e16..e0976b1b2e3 100644 --- a/go/test/endtoend/tabletgateway/vtgate_test.go +++ b/go/test/endtoend/tabletgateway/vtgate_test.go @@ -265,10 +265,10 @@ func TestReplicaTransactions(t *testing.T) { _ = replicaTablet.VttabletProcess.TearDown() // Healthcheck interval on tablet is set to 1s, so sleep for 2s time.Sleep(2 * time.Second) - utils.AssertContainsError(t, readConn, fetchAllCustomers, "is either down or nonexistent") + utils.AssertContainsError(t, readConn, fetchAllCustomers, "connect: connection refused") // bring up the tablet again - // trying to use the same session/transaction should fail as the vtgate has + // trying to use the same session/transaction should fail as the vttablet has // been restarted and the session lost replicaTablet.VttabletProcess.ServingStatus = "SERVING" err = replicaTablet.VttabletProcess.Setup() diff --git a/go/vt/discovery/healthcheck.go b/go/vt/discovery/healthcheck.go index 70799b0f6bc..6cf765f96af 100644 --- a/go/vt/discovery/healthcheck.go +++ b/go/vt/discovery/healthcheck.go @@ -840,7 +840,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata hc.mu.Lock() thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))] hc.mu.Unlock() - if thc == nil || thc.Conn == nil { + if thc == nil { // TODO: test that throws this error return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) } diff --git a/go/vt/discovery/healthcheck_test.go b/go/vt/discovery/healthcheck_test.go index c87ba699234..f3c2045181b 100644 --- a/go/vt/discovery/healthcheck_test.go +++ b/go/vt/discovery/healthcheck_test.go @@ -63,6 +63,78 @@ func init() { refreshInterval = time.Minute } +func TestHealthCheckRace(t *testing.T) { + ctx := utils.LeakCheckContext(t) + + // reset error counters + hcErrorCounters.ResetAll() + ts := memorytopo.NewServer(ctx, "cell") + defer ts.Close() + hc := createTestHc(ctx, ts) + // close healthcheck + defer hc.Close() + tablet := createTestTablet(0, "cell", "a") + tablet.Type = topodatapb.TabletType_REPLICA + + target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} + shr := &querypb.StreamHealthResponse{ + TabletAlias: tablet.Alias, + Target: target, + Serving: true, + + PrimaryTermStartTimestamp: 0, + RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5}, + } + + input := make(chan *querypb.StreamHealthResponse) + fc := createFakeConn(tablet, input) + fc.errCh = make(chan error) + + go func() { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + input <- shr + + } + } + }() + + hc.AddTablet(tablet) + + // Wait for the tablet to become healthy + time.Sleep(30 * time.Millisecond) + + // This goroutine simulates accessing the tablet connection + go func() { + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + conn, err := hc.TabletConnection(context.Background(), tablet.Alias, target) + + assert.NoError(t, err) + assert.NotNil(t, conn) + } + } + }() + + // Inject connection failures to simulate `closeConnection` being called + for i := 0; i < 10; i++ { + fc.errCh <- fmt.Errorf("some stream error") + time.Sleep(30 * time.Millisecond) + } +} + func TestHealthCheck(t *testing.T) { ctx := utils.LeakCheckContext(t) // reset error counters diff --git a/go/vt/discovery/tablet_health_check.go b/go/vt/discovery/tablet_health_check.go index 64450f4c8c6..4ed9608f319 100644 --- a/go/vt/discovery/tablet_health_check.go +++ b/go/vt/discovery/tablet_health_check.go @@ -356,6 +356,9 @@ func (thc *tabletHealthCheck) checkConn(hc *HealthCheckImpl) { } func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) { + thc.connMu.Lock() + defer thc.connMu.Unlock() + log.Warningf("tablet %v healthcheck stream error: %v", thc.Tablet, err) thc.setServingState(false, err.Error()) thc.LastError = err @@ -366,6 +369,9 @@ func (thc *tabletHealthCheck) closeConnection(ctx context.Context, err error) { // finalizeConn closes the health checking connection. // To be called only on exit from checkConn(). func (thc *tabletHealthCheck) finalizeConn() { + thc.connMu.Lock() + defer thc.connMu.Unlock() + thc.setServingState(false, "finalizeConn closing connection") // Note: checkConn() exits only when thc.ctx.Done() is closed. Thus it's // safe to simply get Err() value here and assign to LastError.