From 25cd27a316a4f464329de2159aa0bd7fbbfea141 Mon Sep 17 00:00:00 2001 From: Anna Kapuscinska Date: Thu, 29 Feb 2024 14:22:44 +0000 Subject: [PATCH] watcher: Refactor K8sWatcher This refactoring will make it easier to clean up unused code and to extend the watcher with additional informers if needed. Signed-off-by: Anna Kapuscinska --- pkg/watcher/fake.go | 10 +++ pkg/watcher/watcher.go | 135 +++++++++++++++++++++++++++-------------- 2 files changed, 100 insertions(+), 45 deletions(-) diff --git a/pkg/watcher/fake.go b/pkg/watcher/fake.go index 890a90814a4..97034617b13 100644 --- a/pkg/watcher/fake.go +++ b/pkg/watcher/fake.go @@ -7,6 +7,7 @@ import ( "fmt" corev1 "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" ) // FakeK8sWatcher is used as an "empty" K8sResourceWatcher when --enable-k8s-api flag is not set. @@ -58,3 +59,12 @@ func (watcher *FakeK8sWatcher) AddService(service *corev1.Service) { func (watcher *FakeK8sWatcher) ClearAllServices() { watcher.services = nil } + +func (watcher *FakeK8sWatcher) AddInformers(_ internalSharedInformerFactory, _ ...internalInformer) { +} + +func (watcher *FakeK8sWatcher) GetInformer(_ string) cache.SharedIndexInformer { + return nil +} + +func (watcher *FakeK8sWatcher) Start() {} diff --git a/pkg/watcher/watcher.go b/pkg/watcher/watcher.go index ec869f1135c..500d4f5f6df 100644 --- a/pkg/watcher/watcher.go +++ b/pkg/watcher/watcher.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "os" + "reflect" "strings" "time" @@ -25,11 +26,14 @@ import ( ) const ( - containerIDLen = 15 - containerIdx = "containers-ids" - podIdx = "pod-ids" - serviceIPsIdx = "service-ips" - podInfoIPsIdx = "pod-info-ips" + containerIDLen = 15 + containerIdx = "containers-ids" + podIdx = "pod-ids" + serviceIPsIdx = "service-ips" + podInfoIPsIdx = "pod-info-ips" + podInformerName = "pod" + serviceInformerName = "service" + podInfoInformerName = "podInfo" ) var ( @@ -40,6 +44,10 @@ var ( // K8sResourceWatcher defines an interface for accessing various resources from Kubernetes API. type K8sResourceWatcher interface { + AddInformers(factory internalSharedInformerFactory, infs ...*internalInformer) + GetInformer(name string) cache.SharedIndexInformer + Start() + // Find a pod/container pair for the given container ID. FindContainer(containerID string) (*corev1.Pod, *corev1.ContainerStatus, bool) @@ -49,9 +57,8 @@ type K8sResourceWatcher interface { // K8sWatcher maintains a local cache of k8s resources. type K8sWatcher struct { - podInformer cache.SharedIndexInformer - serviceInformer cache.SharedIndexInformer - podInfoInformer cache.SharedIndexInformer + informers map[string]cache.SharedIndexInformer + startFunc func() } func podIndexFunc(obj interface{}) ([]string, error) { @@ -142,58 +149,61 @@ func NewK8sWatcherWithTetragonClient(k8sClient kubernetes.Interface, tetragonCli if nodeName == "" { logger.GetLogger().Warn("env var NODE_NAME not specified, K8s watcher will not work as expected") } + + k8sWatcher := &K8sWatcher{informers: make(map[string]cache.SharedIndexInformer)} + k8sInformerFactory := informers.NewSharedInformerFactoryWithOptions(k8sClient, stateSyncIntervalSec, informers.WithTweakListOptions(func(options *metav1.ListOptions) { // Watch local pods only. options.FieldSelector = "spec.nodeName=" + os.Getenv("NODE_NAME") })) podInformer := k8sInformerFactory.Core().V1().Pods().Informer() - - err := podInformer.AddIndexers(map[string]cache.IndexFunc{ - containerIdx: containerIndexFunc, - podIdx: podIndexFunc, + k8sWatcher.AddInformers(k8sInformerFactory, &internalInformer{ + name: podInformerName, + informer: podInformer, + indexer: map[string]cache.IndexFunc{ + containerIdx: containerIndexFunc, + podIdx: podIndexFunc, + }, }) - if err != nil { - // Panic during setup since this should never fail, if it fails is a - // developer mistake. - panic(err) - } // can't share the same informer factory as pods because the pod informer filters by spec.nodeName field. serviceInformerFactory := informers.NewSharedInformerFactory(k8sClient, stateSyncIntervalSec) serviceInformer := serviceInformerFactory.Core().V1().Services().Informer() - err = serviceInformer.AddIndexers(map[string]cache.IndexFunc{ - serviceIPsIdx: serviceIPIndexFunc, + k8sWatcher.AddInformers(serviceInformerFactory, &internalInformer{ + name: serviceInformerName, + informer: serviceInformer, + indexer: map[string]cache.IndexFunc{ + serviceIPsIdx: serviceIPIndexFunc, + }, }) - if err != nil { - // Panic during setup since this should never fail, if it fails is a - // developer mistake. - panic(err) - } podInfoInformerFactory := externalversions.NewSharedInformerFactory(tetragonClient, stateSyncIntervalSec) podInfoInformer := podInfoInformerFactory.Cilium().V1alpha1().PodInfo().Informer() - err = podInfoInformer.AddIndexers(map[string]cache.IndexFunc{ - podInfoIPsIdx: podInfoIPIndexFunc, + k8sWatcher.AddInformers(podInfoInformerFactory, &internalInformer{ + name: podInfoInformerName, + informer: podInfoInformer, + indexer: map[string]cache.IndexFunc{ + podInfoIPsIdx: podInfoIPIndexFunc, + }, }) - if err != nil { - // Panic during setup since this should never fail, if it fails is a - // developer mistake. - panic(err) - } podhooks.InstallHooks(podInformer) - k8sInformerFactory.Start(wait.NeverStop) - k8sInformerFactory.WaitForCacheSync(wait.NeverStop) - serviceInformerFactory.Start(wait.NeverStop) - serviceInformerFactory.WaitForCacheSync(wait.NeverStop) - podInfoInformerFactory.Start(wait.NeverStop) - podInfoInformerFactory.WaitForCacheSync(wait.NeverStop) - logger.GetLogger().WithField("num_pods", len(podInformer.GetStore().ListKeys())).Info("Initialized pod cache") - logger.GetLogger().WithField("num_services", len(serviceInformer.GetStore().ListKeys())).Info("Initialized service cache") - logger.GetLogger().WithField("num_pod_info", len(podInfoInformer.GetStore().ListKeys())).Info("Initialized pod info cache") - return &K8sWatcher{podInformer, serviceInformer, podInfoInformer} + k8sWatcher.Start() + + return k8sWatcher +} + +type internalSharedInformerFactory interface { + Start(stopCh <-chan struct{}) + WaitForCacheSync(stopCh <-chan struct{}) map[reflect.Type]bool +} + +type internalInformer struct { + name string + informer cache.SharedIndexInformer + indexer cache.Indexers } // FindContainer implements K8sResourceWatcher.FindContainer. @@ -202,20 +212,55 @@ func (watcher *K8sWatcher) FindContainer(containerID string) (*corev1.Pod, *core if len(containerID) > containerIDLen { indexedContainerID = containerID[:containerIDLen] } - objs, err := watcher.podInformer.GetIndexer().ByIndex(containerIdx, indexedContainerID) + objs, err := watcher.GetInformer(podInformerName).GetIndexer().ByIndex(containerIdx, indexedContainerID) if err != nil { return nil, nil, false } // If we can't find any pod indexed then fall back to the entire pod list. // If we find more than 1 pods indexed also fall back to the entire pod list. if len(objs) != 1 { - return findContainer(containerID, watcher.podInformer.GetStore().List()) + return findContainer(containerID, watcher.GetInformer(podInfoInformerName).GetStore().List()) } return findContainer(containerID, objs) } +func (watcher *K8sWatcher) AddInformers(factory internalSharedInformerFactory, infs ...*internalInformer) { + for _, inf := range infs { + watcher.informers[inf.name] = inf.informer + oldStart := watcher.startFunc + watcher.startFunc = func() { + err := inf.informer.AddIndexers(inf.indexer) + if err != nil { + // Panic during setup since this should never fail, if it fails is a + // developer mistake. + panic(err) + } + oldStart() + } + } + oldStart := watcher.startFunc + watcher.startFunc = func() { + factory.Start(wait.NeverStop) + factory.WaitForCacheSync(wait.NeverStop) + for name, informer := range watcher.informers { + logger.GetLogger().WithField("informer", name).WithField("count", len(informer.GetStore().ListKeys())).Info("Initialized informer cache") + } + oldStart() + } +} + +func (watcher *K8sWatcher) GetInformer(name string) cache.SharedIndexInformer { + return watcher.informers[name] +} + +func (watcher *K8sWatcher) Start() { + if watcher.startFunc != nil { + watcher.startFunc() + } +} + func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) { - objs, err := watcher.podInformer.GetIndexer().ByIndex(podIdx, podID) + objs, err := watcher.GetInformer(podInformerName).GetIndexer().ByIndex(podIdx, podID) if err != nil { return nil, fmt.Errorf("watcher returned: %w", err) } @@ -226,7 +271,7 @@ func (watcher *K8sWatcher) FindPod(podID string) (*corev1.Pod, error) { return nil, fmt.Errorf("unexpected type %t", objs[0]) } - allPods := watcher.podInformer.GetStore().List() + allPods := watcher.GetInformer(podInformerName).GetStore().List() if pod, ok := findPod(allPods); ok { return pod, nil }