Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Unify List and GetToList functions of etcd3
Browse files Browse the repository at this point in the history
Kubernetes-commit: 27db8e34cfc930972beb8b5a33c443e134bcd614
  • Loading branch information
wojtek-t authored and k8s-publishing-bot committed Nov 22, 2021
1 parent 0002a59 commit a55c2c8
Showing 1 changed file with 22 additions and 64 deletions.
86 changes: 22 additions & 64 deletions pkg/storage/etcd3/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,58 +458,7 @@ func (s *store) GuaranteedUpdate(

// GetToList implements storage.Interface.GetToList.
func (s *store) GetToList(ctx context.Context, key string, listOpts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := listOpts.ResourceVersion
match := listOpts.ResourceVersionMatch
pred := listOpts.Predicate
trace := utiltrace.New("GetToList etcd3",
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
utiltrace.Field{"limit", pred.Limit},
utiltrace.Field{"continue", pred.Continue})
defer trace.LogIfLong(500 * time.Millisecond)
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}

newItemFunc := getNewItemFunc(listObj, v)

key = path.Join(s.pathPrefix, key)
startTime := time.Now()
var opts []clientv3.OpOption
if len(resourceVersion) > 0 && match == metav1.ResourceVersionMatchExact {
rv, err := s.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
}
opts = append(opts, clientv3.WithRev(int64(rv)))
}

getResp, err := s.client.KV.Get(ctx, key, opts...)
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
if err != nil {
return err
}
if err = s.validateMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
return err
}

if len(getResp.Kvs) > 0 {
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
if err != nil {
return storage.NewInternalError(err.Error())
}
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner, newItemFunc); err != nil {
return err
}
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "", nil)
return s.list(ctx, key, listOpts, listObj, false)
}

func getNewItemFunc(listObj runtime.Object, v reflect.Value) func() runtime.Object {
Expand Down Expand Up @@ -610,10 +559,14 @@ func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error

// List implements storage.Interface.List.
func (s *store) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return s.list(ctx, key, opts, listObj, true)
}

func (s *store) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
resourceVersion := opts.ResourceVersion
match := opts.ResourceVersionMatch
pred := opts.Predicate
trace := utiltrace.New("List etcd3",
trace := utiltrace.New(fmt.Sprintf("List(recursive=%v) etcd3", recursive),
utiltrace.Field{"key", key},
utiltrace.Field{"resourceVersion", resourceVersion},
utiltrace.Field{"resourceVersionMatch", match},
Expand All @@ -628,14 +581,13 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
if err != nil || v.Kind() != reflect.Slice {
return fmt.Errorf("need ptr to slice: %v", err)
}
key = path.Join(s.pathPrefix, key)

if s.pathPrefix != "" {
key = path.Join(s.pathPrefix, key)
}
// We need to make sure the key ended with "/" so that we only get children "directories".
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
if !strings.HasSuffix(key, "/") {
// For recursive lists, we need to make sure the key ended with "/" so that we only
// get children "directories". e.g. if we have key "/a", "/a/b", "/ab", getting keys
// with prefix "/a" will return all three, while with prefix "/a/" will return only
// "/a/b" which is the correct answer.
if recursive && !strings.HasSuffix(key, "/") {
key += "/"
}
keyPrefix := key
Expand All @@ -662,7 +614,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
var returnedRV, continueRV, withRev int64
var continueKey string
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
case recursive && s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
if err != nil {
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
Expand All @@ -683,7 +635,7 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
withRev = continueRV
returnedRV = continueRV
}
case s.pagingEnabled && pred.Limit > 0:
case recursive && s.pagingEnabled && pred.Limit > 0:
if fromRV != nil {
switch match {
case metav1.ResourceVersionMatchNotOlderThan:
Expand Down Expand Up @@ -719,7 +671,9 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
}
}

options = append(options, clientv3.WithPrefix())
if recursive {
options = append(options, clientv3.WithPrefix())
}
}
if withRev != 0 {
options = append(options, clientv3.WithRev(withRev))
Expand All @@ -740,7 +694,11 @@ func (s *store) List(ctx context.Context, key string, opts storage.ListOptions,
for {
startTime := time.Now()
getResp, err = s.client.KV.Get(ctx, key, options...)
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
if recursive {
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
} else {
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
}
if err != nil {
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
}
Expand Down

0 comments on commit a55c2c8

Please sign in to comment.