Skip to content

Commit

Permalink
Implement batching by event size
Browse files Browse the repository at this point in the history
Signed-off-by: Marek Siarkowicz <siarkowicz@google.com>
  • Loading branch information
serathius committed Dec 1, 2024
1 parent 0600023 commit 3880548
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 43 deletions.
6 changes: 0 additions & 6 deletions server/storage/mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,12 +450,6 @@ func kvsToEvents(lg *zap.Logger, wg *watcherGroup, revs, vals [][]byte) (evs []m
func (s *watchableStore) notify(rev int64, evs []mvccpb.Event) {
victim := make(watcherBatch)
for w, eb := range newWatcherBatch(&s.synced, evs) {
if eb.revs != 1 {
s.store.lg.Panic(
"unexpected multiple revisions in watch notification",
zap.Int("number-of-revisions", eb.revs),
)
}
if w.send(WatchResponse{WatchID: w.id, Events: eb.evs, Revision: rev}) {
pendingEventsGauge.Add(float64(len(eb.evs)))
} else {
Expand Down
61 changes: 43 additions & 18 deletions server/storage/mvcc/watchable_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,64 +399,89 @@ func TestWatchBatchUnsynced(t *testing.T) {
tcs := []struct {
name string
revisions int
watchBatchMaxRevs int
eventSize int
watchBatchMaxSize int
eventsPerRevision int
expectRevisionBatches [][]int64
}{
{
name: "3 revisions, 4 revs per batch, 1 events per revision",
revisions: 12,
watchBatchMaxRevs: 4,
name: "Fits into a single batch",
revisions: 10,
eventSize: 100,
watchBatchMaxSize: 1000,
eventsPerRevision: 1,
expectRevisionBatches: [][]int64{
{2, 3, 4, 5},
{6, 7, 8, 9},
{10, 11, 12, 13},
{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
},
},
{
name: "3 revisions, 4 revs per batch, 3 events per revision",
revisions: 12,
watchBatchMaxRevs: 4,
name: "Spills to second batch",
revisions: 11,
eventSize: 100,
watchBatchMaxSize: 1000,
eventsPerRevision: 1,
expectRevisionBatches: [][]int64{
{2, 3, 4, 5, 6, 7, 8, 9, 10, 11},
{12},
},
},
{
name: "Spills to second batch, but maintains revision pairs",
revisions: 6,
eventSize: 100,
watchBatchMaxSize: 1000,
eventsPerRevision: 2,
expectRevisionBatches: [][]int64{
{2, 2, 3, 3, 4, 4, 5, 5, 6, 6},
{7, 7},
},
},
{
name: "Spills to second batch, but maintains revision triples",
revisions: 5,
eventSize: 100,
watchBatchMaxSize: 1000,
eventsPerRevision: 3,
expectRevisionBatches: [][]int64{
{2, 2, 2, 3, 3, 3, 4, 4, 4, 5, 5, 5},
{6, 6, 6, 7, 7, 7, 8, 8, 8, 9, 9, 9},
{10, 10, 10, 11, 11, 11, 12, 12, 12, 13, 13, 13},
{6, 6, 6},
},
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
b, _ := betesting.NewDefaultTmpBackend(t)
s := New(zaptest.NewLogger(t), b, &lease.FakeLessor{}, StoreConfig{})
oldMaxRevs := watchBatchMaxRevs
oldMaxRevs := watchBatchMaxSize
defer func() {
watchBatchMaxRevs = oldMaxRevs
watchBatchMaxSize = oldMaxRevs
cleanup(s, b)
}()
watchBatchMaxRevs = tc.watchBatchMaxRevs
watchBatchMaxSize = tc.watchBatchMaxSize

v := []byte("foo")
k := []byte("k")
eventProtoOverhead := 13
v := make([]byte, tc.eventSize-eventProtoOverhead)
for i := 0; i < tc.revisions; i++ {
txn := s.Write(traceutil.TODO())
for j := 0; j < tc.eventsPerRevision; j++ {
txn.Put(v, v, lease.NoLease)
txn.Put(k, v, lease.NoLease)
}
txn.End()
}

w := s.NewWatchStream()
defer w.Close()

w.Watch(0, v, nil, 1)
w.Watch(0, k, nil, 1)
var revisionBatches [][]int64
eventCount := 0
for eventCount < tc.revisions*tc.eventsPerRevision {
var revisions []int64
for _, e := range (<-w.Chan()).Events {
revisions = append(revisions, e.Kv.ModRevision)
eventCount++
assert.Equal(t, tc.eventSize, e.Size())
}
revisionBatches = append(revisionBatches, revisions)
}
Expand Down
36 changes: 17 additions & 19 deletions server/storage/mvcc/watcher_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,42 @@ import (
"go.etcd.io/etcd/pkg/v3/adt"
)

// watchBatchMaxRevs is the maximum distinct revisions that
// watchBatchMaxSize is the maximum total size in bytes of events that
// may be sent to an unsynced watcher at a time. Declared as
// var instead of const for testing purposes.
var watchBatchMaxRevs = 1000
var watchBatchMaxSize = 2 * 1024 * 1024

type eventBatch struct {
// evs is a batch of revision-ordered events
evs []mvccpb.Event
// revs is the minimum unique revisions observed for this batch
revs int
// evsSize is the total size of the events in the batch, as reported by Event.Size().
evsSize int
// moreRev is the first revision with more events following this batch
moreRev int64
}

func (eb *eventBatch) add(ev mvccpb.Event) {
if eb.revs > watchBatchMaxRevs {
// maxed out batch size
return
}

if len(eb.evs) == 0 {
// base case
eb.revs = 1
eb.evsSize = ev.Size()
eb.evs = append(eb.evs, ev)
return
}

// revision accounting
ebRev := eb.evs[len(eb.evs)-1].Kv.ModRevision
evRev := ev.Kv.ModRevision
if evRev > ebRev {
eb.revs++
if eb.revs > watchBatchMaxRevs {
eb.moreRev = evRev
return

if evRev == ebRev {
eb.evsSize += ev.Size()
eb.evs = append(eb.evs, ev)
return
}
size := ev.Size()
if eb.evsSize+size > watchBatchMaxSize {
if eb.moreRev == 0 {
eb.moreRev = ev.Kv.ModRevision
}
return
}

eb.evsSize += size
eb.evs = append(eb.evs, ev)
}

Expand Down

0 comments on commit 3880548

Please sign in to comment.