Skip to content
This repository has been archived by the owner on Nov 3, 2024. It is now read-only.

Commit

Permalink
expose withwatch client
Browse files Browse the repository at this point in the history
  • Loading branch information
ibuildthecloud committed Oct 29, 2024
1 parent ffe9aa2 commit 0083a3d
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 13 deletions.
4 changes: 2 additions & 2 deletions pkg/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ type Trigger interface {
}

type Watcher interface {
Watch(ctx context.Context, gvk schema.GroupVersionKind, name string, cb Callback) error
Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb Callback) error
}

type Backend interface {
Trigger
CacheFactory
Watcher
kclient.Client
kclient.WithWatch
kclient.FieldIndexer

Start(ctx context.Context) error
Expand Down
7 changes: 7 additions & 0 deletions pkg/router/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ package router
import (
"context"

"github.com/acorn-io/baaah/pkg/backend"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
)

Expand All @@ -17,11 +19,16 @@ type TriggerRegistry interface {
}

type client struct {
backend backend.Backend
reader
writer
status
}

func (c *client) Watch(ctx context.Context, list kclient.ObjectList, opts ...kclient.ListOption) (watch.Interface, error) {
return c.backend.Watch(ctx, list, opts...)
}

func (c *client) Scheme() *runtime.Scheme {
return c.reader.client.Scheme()
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/router/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ func (m *HandlerSet) newRequestResponse(gvk schema.GroupVersionKind, key string,
req := Request{
FromTrigger: trigger,
Client: &client{
backend: m.backend,
reader: reader{
scheme: m.scheme,
client: m.backend,
Expand Down Expand Up @@ -179,7 +180,7 @@ func (m *HandlerSet) WatchGVK(gvks ...schema.GroupVersionKind) error {
if m.watching[gvk] {
continue
}
if err := m.backend.Watch(m.ctx, gvk, m.name, m.onChange); err == nil {
if err := m.backend.Watcher(m.ctx, gvk, m.name, m.onChange); err == nil {
m.watching[gvk] = true
} else {
watchErrs = append(watchErrs, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/router/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func (h HandlerFunc) Handle(req Request, resp Response) error {
}

type Request struct {
Client kclient.Client
Client kclient.WithWatch
Object kclient.Object
Ctx context.Context
GVK schema.GroupVersionKind
Expand Down
2 changes: 1 addition & 1 deletion pkg/runtime/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ func (b *Backend) addIndexer(ctx context.Context, gvk schema.GroupVersionKind) e
return cache.AddIndexers(indexers)
}

func (b *Backend) Watch(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error {
func (b *Backend) Watcher(ctx context.Context, gvk schema.GroupVersionKind, name string, cb backend.Callback) error {
c, err := b.cacheFactory.ForKind(gvk)
if err != nil {
return err
Expand Down
29 changes: 23 additions & 6 deletions pkg/runtime/cached.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package runtime

import (
"context"
"errors"
"strconv"
"sync"
"time"

"github.com/acorn-io/baaah/pkg/uncached"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
kclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/apiutil"
)
Expand All @@ -29,7 +32,8 @@ type objectValue struct {
}

type cacheClient struct {
uncached, cached kclient.Client
uncached kclient.WithWatch
cached kclient.Client

recent map[objectKey]objectValue
recentLock sync.Mutex
Expand All @@ -50,7 +54,7 @@ func newer(oldRV, newRV string) bool {
return oldI < newI
}

func newCacheClient(uncached, cached kclient.Client) *cacheClient {
func newCacheClient(uncached kclient.WithWatch, cached kclient.Client) *cacheClient {
return &cacheClient{
uncached: uncached,
cached: cached,
Expand Down Expand Up @@ -115,14 +119,14 @@ func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclien
return c.uncached.Get(ctx, key, u.Object, opts...)
}

err := c.cached.Get(ctx, key, obj)
if err != nil {
return err
getErr := c.cached.Get(ctx, key, obj)
if getErr != nil && !apierrors.IsNotFound(getErr) {
return getErr
}

gvk, err := apiutil.GVKForObject(obj, c.Scheme())
if err != nil {
return err
return errors.Join(getErr, err)
}

cacheKey := objectKey{
Expand All @@ -134,6 +138,15 @@ func (c *cacheClient) Get(ctx context.Context, key kclient.ObjectKey, obj kclien
c.recentLock.Lock()
cachedObj, ok := c.recent[cacheKey]
c.recentLock.Unlock()

if apierrors.IsNotFound(getErr) {
if ok {
return CopyInto(obj, cachedObj.Object)
} else {
return getErr
}
}

if ok && newer(obj.GetResourceVersion(), cachedObj.Object.GetResourceVersion()) {
return CopyInto(obj, cachedObj.Object)
}
Expand Down Expand Up @@ -209,6 +222,10 @@ func (c *cacheClient) SubResource(subResource string) kclient.SubResourceClient
}
}

func (c *cacheClient) Watch(ctx context.Context, obj kclient.ObjectList, opts ...kclient.ListOption) (watch.Interface, error) {
return c.uncached.Watch(ctx, obj, opts...)
}

func (c *cacheClient) Status() kclient.StatusWriter {
return &subResourceClient{
c: c,
Expand Down
4 changes: 2 additions & 2 deletions pkg/runtime/clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewRuntimeWithConfig(cfg Config, scheme *runtime.Scheme) (*Runtime, error)
}

func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Config, scheme *runtime.Scheme) (*Runtime, error) {
clients := make(map[string]client.Client, len(apiGroupConfigs))
clients := make(map[string]client.WithWatch, len(apiGroupConfigs))
cachedClients := make(map[string]client.Client, len(apiGroupConfigs))
caches := make(map[string]cache.Cache, len(apiGroupConfigs))

Expand All @@ -54,7 +54,7 @@ func NewRuntimeWithConfigs(defaultConfig Config, apiGroupConfigs map[string]Conf
return nil, err
}

aggUncachedClient := multi.NewClient(uncachedClient, clients)
aggUncachedClient := multi.NewWithWatch(uncachedClient, clients)
aggCachedClient := multi.NewClient(cachedClient, cachedClients)
aggCache := multi.NewCache(scheme, theCache, caches)

Expand Down

0 comments on commit 0083a3d

Please sign in to comment.