Skip to content

Commit d717473

Browse files
authored
Add iteration support to dynamo (#52517)
This builds on top of #52199 by updating the dynamo backend to implement backend.BackendWithItems. Both GetRange and DeleteRange were refactored to call Items instead of getAllRecords, to unify the logic and vet the implementation of Items. The custom pagination logic to retrieve items was also replaced with an iterator that works in a similar fashion to the query paginator from the AWS SDK. In addition to simplifying the logic, this also removed some extraneous sorting.
1 parent 674f4d6 commit d717473

File tree

1 file changed

+159
-160
lines changed

1 file changed

+159
-160
lines changed

lib/backend/dynamo/dynamodbbk.go

+159-160
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ package dynamo
2121
import (
2222
"context"
2323
"errors"
24+
"iter"
2425
"log/slog"
2526
"net/http"
26-
"sort"
2727
"strconv"
2828
"sync/atomic"
2929
"time"
@@ -548,63 +548,132 @@ func (b *Backend) Update(ctx context.Context, item backend.Item) (*backend.Lease
548548
return backend.NewLease(item), nil
549549
}
550550

551-
// GetRange returns range of elements
552-
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
553-
if startKey.IsZero() {
554-
return nil, trace.BadParameter("missing parameter startKey")
555-
}
556-
if endKey.IsZero() {
557-
return nil, trace.BadParameter("missing parameter endKey")
558-
}
551+
func (b *Backend) queryOutputPages(ctx context.Context, limit int, input *dynamodb.QueryInput) iter.Seq2[*dynamodb.QueryOutput, error] {
559552
if limit <= 0 {
560553
limit = backend.DefaultRangeLimit
561554
}
562555

563-
result, err := b.getAllRecords(ctx, startKey, endKey, limit)
564-
if err != nil {
565-
return nil, trace.Wrap(err)
566-
}
567-
sort.Sort(records(result.records))
568-
values := make([]backend.Item, len(result.records))
569-
for i, r := range result.records {
570-
values[i] = backend.Item{
571-
Key: trimPrefix(r.FullPath),
572-
Value: r.Value,
573-
Revision: r.Revision,
574-
}
575-
if r.Expires != nil {
576-
values[i].Expires = time.Unix(*r.Expires, 0).UTC()
577-
}
578-
if values[i].Revision == "" {
579-
values[i].Revision = backend.BlankRevision
556+
return func(yield func(*dynamodb.QueryOutput, error) bool) {
557+
const defaultPageSize = 1000
558+
var nextToken map[string]types.AttributeValue
559+
560+
totalCount := 0
561+
for {
562+
input.Limit = aws.Int32(int32(min(limit-totalCount, defaultPageSize)))
563+
564+
result, err := b.svc.Query(ctx, input)
565+
if err != nil {
566+
yield(nil, trace.Wrap(err))
567+
return
568+
}
569+
570+
nextToken = result.LastEvaluatedKey
571+
if !yield(result, nil) {
572+
return
573+
}
574+
575+
if nextToken == nil {
576+
return
577+
}
578+
579+
totalCount += len(result.Items)
580+
if totalCount >= limit {
581+
return
582+
}
583+
input.ExclusiveStartKey = nextToken
580584
}
581585
}
582-
return &backend.GetResult{Items: values}, nil
583586
}
584587

585-
func (b *Backend) getAllRecords(ctx context.Context, startKey, endKey backend.Key, limit int) (*getResult, error) {
586-
var result getResult
588+
func (b *Backend) Items(ctx context.Context, params backend.IterateParams) iter.Seq2[backend.Item, error] {
589+
if params.StartKey.IsZero() {
590+
err := trace.BadParameter("missing parameter startKey")
591+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
592+
}
593+
if params.EndKey.IsZero() {
594+
err := trace.BadParameter("missing parameter endKey")
595+
return func(yield func(backend.Item, error) bool) { yield(backend.Item{}, err) }
596+
}
587597

588-
// this code is being extra careful here not to introduce endless loop
589-
// by some unfortunate series of events
590-
for i := 0; i < backend.DefaultRangeLimit/100; i++ {
591-
re, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), limit, result.lastEvaluatedKey)
598+
const (
599+
query = "HashKey = :hashKey AND FullPath BETWEEN :rangeStart AND :rangeEnd"
600+
601+
// filter out expired items, otherwise they might show up in the query
602+
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
603+
filter = "attribute_not_exists(Expires) OR Expires >= :timestamp"
604+
)
605+
606+
av := map[string]types.AttributeValue{
607+
":rangeStart": &types.AttributeValueMemberS{Value: prependPrefix(params.StartKey)},
608+
":rangeEnd": &types.AttributeValueMemberS{Value: prependPrefix(params.EndKey)},
609+
":timestamp": timeToAttributeValue(b.clock.Now().UTC()),
610+
":hashKey": &types.AttributeValueMemberS{Value: hashKey},
611+
}
612+
613+
input := dynamodb.QueryInput{
614+
KeyConditionExpression: aws.String(query),
615+
TableName: &b.TableName,
616+
ExpressionAttributeValues: av,
617+
FilterExpression: aws.String(filter),
618+
ConsistentRead: aws.Bool(true),
619+
ScanIndexForward: aws.Bool(!params.Descending),
620+
}
621+
622+
return func(yield func(backend.Item, error) bool) {
623+
count := 0
624+
defer func() {
625+
if count >= backend.DefaultRangeLimit {
626+
b.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", params.StartKey, "limit", backend.DefaultRangeLimit)
627+
}
628+
}()
629+
630+
for page, err := range b.queryOutputPages(ctx, params.Limit, &input) {
631+
if err != nil {
632+
yield(backend.Item{}, convertError(err))
633+
return
634+
}
635+
636+
for _, itemAttributes := range page.Items {
637+
var r record
638+
if err := attributevalue.UnmarshalMap(itemAttributes, &r); err != nil {
639+
yield(backend.Item{}, convertError(err))
640+
return
641+
}
642+
643+
item := backend.Item{
644+
Key: trimPrefix(r.FullPath),
645+
Value: r.Value,
646+
Revision: r.Revision,
647+
}
648+
if r.Expires != nil {
649+
item.Expires = time.Unix(*r.Expires, 0).UTC()
650+
}
651+
if item.Revision == "" {
652+
item.Revision = backend.BlankRevision
653+
}
654+
655+
if !yield(item, nil) {
656+
return
657+
}
658+
count++
659+
if params.Limit != backend.NoLimit && count >= params.Limit {
660+
return
661+
}
662+
}
663+
}
664+
}
665+
}
666+
667+
// GetRange returns range of elements
668+
func (b *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, limit int) (*backend.GetResult, error) {
669+
var result backend.GetResult
670+
for i, err := range b.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey, Limit: limit}) {
592671
if err != nil {
593672
return nil, trace.Wrap(err)
594673
}
595-
result.records = append(result.records, re.records...)
596-
// If the limit was exceeded or there are no more records to fetch return the current result
597-
// otherwise updated lastEvaluatedKey and proceed with obtaining new records.
598-
if (limit != 0 && len(result.records) >= limit) || len(re.lastEvaluatedKey) == 0 {
599-
if len(result.records) == backend.DefaultRangeLimit {
600-
b.logger.WarnContext(ctx, "Range query hit backend limit. (this is a bug!)", "start_key", startKey, "limit", backend.DefaultRangeLimit)
601-
}
602-
result.lastEvaluatedKey = nil
603-
return &result, nil
604-
}
605-
result.lastEvaluatedKey = re.lastEvaluatedKey
674+
result.Items = append(result.Items, i)
606675
}
607-
return nil, trace.BadParameter("backend entered endless loop")
676+
return &result, nil
608677
}
609678

610679
const (
@@ -617,45 +686,63 @@ const (
617686

618687
// DeleteRange deletes range of items with keys between startKey and endKey
619688
func (b *Backend) DeleteRange(ctx context.Context, startKey, endKey backend.Key) error {
620-
if startKey.IsZero() {
621-
return trace.BadParameter("missing parameter startKey")
622-
}
623-
if endKey.IsZero() {
624-
return trace.BadParameter("missing parameter endKey")
625-
}
626-
// keep fetching and deleting until no records left,
627-
// keep the very large limit, just in case if someone else
628-
// keeps adding records
629-
for i := 0; i < backend.DefaultRangeLimit/100; i++ {
630-
result, err := b.getRecords(ctx, prependPrefix(startKey), prependPrefix(endKey), batchOperationItemsLimit, nil)
689+
// Attempt to pull all existing items and delete them in batches
690+
// in accordance with the BatchWriteItem limits. There is a hard
691+
// cap on the total number of items that can be deleted in a single
692+
// DeleteRange call to avoid racing with additional records being added.
693+
const maxDeletionOperations = backend.DefaultRangeLimit / 100 / batchOperationItemsLimit
694+
requests := make([]types.WriteRequest, 0, batchOperationItemsLimit)
695+
var deletions int
696+
for item, err := range b.Items(ctx, backend.IterateParams{StartKey: startKey, EndKey: endKey}) {
631697
if err != nil {
632698
return trace.Wrap(err)
633699
}
634-
if len(result.records) == 0 {
635-
return nil
700+
701+
if deletions >= maxDeletionOperations {
702+
break
636703
}
637-
requests := make([]types.WriteRequest, 0, len(result.records))
638-
for _, record := range result.records {
639-
requests = append(requests, types.WriteRequest{
640-
DeleteRequest: &types.DeleteRequest{
641-
Key: map[string]types.AttributeValue{
642-
hashKeyKey: &types.AttributeValueMemberS{Value: hashKey},
643-
fullPathKey: &types.AttributeValueMemberS{Value: record.FullPath},
644-
},
704+
705+
requests = append(requests, types.WriteRequest{
706+
DeleteRequest: &types.DeleteRequest{
707+
Key: map[string]types.AttributeValue{
708+
hashKeyKey: &types.AttributeValueMemberS{Value: hashKey},
709+
fullPathKey: &types.AttributeValueMemberS{Value: prependPrefix(item.Key)},
645710
},
646-
})
711+
},
712+
})
713+
714+
if len(requests) == batchOperationItemsLimit {
715+
if _, err := b.svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
716+
RequestItems: map[string][]types.WriteRequest{
717+
b.TableName: requests,
718+
},
719+
}); err != nil {
720+
return trace.Wrap(err)
721+
}
722+
723+
requests = requests[:0]
724+
deletions++
725+
if deletions >= maxDeletionOperations {
726+
break
727+
}
647728
}
648-
input := dynamodb.BatchWriteItemInput{
729+
}
730+
731+
if deletions >= maxDeletionOperations {
732+
return trace.ConnectionProblem(nil, "not all items deleted, too many requests")
733+
}
734+
735+
if len(requests) > 0 {
736+
if _, err := b.svc.BatchWriteItem(ctx, &dynamodb.BatchWriteItemInput{
649737
RequestItems: map[string][]types.WriteRequest{
650738
b.TableName: requests,
651739
},
652-
}
653-
654-
if _, err = b.svc.BatchWriteItem(ctx, &input); err != nil {
740+
}); err != nil {
655741
return trace.Wrap(err)
656742
}
657743
}
658-
return trace.ConnectionProblem(nil, "not all items deleted, too many requests")
744+
745+
return nil
659746
}
660747

661748
// Get returns a single item or not found error
@@ -961,60 +1048,6 @@ func (b *Backend) createTable(ctx context.Context, tableName *string, rangeKey s
9611048
return trace.Wrap(err)
9621049
}
9631050

964-
type getResult struct {
965-
// lastEvaluatedKey is the primary key of the item where the operation stopped, inclusive of the
966-
// previous result set. Use this value to start a new operation, excluding this
967-
// value in the new request.
968-
lastEvaluatedKey map[string]types.AttributeValue
969-
records []record
970-
}
971-
972-
// getRecords retrieves all keys by path
973-
func (b *Backend) getRecords(ctx context.Context, startKey, endKey string, limit int, lastEvaluatedKey map[string]types.AttributeValue) (*getResult, error) {
974-
query := "HashKey = :hashKey AND FullPath BETWEEN :fullPath AND :rangeEnd"
975-
attrV := map[string]interface{}{
976-
":fullPath": startKey,
977-
":hashKey": hashKey,
978-
":timestamp": b.clock.Now().UTC().Unix(),
979-
":rangeEnd": endKey,
980-
}
981-
982-
// filter out expired items, otherwise they might show up in the query
983-
// http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/howitworks-ttl.html
984-
filter := "attribute_not_exists(Expires) OR Expires >= :timestamp"
985-
av, err := attributevalue.MarshalMap(attrV)
986-
if err != nil {
987-
return nil, convertError(err)
988-
}
989-
input := dynamodb.QueryInput{
990-
KeyConditionExpression: aws.String(query),
991-
TableName: &b.TableName,
992-
ExpressionAttributeValues: av,
993-
FilterExpression: aws.String(filter),
994-
ConsistentRead: aws.Bool(true),
995-
ExclusiveStartKey: lastEvaluatedKey,
996-
}
997-
if limit > 0 {
998-
input.Limit = aws.Int32(int32(limit))
999-
}
1000-
out, err := b.svc.Query(ctx, &input)
1001-
if err != nil {
1002-
return nil, trace.Wrap(err)
1003-
}
1004-
var result getResult
1005-
for _, item := range out.Items {
1006-
var r record
1007-
if err := attributevalue.UnmarshalMap(item, &r); err != nil {
1008-
return nil, trace.Wrap(err)
1009-
}
1010-
result.records = append(result.records, r)
1011-
}
1012-
sort.Sort(records(result.records))
1013-
result.records = removeDuplicates(result.records)
1014-
result.lastEvaluatedKey = out.LastEvaluatedKey
1015-
return &result, nil
1016-
}
1017-
10181051
// isExpired returns 'true' if the given object (record) has a TTL and
10191052
// it's due.
10201053
func (r *record) isExpired(now time.Time) bool {
@@ -1025,23 +1058,6 @@ func (r *record) isExpired(now time.Time) bool {
10251058
return now.UTC().After(expiryDateUTC)
10261059
}
10271060

1028-
func removeDuplicates(elements []record) []record {
1029-
// Use map to record duplicates as we find them.
1030-
encountered := map[string]bool{}
1031-
var result []record
1032-
1033-
for v := range elements {
1034-
if !encountered[elements[v].FullPath] {
1035-
// Record this element as an encountered element.
1036-
encountered[elements[v].FullPath] = true
1037-
// Append to result slice.
1038-
result = append(result, elements[v])
1039-
}
1040-
}
1041-
// Return the new slice.
1042-
return result
1043-
}
1044-
10451061
const (
10461062
modeCreate = iota
10471063
modePut
@@ -1235,23 +1251,6 @@ func convertError(err error) error {
12351251
return err
12361252
}
12371253

1238-
type records []record
1239-
1240-
// Len is part of sort.Interface.
1241-
func (r records) Len() int {
1242-
return len(r)
1243-
}
1244-
1245-
// Swap is part of sort.Interface.
1246-
func (r records) Swap(i, j int) {
1247-
r[i], r[j] = r[j], r[i]
1248-
}
1249-
1250-
// Less is part of sort.Interface.
1251-
func (r records) Less(i, j int) bool {
1252-
return r[i].FullPath < r[j].FullPath
1253-
}
1254-
12551254
func fullPathToAttributeValueMap(fullPath string) map[string]types.AttributeValue {
12561255
return map[string]types.AttributeValue{
12571256
hashKeyKey: &types.AttributeValueMemberS{Value: hashKey},

0 commit comments

Comments
 (0)