@@ -25,6 +25,7 @@ import (
25
25
"errors"
26
26
"fmt"
27
27
"io/fs"
28
+ "iter"
28
29
"log/slog"
29
30
"net/url"
30
31
"os"
@@ -629,6 +630,98 @@ func (l *Backend) getInTransaction(ctx context.Context, key backend.Key, tx *sql
629
630
return nil
630
631
}
631
632
633
+ func (l * Backend ) Items (ctx context.Context , params backend.IterateParams ) iter.Seq2 [backend.Item , error ] {
634
+ if params .StartKey .IsZero () {
635
+ err := trace .BadParameter ("missing parameter startKey" )
636
+ return func (yield func (backend.Item , error ) bool ) { yield (backend.Item {}, err ) }
637
+ }
638
+ if params .EndKey .IsZero () {
639
+ err := trace .BadParameter ("missing parameter endKey" )
640
+ return func (yield func (backend.Item , error ) bool ) { yield (backend.Item {}, err ) }
641
+ }
642
+
643
+ const (
644
+ queryAsc = "SELECT key, value, expires, revision FROM kv WHERE (key BETWEEN ? AND ?) AND (? == '' OR key > ?) AND (expires IS NULL OR expires > ?) ORDER BY key ASC LIMIT ?"
645
+ queryDesc = "SELECT key, value, expires, revision FROM kv WHERE (key BETWEEN ? AND ?) AND (? == '' OR key < ?) AND (expires IS NULL OR expires > ?) ORDER BY key DESC LIMIT ?"
646
+ defaultPageSize = 1000
647
+ )
648
+ return func (yield func (backend.Item , error ) bool ) {
649
+ limit := params .Limit
650
+ if limit <= 0 {
651
+ limit = backend .DefaultRangeLimit
652
+ }
653
+
654
+ var exclusiveStartKey string
655
+ startKey := params .StartKey .String ()
656
+ endKey := params .EndKey .String ()
657
+
658
+ query := queryAsc
659
+ if params .Descending {
660
+ query = queryDesc
661
+ }
662
+
663
+ var pageLimit , totalCount int
664
+ items := make ([]backend.Item , 0 , min (limit , defaultPageSize ))
665
+ for {
666
+ items = items [:0 ]
667
+ pageLimit = min (limit - totalCount , defaultPageSize )
668
+ if err := l .inTransaction (ctx , func (tx * sql.Tx ) error {
669
+ q , err := tx .PrepareContext (ctx , query )
670
+ if err != nil {
671
+ return trace .Wrap (err )
672
+ }
673
+ defer q .Close ()
674
+
675
+ rows , err := q .QueryContext (ctx , startKey , endKey , exclusiveStartKey , exclusiveStartKey , l .clock .Now ().UTC (), pageLimit )
676
+ if err != nil {
677
+ return trace .Wrap (err )
678
+ }
679
+ defer rows .Close ()
680
+
681
+ for rows .Next () {
682
+ var item backend.Item
683
+ var expires sql.NullTime
684
+ if err := rows .Scan (& item .Key , & item .Value , & expires , & item .Revision ); err != nil {
685
+ return trace .Wrap (err )
686
+ }
687
+ item .Expires = expires .Time
688
+ if item .Revision == "" {
689
+ item .Revision = backend .BlankRevision
690
+ }
691
+
692
+ items = append (items , item )
693
+ }
694
+
695
+ // Explicitly call rows.Close() to return the error instead of
696
+ // it being ignored in the defer above.
697
+ return trace .Wrap (rows .Close ())
698
+ }); err != nil {
699
+ yield (backend.Item {}, trace .Wrap (err ))
700
+ return
701
+ }
702
+
703
+ if len (items ) >= pageLimit {
704
+ exclusiveStartKey = items [len (items )- 1 ].Key .String ()
705
+ }
706
+
707
+ for _ , item := range items {
708
+ if ! yield (item , nil ) {
709
+ return
710
+ }
711
+
712
+ totalCount ++
713
+ if limit != backend .NoLimit && totalCount >= limit {
714
+ return
715
+ }
716
+ }
717
+
718
+ if len (items ) < pageLimit {
719
+ return
720
+ }
721
+ }
722
+ }
723
+ }
724
+
632
725
// GetRange returns query range
633
726
func (l * Backend ) GetRange (ctx context.Context , startKey , endKey backend.Key , limit int ) (* backend.GetResult , error ) {
634
727
if startKey .IsZero () {
@@ -642,37 +735,13 @@ func (l *Backend) GetRange(ctx context.Context, startKey, endKey backend.Key, li
642
735
}
643
736
644
737
var result backend.GetResult
645
- err := l .inTransaction (ctx , func (tx * sql.Tx ) error {
646
- q , err := tx .PrepareContext (ctx ,
647
- "SELECT key, value, expires, revision FROM kv WHERE (key >= ? and key <= ?) AND (expires is NULL or expires > ?) ORDER BY key LIMIT ?" )
648
- if err != nil {
649
- return trace .Wrap (err )
650
- }
651
- defer q .Close ()
652
-
653
- rows , err := q .QueryContext (ctx , startKey .String (), endKey .String (), l .clock .Now ().UTC (), limit )
738
+ for item , err := range l .Items (ctx , backend.IterateParams {StartKey : startKey , EndKey : endKey , Limit : limit }) {
654
739
if err != nil {
655
- return trace .Wrap (err )
656
- }
657
- defer rows .Close ()
658
-
659
- for rows .Next () {
660
- var i backend.Item
661
- var expires sql.NullTime
662
- if err := rows .Scan (& i .Key , & i .Value , & expires , & i .Revision ); err != nil {
663
- return trace .Wrap (err )
664
- }
665
- i .Expires = expires .Time
666
- if i .Revision == "" {
667
- i .Revision = backend .BlankRevision
668
- }
669
- result .Items = append (result .Items , i )
740
+ return nil , trace .Wrap (err )
670
741
}
671
- return nil
672
- })
673
- if err != nil {
674
- return nil , trace .Wrap (err )
742
+ result .Items = append (result .Items , item )
675
743
}
744
+
676
745
if len (result .Items ) == backend .DefaultRangeLimit {
677
746
l .logger .WarnContext (ctx , "Range query hit backend limit. (this is a bug!)" , "start_key" , startKey , "limit" , backend .DefaultRangeLimit )
678
747
}
@@ -790,18 +859,25 @@ func (l *Backend) DeleteRange(ctx context.Context, startKey, endKey backend.Key)
790
859
if err != nil {
791
860
return trace .Wrap (err )
792
861
}
793
- defer rows . Close ()
862
+
794
863
var keys []backend.Key
864
+ defer rows .Close ()
795
865
for rows .Next () {
796
866
var key backend.Key
797
867
if err := rows .Scan (& key ); err != nil {
798
868
return trace .Wrap (err )
799
869
}
870
+
800
871
keys = append (keys , key )
801
872
}
802
873
803
- for i := range keys {
804
- if err := l .deleteInTransaction (l .ctx , keys [i ], tx ); err != nil {
874
+ // Close rows early before any deletions occur.
875
+ if err := rows .Close (); err != nil {
876
+ return trace .Wrap (err )
877
+ }
878
+
879
+ for _ , key := range keys {
880
+ if err := l .deleteInTransaction (l .ctx , key , tx ); err != nil {
805
881
return trace .Wrap (err )
806
882
}
807
883
}
0 commit comments