Skip to content
This repository has been archived by the owner on Mar 16, 2024. It is now read-only.

Commit

Permalink
Unify List and GetToList functions of cacher.
Browse files Browse the repository at this point in the history
Kubernetes-commit: c86543d9cb302859510604256fdb81ea1b0d46fb
  • Loading branch information
wojtek-t authored and k8s-publishing-bot committed Nov 18, 2021
1 parent 5d36bfd commit 0002a59
Showing 1 changed file with 23 additions and 58 deletions.
81 changes: 23 additions & 58 deletions pkg/storage/cacher/cacher.go
Original file line number Diff line number Diff line change
Expand Up @@ -606,75 +606,40 @@ func shouldDelegateList(opts storage.ListOptions) bool {

// GetToList implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
if shouldDelegateList(opts) {
return c.storage.GetToList(ctx, key, opts, listObj)
}
return c.list(ctx, key, opts, listObj, false)
}

// If resourceVersion is specified, serve it from cache.
// It's guaranteed that the returned value is at least that
// fresh as the given resourceVersion.
listRV, err := c.versioner.ParseResourceVersion(resourceVersion)
if err != nil {
return err
}
// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
return c.list(ctx, key, opts, listObj, true)
}

if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
func (c *Cacher) delegateList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
if !recursive {
return c.storage.GetToList(ctx, key, opts, listObj)
}
return c.storage.List(ctx, key, opts, listObj)
}

trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
defer trace.LogIfLong(500 * time.Millisecond)

c.ready.wait()
trace.Step("Ready")

// List elements with at least 'listRV' from cache.
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
return err
}
listVal, err := conversion.EnforcePtr(listPtr)
if err != nil {
return err
}
if listVal.Kind() != reflect.Slice {
return fmt.Errorf("need a pointer to slice, got %v", listVal.Kind())
}
filter := filterWithAttrsFunction(key, pred)

obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return err
}
trace.Step("Got from cache")

if exists {
elem, ok := obj.(*storeElement)
if !ok {
return fmt.Errorf("non *storeElement returned from storage: %v", obj)
}
if filter(elem.Key, elem.Labels, elem.Fields) {
listVal.Set(reflect.Append(listVal, reflect.ValueOf(elem.Object).Elem()))
func (c *Cacher) listItems(listRV uint64, key string, pred storage.SelectionPredicate, trace *utiltrace.Trace, recursive bool) ([]interface{}, uint64, string, error) {
if !recursive {
obj, exists, readResourceVersion, err := c.watchCache.WaitUntilFreshAndGet(listRV, key, trace)
if err != nil {
return nil, 0, "", err
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, "", nil); err != nil {
return err
if exists {
return []interface{}{obj}, readResourceVersion, "", nil
}
return nil, readResourceVersion, "", nil
}
return nil
return c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
}

// List implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error {
func (c *Cacher) list(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object, recursive bool) error {
resourceVersion := opts.ResourceVersion
pred := opts.Predicate
if shouldDelegateList(opts) {
return c.storage.List(ctx, key, opts, listObj)
return c.delegateList(ctx, key, opts, listObj, recursive)
}

// If resourceVersion is specified, serve it from cache.
Expand All @@ -688,7 +653,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
if listRV == 0 && !c.ready.check() {
// If Cacher is not yet initialized and we don't require any specific
// minimal resource version, simply forward the request to storage.
return c.storage.List(ctx, key, opts, listObj)
return c.delegateList(ctx, key, opts, listObj, recursive)
}

trace := utiltrace.New("cacher list", utiltrace.Field{"type", c.objectType.String()})
Expand All @@ -711,7 +676,7 @@ func (c *Cacher) List(ctx context.Context, key string, opts storage.ListOptions,
}
filter := filterWithAttrsFunction(key, pred)

objs, readResourceVersion, indexUsed, err := c.watchCache.WaitUntilFreshAndList(listRV, pred.MatcherIndex(), trace)
objs, readResourceVersion, indexUsed, err := c.listItems(listRV, key, pred, trace, recursive)
if err != nil {
return err
}
Expand Down

0 comments on commit 0002a59

Please sign in to comment.