Skip to content

Commit

Permalink
watcher: Refactor K8sWatcher
Browse files Browse the repository at this point in the history
This refactoring will make it easier to clean up unused code and to extend the
watcher with additional informers if needed.

Signed-off-by: Anna Kapuscinska <anna@isovalent.com>
  • Loading branch information
lambdanis committed Feb 29, 2024
1 parent 5a74a05 commit 25cd27a
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 45 deletions.
10 changes: 10 additions & 0 deletions pkg/watcher/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"fmt"

corev1 "k8s.io/api/core/v1"
"k8s.io/client-go/tools/cache"
)

// FakeK8sWatcher is used as an "empty" K8sResourceWatcher when --enable-k8s-api flag is not set.
Expand Down Expand Up @@ -58,3 +59,12 @@ func (watcher *FakeK8sWatcher) AddService(service *corev1.Service) {
func (watcher *FakeK8sWatcher) ClearAllServices() {
watcher.services = nil
}

func (watcher *FakeK8sWatcher) AddInformers(_ internalSharedInformerFactory, _ ...internalInformer) {
}

func (watcher *FakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer {
return nil
}

func (watcher *FakeK8sWatcher) Start() {}
135 changes: 90 additions & 45 deletions pkg/watcher/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"os"
"reflect"
"strings"
"time"

Expand All @@ -25,11 +26,14 @@ import (
)

const (
containerIDLen = 15
containerIdx = "containers-ids"
podIdx = "pod-ids"
serviceIPsIdx = "service-ips"
podInfoIPsIdx = "pod-info-ips"
containerIDLen = 15
containerIdx = "containers-ids"
podIdx = "pod-ids"
serviceIPsIdx = "service-ips"
podInfoIPsIdx = "pod-info-ips"
podInformerName = "pod"
serviceInformerName = "service"
podInfoInformerName = "podInfo"
)

var (
Expand All @@ -40,6 +44,10 @@ var (

// K8sResourceWatcher defines an interface for accessing various resources from Kubernetes API.
type K8sResourceWatcher interface {
AddInformers(factory internalSharedInformerFactory, infs ...*internalInformer)
GetInformer(name string) cache.SharedIndexInformer
Start()

// Find a pod/container pair for the given container ID.
FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool)

Expand All @@ -49,9 +57,8 @@ type K8sResourceWatcher interface {

// K8sWatcher maintains a local cache of k8s resources.
type K8sWatcher struct {
podInformer cache.SharedIndexInformer
serviceInformer cache.SharedIndexInformer
podInfoInformer cache.SharedIndexInformer
informers map[string]cache.SharedIndexInformer
startFunc func()
}

func podIndexFunc(obj interface{}) ([]string, error) {
Expand Down Expand Up @@ -142,58 +149,61 @@ func NewK8sWatcherWithTetragonClient(k8sClient kubernetes.Interface, tetragonCli
if nodeName == "" {
logger.GetLogger().Warn("env var NODE_NAME not specified, K8s watcher will not work as expected")
}

k8sWatcher := &K8sWatcher{informers: make(map[string]cache.SharedIndexInformer)}

k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(k8sClient, stateSyncIntervalSec,
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
// Watch local pods only.
options.FieldSelector = "spec.nodeName=" + os.Getenv("NODE_NAME")
}))
podInformer := k8sInformerFactory.Core().V1().Pods().Informer()

err := podInformer.AddIndexers(map[string]cache.IndexFunc{
containerIdx: containerIndexFunc,
podIdx: podIndexFunc,
k8sWatcher.AddInformers(k8sInformerFactory, &internalInformer{
name: podInformerName,
informer: podInformer,
indexer: map[string]cache.IndexFunc{
containerIdx: containerIndexFunc,
podIdx: podIndexFunc,
},
})
if err != nil {
// Panic during setup since this should never fail, if it fails is a
// developer mistake.
panic(err)
}

// can't share the same informer factory as pods because the pod informer filters by spec.nodeName field.
serviceInformerFactory := informers.NewSharedInformerFactory(k8sClient, stateSyncIntervalSec)
serviceInformer := serviceInformerFactory.Core().V1().Services().Informer()
err = serviceInformer.AddIndexers(map[string]cache.IndexFunc{
serviceIPsIdx: serviceIPIndexFunc,
k8sWatcher.AddInformers(serviceInformerFactory, &internalInformer{
name: serviceInformerName,
informer: serviceInformer,
indexer: map[string]cache.IndexFunc{
serviceIPsIdx: serviceIPIndexFunc,
},
})
if err != nil {
// Panic during setup since this should never fail, if it fails is a
// developer mistake.
panic(err)
}

podInfoInformerFactory := externalversions.NewSharedInformerFactory(tetragonClient, stateSyncIntervalSec)
podInfoInformer := podInfoInformerFactory.Cilium().V1alpha1().PodInfo().Informer()
err = podInfoInformer.AddIndexers(map[string]cache.IndexFunc{
podInfoIPsIdx: podInfoIPIndexFunc,
k8sWatcher.AddInformers(podInfoInformerFactory, &internalInformer{
name: podInfoInformerName,
informer: podInfoInformer,
indexer: map[string]cache.IndexFunc{
podInfoIPsIdx: podInfoIPIndexFunc,
},
})
if err != nil {
// Panic during setup since this should never fail, if it fails is a
// developer mistake.
panic(err)
}

podhooks.InstallHooks(podInformer)

k8sInformerFactory.Start(wait.NeverStop)
k8sInformerFactory.WaitForCacheSync(wait.NeverStop)
serviceInformerFactory.Start(wait.NeverStop)
serviceInformerFactory.WaitForCacheSync(wait.NeverStop)
podInfoInformerFactory.Start(wait.NeverStop)
podInfoInformerFactory.WaitForCacheSync(wait.NeverStop)
logger.GetLogger().WithField("num_pods", len(podInformer.GetStore().ListKeys())).Info("Initialized pod cache")
logger.GetLogger().WithField("num_services", len(serviceInformer.GetStore().ListKeys())).Info("Initialized service cache")
logger.GetLogger().WithField("num_pod_info", len(podInfoInformer.GetStore().ListKeys())).Info("Initialized pod info cache")
return &K8sWatcher{podInformer, serviceInformer, podInfoInformer}
k8sWatcher.Start()

return k8sWatcher
}

type internalSharedInformerFactory interface {
Start(stopCh <-chan struct{})
WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool
}

type internalInformer struct {
name string
informer cache.SharedIndexInformer
indexer cache.Indexers
}

// FindContainer implements K8sResourceWatcher.FindContainer.
Expand All @@ -202,20 +212,55 @@ func (watcher *K8sWatcher) FindContainer(containerID string) (*corev1.Pod, *core
if len(containerID) > containerIDLen {
indexedContainerID = containerID[:containerIDLen]
}
objs, err := watcher.podInformer.GetIndexer().ByIndex(containerIdx, indexedContainerID)
objs, err := watcher.GetInformer(podInformerName).GetIndexer().ByIndex(containerIdx, indexedContainerID)
if err != nil {
return nil, nil, false
}
// If we can't find any pod indexed then fall back to the entire pod list.
// If we find more than 1 pods indexed also fall back to the entire pod list.
if len(objs) != 1 {
return findContainer(containerID, watcher.podInformer.GetStore().List())
return findContainer(containerID, watcher.GetInformer(podInfoInformerName).GetStore().List())
}
return findContainer(containerID, objs)
}

func (watcher *K8sWatcher) AddInformers(factory internalSharedInformerFactory, infs ...*internalInformer) {
for _, inf := range infs {
watcher.informers[inf.name] = inf.informer
oldStart := watcher.startFunc
watcher.startFunc = func() {
err := inf.informer.AddIndexers(inf.indexer)
if err != nil {
// Panic during setup since this should never fail, if it fails is a
// developer mistake.
panic(err)
}
oldStart()
}
}
oldStart := watcher.startFunc
watcher.startFunc = func() {
factory.Start(wait.NeverStop)
factory.WaitForCacheSync(wait.NeverStop)
for name, informer := range watcher.informers {
logger.GetLogger().WithField("informer", name).WithField("count", len(informer.GetStore().ListKeys())).Info("Initialized informer cache")
}
oldStart()
}
}

func (watcher *K8sWatcher) GetInformer(name string) cache.SharedIndexInformer {
return watcher.informers[name]
}

func (watcher *K8sWatcher) Start() {
if watcher.startFunc != nil {
watcher.startFunc()
}
}

func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) {
objs, err := watcher.podInformer.GetIndexer().ByIndex(podIdx, podID)
objs, err := watcher.GetInformer(podInformerName).GetIndexer().ByIndex(podIdx, podID)
if err != nil {
return nil, fmt.Errorf("watcher returned: %w", err)
}
Expand All @@ -226,7 +271,7 @@ func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) {
return nil, fmt.Errorf("unexpected type %t", objs[0])
}

allPods := watcher.podInformer.GetStore().List()
allPods := watcher.GetInformer(podInformerName).GetStore().List()
if pod, ok := findPod(allPods); ok {
return pod, nil
}
Expand Down

0 comments on commit 25cd27a

Please sign in to comment.