From 0083a3d0fcb8a73437544008ae1ba70df40a9bac Mon Sep 17 00:00:00 2001 From: Darren Shepherd Date: Mon, 28 Oct 2024 20:55:40 -0700 Subject: [PATCH] expose withwatch client --- pkg/backend/backend.go | 4 ++-- pkg/router/client.go | 7 +++++++ pkg/router/handler.go | 3 ++- pkg/router/types.go | 2 +- pkg/runtime/backend.go | 2 +- pkg/runtime/cached.go | 29 +++++++++++++++++++++++------ pkg/runtime/clients.go | 4 ++-- 7 files changed, 38 insertions(+), 13 deletions(-) diff --git a/pkg/backend/backend.go b/pkg/backend/backend.go index b2ddab2..266adef 100644 --- a/pkg/backend/backend.go +++ b/pkg/backend/backend.go @@ -17,14 +17,14 @@ type Trigger interface { } type Watcher interface { - Watch(ctx context.Context, gvk schema.GroupVersionKind, name string, cb Callback) error + Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb Callback) error } type Backend interface { Trigger CacheFactory Watcher - kclient.Client + kclient.WithWatch kclient.FieldIndexer Start(ctx context.Context) error diff --git a/pkg/router/client.go b/pkg/router/client.go index 95139e2..3b644dd 100644 --- a/pkg/router/client.go +++ b/pkg/router/client.go @@ -3,11 +3,13 @@ package router import ( "context" + "github.com/acorn-io/baaah/pkg/backend" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" kclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -17,11 +19,16 @@ type TriggerRegistry interface { } type client struct { + backend backend.Backend reader writer status } +func (c *client) Watch(ctx context.Context, list kclient.ObjectList, opts ...kclient.ListOption) (watch.Interface, error) { + return c.backend.Watch(ctx, list, opts...) +} + func (c *client) Scheme() *runtime.Scheme { return c.reader.client.Scheme() } diff --git a/pkg/router/handler.go b/pkg/router/handler.go index 1536704..1747ca8 100644 --- a/pkg/router/handler.go +++ b/pkg/router/handler.go @@ -139,6 +139,7 @@ func (m *HandlerSet) newRequestResponse(gvk schema.GroupVersionKind, key string, req := Request{ FromTrigger: trigger, Client: &client{ + backend: m.backend, reader: reader{ scheme: m.scheme, client: m.backend, @@ -179,7 +180,7 @@ func (m *HandlerSet) WatchGVK(gvks ...schema.GroupVersionKind) error { if m.watching[gvk] { continue } - if err := m.backend.Watch(m.ctx, gvk, m.name, m.onChange); err == nil { + if err := m.backend.Watcher(m.ctx, gvk, m.name, m.onChange); err == nil { m.watching[gvk] = true } else { watchErrs = append(watchErrs, err) diff --git a/pkg/router/types.go b/pkg/router/types.go index 56ebbd4..5083ff8 100644 --- a/pkg/router/types.go +++ b/pkg/router/types.go @@ -32,7 +32,7 @@ func (h HandlerFunc) Handle(req Request, resp Response) error { } type Request struct { - Client kclient.Client + Client kclient.WithWatch Object kclient.Object Ctx context.Context GVK schema.GroupVersionKind diff --git a/pkg/runtime/backend.go b/pkg/runtime/backend.go index ea5a4d3..65974bf 100644 --- a/pkg/runtime/backend.go +++ b/pkg/runtime/backend.go @@ -113,7 +113,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e return cache.AddIndexers(indexers) } -func (b *Backend) Watch(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error { +func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error { c, err := b.cacheFactory.ForKind(gvk) if err != nil { return err diff --git a/pkg/runtime/cached.go b/pkg/runtime/cached.go index 99a765a..4178e98 100644 --- a/pkg/runtime/cached.go +++ b/pkg/runtime/cached.go @@ -2,14 +2,17 @@ package runtime import ( "context" + "errors" "strconv" "sync" "time" "github.com/acorn-io/baaah/pkg/uncached" + apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/watch" kclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client/apiutil" ) @@ -29,7 +32,8 @@ type objectValue struct { } type cacheClient struct { - uncached, cached kclient.Client + uncached kclient.WithWatch + cached kclient.Client recent map[objectKey]objectValue recentLock sync.Mutex @@ -50,7 +54,7 @@ func newer(oldRV, newRV string) bool { return oldI < newI } -func newCacheClient(uncached, cached kclient.Client) *cacheClient { +func newCacheClient(uncached kclient.WithWatch, cached kclient.Client) *cacheClient { return &cacheClient{ uncached: uncached, cached: cached, @@ -115,14 +119,14 @@ func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclien return c.uncached.Get(ctx, key, u.Object, opts...) } - err := c.cached.Get(ctx, key, obj) - if err != nil { - return err + getErr := c.cached.Get(ctx, key, obj) + if getErr != nil && !apierrors.IsNotFound(getErr) { + return getErr } gvk, err := apiutil.GVKForObject(obj, c.Scheme()) if err != nil { - return err + return errors.Join(getErr, err) } cacheKey := objectKey{ @@ -134,6 +138,15 @@ func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclien c.recentLock.Lock() cachedObj, ok := c.recent[cacheKey] c.recentLock.Unlock() + + if apierrors.IsNotFound(getErr) { + if ok { + return CopyInto(obj, cachedObj.Object) + } else { + return getErr + } + } + if ok && newer(obj.GetResourceVersion(), cachedObj.Object.GetResourceVersion()) { return CopyInto(obj, cachedObj.Object) } @@ -209,6 +222,10 @@ func (c *cacheClient) SubResource(subResource string) kclient.SubResourceClient } } +func (c *cacheClient) Watch(ctx context.Context, obj kclient.ObjectList, opts ...kclient.ListOption) (watch.Interface, error) { + return c.uncached.Watch(ctx, obj, opts...) +} + func (c *cacheClient) Status() kclient.StatusWriter { return &subResourceClient{ c: c, diff --git a/pkg/runtime/clients.go b/pkg/runtime/clients.go index 6c451d0..accf3b4 100644 --- a/pkg/runtime/clients.go +++ b/pkg/runtime/clients.go @@ -34,7 +34,7 @@ func NewRuntimeWithConfig(cfg Config, scheme *runtime.Scheme) (*Runtime, error) } func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Config, scheme *runtime.Scheme) (*Runtime, error) { - clients := make(map[string]client.Client, len(apiGroupConfigs)) + clients := make(map[string]client.WithWatch, len(apiGroupConfigs)) cachedClients := make(map[string]client.Client, len(apiGroupConfigs)) caches := make(map[string]cache.Cache, len(apiGroupConfigs)) @@ -54,7 +54,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf return nil, err } - aggUncachedClient := multi.NewClient(uncachedClient, clients) + aggUncachedClient := multi.NewWithWatch(uncachedClient, clients) aggCachedClient := multi.NewClient(cachedClient, cachedClients) aggCache := multi.NewCache(scheme, theCache, caches)