-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Protect the .Conn
property of tabletHealthCheck
from being read/modified concurrently.
#16362
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -840,7 +840,7 @@ func (hc *HealthCheckImpl) TabletConnection(ctx context.Context, alias *topodata | |
hc.mu.Lock() | ||
thc := hc.healthByAlias[tabletAliasString(topoproto.TabletAliasString(alias))] | ||
hc.mu.Unlock() | ||
if thc == nil || thc.Conn == nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I removed this because I don't think this particular check makes sense (and it causes the race detector to be unhappy).
Calling Removing this line seems to have impact on some integration tests and I'll try if I can figure out how to update them accordingly, but as-is this check simply leads to flaky behaviour and can't be trusted. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It seems correct to remove this check for the reasons you stated. We should wait and see what your test failures tell you. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I went back and looked at the blame for this. Perhaps it is actually protecting us from some other race condition. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We may end up recreating a connection to a tablet that we are trying to drop. |
||
if thc == nil { | ||
// TODO: test that throws this error | ||
return nil, vterrors.Errorf(vtrpc.Code_NOT_FOUND, "tablet: %v is either down or nonexistent", alias) | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -63,6 +63,78 @@ func init() { | |
refreshInterval = time.Minute | ||
} | ||
|
||
func TestHealthCheckRace(t *testing.T) { | ||
ctx := utils.LeakCheckContext(t) | ||
|
||
// reset error counters | ||
hcErrorCounters.ResetAll() | ||
ts := memorytopo.NewServer(ctx, "cell") | ||
defer ts.Close() | ||
hc := createTestHc(ctx, ts) | ||
// close healthcheck | ||
defer hc.Close() | ||
tablet := createTestTablet(0, "cell", "a") | ||
tablet.Type = topodatapb.TabletType_REPLICA | ||
|
||
target := &querypb.Target{Keyspace: "k", Shard: "s", TabletType: topodatapb.TabletType_REPLICA} | ||
shr := &querypb.StreamHealthResponse{ | ||
TabletAlias: tablet.Alias, | ||
Target: target, | ||
Serving: true, | ||
|
||
PrimaryTermStartTimestamp: 0, | ||
RealtimeStats: &querypb.RealtimeStats{ReplicationLagSeconds: 1, CpuUsage: 0.5}, | ||
} | ||
|
||
input := make(chan *querypb.StreamHealthResponse) | ||
fc := createFakeConn(tablet, input) | ||
fc.errCh = make(chan error) | ||
|
||
go func() { | ||
ticker := time.NewTicker(10 * time.Millisecond) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
input <- shr | ||
|
||
} | ||
} | ||
}() | ||
|
||
hc.AddTablet(tablet) | ||
|
||
// Wait for the tablet to become healthy | ||
time.Sleep(30 * time.Millisecond) | ||
Comment on lines
+110
to
+111
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To avoid flakiness, we should probably check in |
||
|
||
// This goroutine simulates accessing the tablet connection | ||
go func() { | ||
ticker := time.NewTicker(5 * time.Millisecond) | ||
defer ticker.Stop() | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-ticker.C: | ||
conn, err := hc.TabletConnection(context.Background(), tablet.Alias, target) | ||
|
||
assert.NoError(t, err) | ||
assert.NotNil(t, conn) | ||
} | ||
} | ||
}() | ||
|
||
// Inject connection failures to simulate `closeConnection` being called | ||
for i := 0; i < 10; i++ { | ||
fc.errCh <- fmt.Errorf("some stream error") | ||
time.Sleep(30 * time.Millisecond) | ||
} | ||
} | ||
|
||
func TestHealthCheck(t *testing.T) { | ||
ctx := utils.LeakCheckContext(t) | ||
// reset error counters | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We'll need a replacement test that produces the original error.