diff --git a/pkg/storage/cacher/cacher.go b/pkg/storage/cacher/cacher.go index b6dcd2a87..ec7d194ff 100644 --- a/pkg/storage/cacher/cacher.go +++ b/pkg/storage/cacher/cacher.go @@ -606,75 +606,40 @@ func shouldDelegateList(opts storage.ListOptions) bool { // GetToList implements storage.Interface. func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { - resourceVersion := opts.ResourceVersion - pred := opts.Predicate - if shouldDelegateList(opts) { - return c.storage.GetToList(ctx, key, opts, listObj) - } + return c.list(ctx, key, opts, listObj, false) +} - // If resourceVersion is specified, serve it from cache. - // It's guaranteed that the returned value is at least that - // fresh as the given resourceVersion. - listRV, err := c.versioner.ParseResourceVersion(resourceVersion) - if err != nil { - return err - } +// List implements storage.Interface. +func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { + return c.list(ctx, key, opts, listObj, true) +} - if listRV == 0 && !c.ready.check() { - // If Cacher is not yet initialized and we don't require any specific - // minimal resource version, simply forward the request to storage. +func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { + if !recursive { return c.storage.GetToList(ctx, key, opts, listObj) } + return c.storage.List(ctx, key, opts, listObj) +} - trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) - defer trace.LogIfLong(500 * time.Millisecond) - - c.ready.wait() - trace.Step("Ready") - - // List elements with at least 'listRV' from cache. - listPtr, err := meta.GetItemsPtr(listObj) - if err != nil { - return err - } - listVal, err := conversion.EnforcePtr(listPtr) - if err != nil { - return err - } - if listVal.Kind() != reflect.Slice { - return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind()) - } - filter := filterWithAttrsFunction(key, pred) - - obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) - if err != nil { - return err - } - trace.Step("Got from cache") - - if exists { - elem, ok := obj.(*storeElement) - if !ok { - return fmt.Errorf("non *storeElement returned from storage: %v", obj) - } - if filter(elem.Key, elem.Labels, elem.Fields) { - listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem())) +func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) { + if !recursive { + obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace) + if err != nil { + return nil, 0, "", err } - } - if c.versioner != nil { - if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil { - return err + if exists { + return []interface{}{obj}, readResourceVersion, "", nil } + return nil, readResourceVersion, "", nil } - return nil + return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) } -// List implements storage.Interface. -func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { +func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error { resourceVersion := opts.ResourceVersion pred := opts.Predicate if shouldDelegateList(opts) { - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } // If resourceVersion is specified, serve it from cache. @@ -688,7 +653,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, if listRV == 0 && !c.ready.check() { // If Cacher is not yet initialized and we don't require any specific // minimal resource version, simply forward the request to storage. - return c.storage.List(ctx, key, opts, listObj) + return c.delegateList(ctx, key, opts, listObj, recursive) } trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()}) @@ -711,7 +676,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, } filter := filterWithAttrsFunction(key, pred) - objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace) + objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive) if err != nil { return err }