Skip to content

Commit

Permalink
Draft fix etcd watch during compaction
Browse files Browse the repository at this point in the history
  • Loading branch information
serathius committed Mar 12, 2024
1 parent ddf5471 commit b9efa83
Show file tree
Hide file tree
Showing 7 changed files with 75 additions and 12 deletions.
1 change: 1 addition & 0 deletions etcdctl/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ require (
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/mattn/go-runewidth v0.0.15 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
go.etcd.io/gofail v0.1.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/net v0.21.0 // indirect
golang.org/x/sys v0.17.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions etcdctl/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcU
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.etcd.io/gofail v0.1.0 h1:XItAMIhOojXFQMgrxjnd2EIIHun/d5qL0Pf7FzVTkFg=
go.etcd.io/gofail v0.1.0/go.mod h1:VZBCXYGZhHAinaBiiqYvuDynvahNsAyLFwB3kEHKz1M=
go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk=
go.uber.org/goleak v1.2.0/go.mod h1:XJYK+MuIchqpmGmUSAzotztawfKvYLUIgg7guXrwVUo=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
Expand Down
1 change: 1 addition & 0 deletions server/etcdserver/api/v3rpc/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,7 @@ func (sws *serverWatchStream) sendLoop() {
sws.mu.RUnlock()

var serr error
// gofail: var watchResponseSend struct{}
if !fragmented && !ok {
serr = sws.gRPCStream.Send(wr)
} else {
Expand Down
20 changes: 14 additions & 6 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,13 @@ import (

// non-const so modifiable by tests
var (
// chanBufLen is the length of the buffered chan
// WatchStreamResponseBufferLen is the length of the buffered chan
// for sending out watched events.
// See https://github.com/etcd-io/etcd/issues/11906 for more detail.
chanBufLen = 128
WatchStreamResponseBufferLen = 128

// SyncWatchersPeriod is the period of syncing watchers.
SyncWatchersPeriod = 100 * time.Millisecond

// maxWatchersPerSync is the number of watchers to sync in a single batch
maxWatchersPerSync = 512
Expand Down Expand Up @@ -109,7 +112,7 @@ func (s *watchableStore) NewWatchStream() WatchStream {
watchStreamGauge.Inc()
return &watchStream{
watchable: s,
ch: make(chan WatchResponse, chanBufLen),
ch: make(chan WatchResponse, WatchStreamResponseBufferLen),
cancels: make(map[WatchID]cancelFunc),
watchers: make(map[WatchID]*watcher),
}
Expand Down Expand Up @@ -213,8 +216,7 @@ func (s *watchableStore) Restore(b backend.Backend) error {
func (s *watchableStore) syncWatchersLoop() {
defer s.wg.Done()

waitDuration := 100 * time.Millisecond
delayTicker := time.NewTicker(waitDuration)
delayTicker := time.NewTicker(SyncWatchersPeriod)
defer delayTicker.Stop()

for {
Expand All @@ -229,7 +231,7 @@ func (s *watchableStore) syncWatchersLoop() {
}
syncDuration := time.Since(st)

delayTicker.Reset(waitDuration)
delayTicker.Reset(SyncWatchersPeriod)
// more work pending?
if unsyncedWatchers != 0 && lastUnsyncedWatchers > unsyncedWatchers {
// be fair to other store operations by yielding time taken
Expand Down Expand Up @@ -370,6 +372,12 @@ func (s *watchableStore) syncWatchers() int {
victims := make(watcherBatch)
wb := newWatcherBatch(wg, evs)
for w := range wg.watchers {
if w.minRev < compactionRev {
// skip the watcher that failed to send compacted watch response due to w.ch is full
// next retry of syncWatchers would try to resend the compacted watch response to w.ch
// TODO prioritize sending compacted watch response over other watch responses with events.
continue
}
w.minRev = curRev + 1

eb, ok := wb[w]
Expand Down
8 changes: 4 additions & 4 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -523,18 +523,18 @@ func TestNewMapwatcherToEventMap(t *testing.T) {
// TestWatchVictims tests that watchable store delivers watch events
// when the watch channel is temporarily clogged with too many events.
func TestWatchVictims(t *testing.T) {
oldChanBufLen, oldMaxWatchersPerSync := chanBufLen, maxWatchersPerSync
oldChanBufLen, oldMaxWatchersPerSync := WatchStreamResponseBufferLen, maxWatchersPerSync

b, _ := betesting.NewDefaultTmpBackend(t)
s := newWatchableStore(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})

defer func() {
cleanup(s, b)
chanBufLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
WatchStreamResponseBufferLen, maxWatchersPerSync = oldChanBufLen, oldMaxWatchersPerSync
}()

chanBufLen, maxWatchersPerSync = 1, 2
numPuts := chanBufLen * 64
WatchStreamResponseBufferLen, maxWatchersPerSync = 1, 2
numPuts := WatchStreamResponseBufferLen * 64
testKey, testValue := []byte("foo"), []byte("bar")

var wg sync.WaitGroup
Expand Down
51 changes: 51 additions & 0 deletions tests/integration/v3_watch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,12 @@ import (
"bytes"
"context"
"fmt"
"github.com/stretchr/testify/assert"
"go.etcd.io/etcd/server/v3/storage/mvcc"
gofail "go.etcd.io/gofail/runtime"
"reflect"
"sort"
"strings"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -1512,3 +1516,50 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
}
require.True(t, gotProgressNotification, "Expected to get progress notification")
}

func TestV3NoEventsLostOnCompact(t *testing.T) {
if integration.ThroughProxy {
t.Skip("grpc proxy currently does not support requesting progress notifications")
}
integration.BeforeTest(t)

clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
defer clus.Terminate(t)

client := clus.RandClient()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

writeCount := mvcc.WatchStreamResponseBufferLen * 11 / 10

wch := client.Watch(ctx, "foo")
require.NoError(t, gofail.Enable("watchResponseSend", `sleep(1000)`))
var rev int64 = 0
for i := 0; i < writeCount; i++ {
resp, err := client.Put(ctx, "foo", "bar")
require.NoError(t, err)
rev = resp.Header.Revision
}
_, err := client.Compact(ctx, rev)
require.NoError(t, err)
time.Sleep(time.Second)
require.NoError(t, gofail.Disable("watchResponseSend"))

event_count := 0
compacted := false
for resp := range wch {
err = resp.Err()
if err != nil {
if !strings.Contains(err.Error(), "required revision has been compacted") {
t.Fatal(err)
}
compacted = true
break
}
event_count += len(resp.Events)
if event_count == writeCount {
break
}
}
assert.Truef(t, compacted, "Expected stream to get compacted, instead we got %d events out of %d events", event_count, writeCount)
}
4 changes: 2 additions & 2 deletions tests/robustness/makefile.mk
Original file line number Diff line number Diff line change
Expand Up @@ -36,15 +36,15 @@ GOFAIL_VERSION = $(shell cd tools/mod && go list -m -f {{.Version}} go.etcd.io/g

.PHONY: gofail-enable
gofail-enable: install-gofail
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail enable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdutl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./etcdctl && go get go.etcd.io/gofail@${GOFAIL_VERSION}
cd ./tests && go get go.etcd.io/gofail@${GOFAIL_VERSION}

.PHONY: gofail-disable
gofail-disable: install-gofail
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/
gofail disable server/etcdserver/ server/storage/backend/ server/storage/mvcc/ server/storage/wal/ server/etcdserver/api/v3rpc/
cd ./server && go mod tidy
cd ./etcdutl && go mod tidy
cd ./etcdctl && go mod tidy
Expand Down

0 comments on commit b9efa83

Please sign in to comment.