diff --git a/cmd/flow-aggregator/flow-aggregator.go b/cmd/flow-aggregator/flow-aggregator.go index e156e42ec10..85886e702da 100644 --- a/cmd/flow-aggregator/flow-aggregator.go +++ b/cmd/flow-aggregator/flow-aggregator.go @@ -20,12 +20,9 @@ import ( "sync" "time" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - coreinformers "k8s.io/client-go/informers/core/v1" + "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" aggregator "antrea.io/antrea/pkg/flowaggregator" @@ -33,7 +30,6 @@ import ( "antrea.io/antrea/pkg/log" "antrea.io/antrea/pkg/signals" "antrea.io/antrea/pkg/util/cipher" - "antrea.io/antrea/pkg/util/k8s" "antrea.io/antrea/pkg/util/podstore" "antrea.io/antrea/pkg/version" ) @@ -59,17 +55,9 @@ func run(configFile string) error { return fmt.Errorf("error when creating K8s client: %v", err) } - podInformer := coreinformers.NewFilteredPodInformer( - k8sClient, - metav1.NamespaceAll, - informerDefaultResync, - cache.Indexers{}, - func(options *metav1.ListOptions) { - options.FieldSelector = fields.OneTermEqualSelector("spec.hostNetwork", "false").String() - }, - ) - podInformer.SetTransform(k8s.NewTrimmer(k8s.TrimPod)) - podStore := podstore.NewPodStore(podInformer) + informerFactory := informers.NewSharedInformerFactory(k8sClient, informerDefaultResync) + podInformer := informerFactory.Core().V1().Pods() + podStore := podstore.NewPodStore(podInformer.Informer()) klog.InfoS("Retrieving Antrea cluster UUID") clusterUUID, err := aggregator.GetClusterUUID(ctx, k8sClient) @@ -109,7 +97,7 @@ func run(configFile string) error { } go apiServer.Run(ctx) - go podInformer.Run(stopCh) + informerFactory.Start(stopCh) <-stopCh klog.InfoS("Stopping Flow Aggregator") diff --git a/pkg/agent/cniserver/server.go b/pkg/agent/cniserver/server.go index 53b216d2a97..dc956438291 100644 --- a/pkg/agent/cniserver/server.go +++ b/pkg/agent/cniserver/server.go @@ -29,6 +29,7 @@ import ( "github.com/containernetworking/cni/pkg/version" "github.com/containernetworking/plugins/pkg/ip" "google.golang.org/grpc" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" clientset "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" @@ -763,12 +764,18 @@ func (s *CNIServer) interceptCheck(cniConfig *CNIConfig) (*cnipb.CniCmdResponse, // | Windows HostProcess Pod | true | true | No | Yes | func (s *CNIServer) reconcile() error { klog.InfoS("Starting reconciliation for CNI server") - pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), s.getPodsListOptions()) + podListOption := metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name), + // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from + // the watch cache in kube-apiserver. + ResourceVersion: "0", + } + pods, err := s.kubeClient.CoreV1().Pods("").List(context.TODO(), podListOption) if err != nil { return fmt.Errorf("failed to list Pods running on Node %s: %v", s.nodeConfig.Name, err) } - - return s.podConfigurator.reconcile(pods.Items, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait) + filteredPods := s.filterPodsForReconcile(pods) + return s.podConfigurator.reconcile(filteredPods, s.containerAccess, s.podNetworkWait, s.flowRestoreCompleteWait) } func init() { diff --git a/pkg/agent/cniserver/server_linux.go b/pkg/agent/cniserver/server_linux.go index 9f3f8db76b1..17c31da7602 100644 --- a/pkg/agent/cniserver/server_linux.go +++ b/pkg/agent/cniserver/server_linux.go @@ -15,10 +15,8 @@ package cniserver import ( - "fmt" - current "github.com/containernetworking/cni/pkg/types/100" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" ) // updateResultDNSConfig updates the DNS config from CNIConfig. @@ -54,12 +52,13 @@ func (c *CNIConfig) getInfraContainer() string { return c.ContainerId } -// getPodsListOptions returns the none host-network Pods running on the current Node. -func (s *CNIServer) getPodsListOptions() metav1.ListOptions { - return metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s,spec.hostNetwork=false", s.nodeConfig.Name), - // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from - // the watch cache in kube-apiserver. - ResourceVersion: "0", +// filterPodsForReconcile returns Pods that should be reconciled. +func (s *CNIServer) filterPodsForReconcile(pods *corev1.PodList) []corev1.Pod { + validPods := make([]corev1.Pod, 0) + for _, pod := range pods.Items { + if !pod.Spec.HostNetwork { + validPods = append(validPods, pod) + } } + return validPods } diff --git a/pkg/agent/cniserver/server_windows.go b/pkg/agent/cniserver/server_windows.go index b45b5587ca9..73f42a1c77e 100644 --- a/pkg/agent/cniserver/server_windows.go +++ b/pkg/agent/cniserver/server_windows.go @@ -22,7 +22,7 @@ import ( "strings" current "github.com/containernetworking/cni/pkg/types/100" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + corev1 "k8s.io/api/core/v1" "k8s.io/klog/v2" ) @@ -100,14 +100,7 @@ func (c *CNIConfig) getInfraContainer() string { return getInfraContainer(c.ContainerId, c.Netns) } -// getPodsListOptions returns the Pods running on the current Node. Note, the host-network Pods are not filtered -// out on Windows because they are also managed by antrea as long as "spec.SecurityContext.windowsOptions.hostProcess" -// is not configured. -func (s *CNIServer) getPodsListOptions() metav1.ListOptions { - return metav1.ListOptions{ - FieldSelector: fmt.Sprintf("spec.nodeName=%s", s.nodeConfig.Name), - // For performance reasons, use ResourceVersion="0" in the ListOptions to ensure the request is served from - // the watch cache in kube-apiserver. - ResourceVersion: "0", - } +// filterPodsForReconcile returns Pods that should be reconciled. +func (s *CNIServer) filterPodsForReconcile(pods *corev1.PodList) []corev1.Pod { + return pods.Items }