Skip to content

Commit e28b98c

Browse files
authored
Add iteration support to memory backend (#52575)
This builds on top of #52199 by updating the memory backend to implement backend.BackendWithItems. Only GetRange, and not DeletRange, was refactored to use Items to retrieve a range.
1 parent 904ff4f commit e28b98c

File tree

1 file changed

+108
-32
lines changed

1 file changed

+108
-32
lines changed

lib/backend/memory/memory.go

+108-32
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ package memory
2121
import (
2222
"bytes"
2323
"context"
24+
"iter"
2425
"log/slog"
2526
"sync"
2627
"time"
@@ -276,8 +277,19 @@ func (m *Memory) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
276277
m.Lock()
277278
defer m.Unlock()
278279
m.removeExpired()
279-
re := m.getRange(ctx, startKey, endKey, backend.NoLimit)
280-
for _, item := range re.Items {
280+
281+
var items []backend.Item
282+
m.tree.AscendGreaterOrEqual(&btreeItem{Item: backend.Item{Key: startKey}}, func(item *btreeItem) bool {
283+
if endKey.Compare(item.Key) < 0 {
284+
return false
285+
}
286+
287+
items = append(items, item.Item)
288+
289+
return true
290+
})
291+
292+
for _, item := range items {
281293
event := backend.Event{
282294
Type: types.OpDelete,
283295
Item: item,
@@ -290,25 +302,106 @@ func (m *Memory) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
290302
return nil
291303
}
292304

293-
// GetRange returns query range
294-
func (m *Memory) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
295-
if startKey.IsZero() {
296-
return nil, trace.BadParameter("missing parameter startKey")
305+
func (m *Memory) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
306+
if params.StartKey.IsZero() {
307+
err := trace.BadParameter("missing parameter startKey")
308+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
297309
}
298-
if endKey.IsZero() {
299-
return nil, trace.BadParameter("missing parameter endKey")
310+
if params.EndKey.IsZero() {
311+
err := trace.BadParameter("missing parameter endKey")
312+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
300313
}
314+
315+
limit := params.Limit
301316
if limit <= 0 {
302317
limit = backend.DefaultRangeLimit
303318
}
304-
m.Lock()
305-
defer m.Unlock()
306-
m.removeExpired()
307-
re := m.getRange(ctx, startKey, endKey, limit)
308-
if len(re.Items) == backend.DefaultRangeLimit {
309-
m.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit)
319+
320+
const defaultPageSize = 1000
321+
return func(yield func(backend.Item, error) bool) {
322+
var totalCount int
323+
defer func() {
324+
if totalCount >= backend.DefaultRangeLimit {
325+
m.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", params.StartKey, "limit", backend.DefaultRangeLimit)
326+
}
327+
}()
328+
329+
startKey := params.StartKey
330+
endKey := params.EndKey
331+
compareDirection := 1
332+
itemIter := m.tree.AscendGreaterOrEqual
333+
if params.Descending {
334+
startKey = params.EndKey
335+
endKey = params.StartKey
336+
compareDirection = -1
337+
itemIter = m.tree.DescendLessOrEqual
338+
}
339+
340+
btreeItems := func(start *btreeItem) iter.Seq[*btreeItem] {
341+
return func(yield func(*btreeItem) bool) {
342+
m.Lock()
343+
defer m.Unlock()
344+
m.removeExpired()
345+
346+
itemIter(start, yield)
347+
}
348+
}
349+
350+
var excludedStart bool
351+
items := make([]backend.Item, 0, min(limit, defaultPageSize))
352+
startItem := &btreeItem{Item: backend.Item{Key: startKey}}
353+
for {
354+
pageLimit := min(limit-totalCount, defaultPageSize)
355+
items = items[:0]
356+
357+
for item := range btreeItems(startItem) {
358+
if item.Key.Compare(endKey)*compareDirection > 0 {
359+
break
360+
}
361+
362+
if excludedStart {
363+
excludedStart = false
364+
if item.Key.Compare(startItem.Key) <= 0 {
365+
continue
366+
}
367+
}
368+
369+
items = append(items, item.Item)
370+
if len(items) >= pageLimit {
371+
startItem = item
372+
excludedStart = true
373+
break
374+
}
375+
}
376+
377+
for _, item := range items {
378+
if !yield(item, nil) {
379+
return
380+
}
381+
382+
totalCount++
383+
if limit != backend.NoLimit && totalCount >= limit {
384+
return
385+
}
386+
}
387+
388+
if len(items) < pageLimit {
389+
return
390+
}
391+
}
392+
}
393+
}
394+
395+
// GetRange returns query range
396+
func (m *Memory) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
397+
var result backend.GetResult
398+
for item, err := range m.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
399+
if err != nil {
400+
return nil, trace.Wrap(err)
401+
}
402+
result.Items = append(result.Items, item)
310403
}
311-
return &re, nil
404+
return &result, nil
312405
}
313406

314407
// KeepAlive updates TTL on the lease
@@ -439,23 +532,6 @@ func (m *Memory) NewWatcher(ctx context.Context, watch backend.Watch) (backend.W
439532
return m.buf.NewWatcher(ctx, watch)
440533
}
441534

442-
func (m *Memory) getRange(ctx context.Context, startKey, endKey backend.Key, limit int) backend.GetResult {
443-
var res backend.GetResult
444-
startItem := &btreeItem{Item: backend.Item{Key: startKey}}
445-
endItem := &btreeItem{Item: backend.Item{Key: endKey}}
446-
m.tree.AscendGreaterOrEqual(startItem, func(item *btreeItem) bool {
447-
if endItem.Less(item) {
448-
return false
449-
}
450-
res.Items = append(res.Items, item.Item)
451-
if limit > 0 && len(res.Items) >= limit {
452-
return false
453-
}
454-
return true
455-
})
456-
return res
457-
}
458-
459535
// removeExpired makes a pass through map and removes expired elements
460536
// returns the number of expired elements removed
461537
func (m *Memory) removeExpired() int {

0 commit comments

Comments
 (0)