From 83b9df2d87c6bb96841c78adec0d9c775cd33190 Mon Sep 17 00:00:00 2001 From: shentiecheng Date: Thu, 9 Jan 2025 20:28:36 +0800 Subject: [PATCH] =?UTF-8?q?feat=EF=BC=9Aadd=20resourceVersion=20for=20pod?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hpaaggregator/aggregation/forward/pod.go | 84 +--- .../aggregation/forward/pod_test.go | 277 ------------ .../hpaaggregator/aggregation/rest.go | 6 +- .../hpaaggregator/metrics/resource/pod.go | 25 +- .../hpaaggregator/metrics/resource_metrics.go | 4 +- .../global_resource_version.go | 137 ++++++ pkg/util/aggregatedlister/interface.go | 33 ++ pkg/util/aggregatedlister/pod.go | 117 +++--- pkg/util/aggregatedlister/pod_test.go | 395 ------------------ 9 files changed, 276 insertions(+), 802 deletions(-) delete mode 100644 pkg/registry/hpaaggregator/aggregation/forward/pod_test.go create mode 100644 pkg/util/aggregatedlister/global_resource_version.go create mode 100644 pkg/util/aggregatedlister/interface.go delete mode 100644 pkg/util/aggregatedlister/pod_test.go diff --git a/pkg/registry/hpaaggregator/aggregation/forward/pod.go b/pkg/registry/hpaaggregator/aggregation/forward/pod.go index f6ed9252..8664251b 100644 --- a/pkg/registry/hpaaggregator/aggregation/forward/pod.go +++ b/pkg/registry/hpaaggregator/aggregation/forward/pod.go @@ -20,24 +20,21 @@ import ( "context" "fmt" "net/http" - "strconv" "sync" "time" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/endpoints/handlers" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" - "k8s.io/apiserver/pkg/registry/generic" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "k8s.io/utils/pointer" @@ -51,7 +48,7 @@ type PodHandler interface { } type PodREST struct { - podLister cache.GenericLister + podLister aggregatedlister.AggregatedLister federatedInformerManager informermanager.FederatedInformerManager minRequestTimeout time.Duration } @@ -65,7 +62,7 @@ var ( func NewPodREST( f informermanager.FederatedInformerManager, - podLister cache.GenericLister, + podLister aggregatedlister.AggregatedLister, minRequestTimeout time.Duration, ) *PodREST { return &PodREST{ @@ -92,7 +89,7 @@ func (p *PodREST) Handler(requestInfo *genericapirequest.RequestInfo) (http.Hand // Get ... func (p *PodREST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) { namespace := genericapirequest.NamespaceValue(ctx) - obj, err := p.podLister.ByNamespace(namespace).Get(name) + obj, err := p.podLister.ByNamespace(namespace).Get(ctx, name, *opts) if err != nil { if apierrors.IsNotFound(err) { // return not-found errors directly @@ -110,24 +107,17 @@ func (p *PodREST) NewList() runtime.Object { } func (p *PodREST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { - label := labels.Everything() - if options != nil && options.LabelSelector != nil { - label = options.LabelSelector - } - namespace := genericapirequest.NamespaceValue(ctx) - objs, err := p.podLister.ByNamespace(namespace).List(label) + objs, err := p.podLister.ByNamespace(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: options.LabelSelector.String(), + FieldSelector: options.FieldSelector.String(), + ResourceVersion: options.ResourceVersion, + }) if err != nil { - klog.ErrorS(err, "Failed listing pods", "labelSelector", label, "namespace", klog.KRef("", namespace)) + klog.ErrorS(err, "Failed listing pods", "namespace", klog.KRef("", namespace)) return nil, fmt.Errorf("failed listing pods: %w", err) } - - field := fields.Everything() - if options != nil && options.FieldSelector != nil { - field = options.FieldSelector - } - pods := convertAndFilterPodObject(objs, field) - return &corev1.PodList{Items: pods}, nil + return objs, nil } func (p *PodREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { @@ -135,6 +125,8 @@ func (p *PodREST) ConvertToTable(ctx context.Context, object runtime.Object, tab } func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) { + grv := aggregatedlister.NewGlobalResourceVersionFromString(options.ResourceVersion) + retGrv := grv.Clone() label := labels.Everything() if options != nil && options.LabelSelector != nil { label = options.LabelSelector @@ -166,9 +158,10 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp continue } watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{ - LabelSelector: label.String(), - FieldSelector: options.FieldSelector.String(), - TimeoutSeconds: pointer.Int64(1200), + LabelSelector: label.String(), + FieldSelector: options.FieldSelector.String(), + TimeoutSeconds: pointer.Int64(1200), + ResourceVersion: grv.Get(clusters[i].Name), }) if err != nil { logger.Error(err, "Failed watching pods") @@ -197,6 +190,8 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp } if pod, ok := event.Object.(*corev1.Pod); ok { clusterobject.MakePodUnique(pod, cluster) + retGrv.Set(cluster, pod.ResourceVersion) + pod.SetResourceVersion(retGrv.String()) event.Object = pod } @@ -211,44 +206,3 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp } return proxyWatcher, nil } - -func convertAndFilterPodObject(objs []runtime.Object, selector fields.Selector) []corev1.Pod { - newObjs := make([]corev1.Pod, 0, len(objs)) - for _, obj := range objs { - pod, ok := obj.(*corev1.Pod) - if !ok { - continue - } - fields := ToSelectableFields(pod) - if !selector.Matches(fields) { - continue - } - - newObjs = append(newObjs, *pod) - } - return newObjs -} - -// ToSelectableFields returns a field set that represents the object -// TODO: fields are not labels, and the validation rules for them do not apply. -func ToSelectableFields(pod *corev1.Pod) fields.Set { - // The purpose of allocation with a given number of elements is to reduce - // amount of allocations needed to create the fields.Set. If you add any - // field here or the number of object-meta related fields changes, this should - // be adjusted. - podSpecificFieldsSet := make(fields.Set, 10) - podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName - podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy) - podSpecificFieldsSet["spec.schedulerName"] = pod.Spec.SchedulerName - podSpecificFieldsSet["spec.serviceAccountName"] = pod.Spec.ServiceAccountName - podSpecificFieldsSet["spec.hostNetwork"] = strconv.FormatBool(pod.Spec.HostNetwork) - podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase) - // TODO: add podIPs as a downward API value(s) with proper format - podIP := "" - if len(pod.Status.PodIPs) > 0 { - podIP = pod.Status.PodIPs[0].IP - } - podSpecificFieldsSet["status.podIP"] = podIP - podSpecificFieldsSet["status.nominatedNodeName"] = pod.Status.NominatedNodeName - return generic.AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true) -} diff --git a/pkg/registry/hpaaggregator/aggregation/forward/pod_test.go b/pkg/registry/hpaaggregator/aggregation/forward/pod_test.go deleted file mode 100644 index 5a75d65f..00000000 --- a/pkg/registry/hpaaggregator/aggregation/forward/pod_test.go +++ /dev/null @@ -1,277 +0,0 @@ -/* -Copyright 2023 The KubeAdmiral Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package forward - -import ( - "context" - "errors" - "testing" - - corev1 "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/api/equality" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/client-go/tools/cache" -) - -func TestPodREST_convertAndFilterPodObject(t *testing.T) { - p1 := newPod("default", "test") - - type args struct { - objs []runtime.Object - selector fields.Selector - } - tests := []struct { - name string - args args - want []corev1.Pod - }{ - { - name: "1 pod", - args: args{ - objs: []runtime.Object{&p1}, - selector: fields.Everything(), - }, - want: []corev1.Pod{p1}, - }, - { - name: "2 obj, 1 pod", - args: args{ - objs: []runtime.Object{&corev1.Node{}, &p1}, - selector: fields.Everything(), - }, - want: []corev1.Pod{p1}, - }, - { - name: "1 pod, with selector", - args: args{ - objs: []runtime.Object{&corev1.Node{}, &p1}, - selector: fields.ParseSelectorOrDie("metadata.name=test"), - }, - want: []corev1.Pod{p1}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - got := convertAndFilterPodObject(tt.args.objs, tt.args.selector) - if !equality.Semantic.DeepEqual(got, tt.want) { - t.Errorf("convertAndFilterPodObject() = %+v, want %+v", got, tt.want) - } - }) - } -} - -func newPod(ns, name string) corev1.Pod { - p1 := corev1.Pod{ - TypeMeta: metav1.TypeMeta{ - Kind: "Pod", - APIVersion: corev1.SchemeGroupVersion.String(), - }, - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: ns, - Labels: map[string]string{ns: name}, - }, - Spec: corev1.PodSpec{SecurityContext: &corev1.PodSecurityContext{}}, // used for convert - } - - return p1 -} - -// fakes both PodLister and PodNamespaceLister at once -type fakePodLister struct { - data []*corev1.Pod - err error -} - -func (pl fakePodLister) List(selector labels.Selector) (ret []runtime.Object, err error) { - if pl.err != nil { - return nil, pl.err - } - res := []runtime.Object{} - for _, pod := range pl.data { - if selector.Matches(labels.Set(pod.Labels)) { - res = append(res, pod) - } - } - return res, nil -} - -func (pl fakePodLister) Get(name string) (runtime.Object, error) { - if pl.err != nil { - return nil, pl.err - } - for _, pod := range pl.data { - if pod.Name == name { - return pod, nil - } - } - return nil, nil -} - -func (pl fakePodLister) ByNamespace(namespace string) cache.GenericNamespaceLister { - return pl -} - -//nolint:containedctx -func TestPodREST_Get(t *testing.T) { - p1 := newPod("default", "test") - - type args struct { - ctx context.Context - name string - opts *metav1.GetOptions - } - tests := []struct { - name string - podLister cache.GenericLister - args args - want runtime.Object - wantErr bool - }{ - { - name: "get pod", - podLister: fakePodLister{data: []*corev1.Pod{&p1}}, - args: args{ - ctx: context.Background(), - name: "test", - opts: nil, - }, - want: &p1, - wantErr: false, - }, - { - name: "get pod failed", - podLister: fakePodLister{err: errors.New("fake")}, - args: args{ - ctx: context.Background(), - name: "test", - opts: nil, - }, - want: nil, - wantErr: true, - }, - { - name: "pod not found", - podLister: fakePodLister{err: apierrors.NewNotFound(schema.GroupResource{}, "")}, - args: args{ - ctx: context.Background(), - name: "test", - opts: nil, - }, - want: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &PodREST{ - podLister: tt.podLister, - } - got, err := p.Get(tt.args.ctx, tt.args.name, tt.args.opts) - if (err != nil) != tt.wantErr { - t.Errorf("Get() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !equality.Semantic.DeepEqual(got, tt.want) { - t.Errorf("Get() got = %v, want %v", got, tt.want) - } - }) - } -} - -//nolint:containedctx -func TestPodREST_List(t *testing.T) { - p1 := newPod("default", "test") - - type args struct { - ctx context.Context - options *metainternalversion.ListOptions - } - tests := []struct { - name string - podLister cache.GenericLister - args args - want runtime.Object - wantErr bool - }{ - { - name: "list pod", - podLister: fakePodLister{data: []*corev1.Pod{&p1}}, - args: args{ - ctx: context.Background(), - options: nil, - }, - want: &corev1.PodList{Items: []corev1.Pod{p1}}, - wantErr: false, - }, - { - name: "list pod with label selector", - podLister: fakePodLister{data: []*corev1.Pod{&p1}}, - args: args{ - ctx: context.Background(), - options: &metainternalversion.ListOptions{ - LabelSelector: labels.SelectorFromSet(map[string]string{"default": "test"}), - }, - }, - want: &corev1.PodList{Items: []corev1.Pod{p1}}, - wantErr: false, - }, - { - name: "list pod with field selector", - podLister: fakePodLister{data: []*corev1.Pod{&p1}}, - args: args{ - ctx: context.Background(), - options: &metainternalversion.ListOptions{ - FieldSelector: fields.ParseSelectorOrDie("metadata.name=test"), - }, - }, - want: &corev1.PodList{Items: []corev1.Pod{p1}}, - wantErr: false, - }, - { - name: "list pod failed", - podLister: fakePodLister{err: errors.New("fake")}, - args: args{ - ctx: context.Background(), - options: nil, - }, - want: nil, - wantErr: true, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &PodREST{ - podLister: tt.podLister, - } - got, err := p.List(tt.args.ctx, tt.args.options) - if (err != nil) != tt.wantErr { - t.Errorf("List() error = %v, wantErr %v", err, tt.wantErr) - return - } - if !equality.Semantic.DeepEqual(got, tt.want) { - t.Errorf("List() got = %v, want %v", got, tt.want) - } - }) - } -} diff --git a/pkg/registry/hpaaggregator/aggregation/rest.go b/pkg/registry/hpaaggregator/aggregation/rest.go index 463a77cb..a5aaf2df 100644 --- a/pkg/registry/hpaaggregator/aggregation/rest.go +++ b/pkg/registry/hpaaggregator/aggregation/rest.go @@ -24,6 +24,7 @@ import ( "path" "time" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -31,7 +32,6 @@ import ( genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" "k8s.io/klog/v2" "github.com/kubewharf/kubeadmiral/pkg/apis/hpaaggregator/v1alpha1" @@ -46,7 +46,7 @@ type REST struct { resolver genericapirequest.RequestInfoResolver - podLister cache.GenericLister + podLister aggregatedlister.AggregatedLister podHandler forward.PodHandler forwardHandler forward.ForwardHandler @@ -65,7 +65,7 @@ var proxyMethods = []string{"GET", "POST", "PUT", "PATCH", "DELETE", "HEAD", "OP // NewREST returns a RESTStorage object that will work against API services. func NewREST( federatedInformerManager informermanager.FederatedInformerManager, - podLister cache.GenericLister, + podLister aggregatedlister.AggregatedLister, config *restclient.Config, minRequestTimeout time.Duration, logger klog.Logger, diff --git a/pkg/registry/hpaaggregator/metrics/resource/pod.go b/pkg/registry/hpaaggregator/metrics/resource/pod.go index 7d4c8cde..f23840a8 100644 --- a/pkg/registry/hpaaggregator/metrics/resource/pod.go +++ b/pkg/registry/hpaaggregator/metrics/resource/pod.go @@ -22,9 +22,11 @@ package resource import ( "context" + "errors" "fmt" "sort" + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/meta" @@ -36,7 +38,6 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" genericapirequest "k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/registry/rest" - "k8s.io/client-go/tools/cache" "k8s.io/component-base/metrics/legacyregistry" "k8s.io/klog/v2" "k8s.io/metrics/pkg/apis/metrics" @@ -46,7 +47,7 @@ import ( type PodMetrics struct { groupResource schema.GroupResource metrics PodMetricsGetter - podLister cache.GenericLister + podLister aggregatedlister.AggregatedLister } var ( @@ -58,7 +59,7 @@ var ( _ rest.Scoper = &PodMetrics{} ) -func NewPodMetrics(groupResource schema.GroupResource, metrics PodMetricsGetter, podLister cache.GenericLister) *PodMetrics { +func NewPodMetrics(groupResource schema.GroupResource, metrics PodMetricsGetter, podLister aggregatedlister.AggregatedLister) *PodMetrics { registerIntoLegacyRegistryOnce.Do(func() { err := RegisterAPIMetrics(legacyregistry.Register) if err != nil { @@ -114,14 +115,24 @@ func (m *PodMetrics) pods(ctx context.Context, options *metainternalversion.List } namespace := genericapirequest.NamespaceValue(ctx) - pods, err := m.podLister.ByNamespace(namespace).List(labelSelector) + podsObj, err := m.podLister.ByNamespace(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector.String(), + }) if err != nil { klog.ErrorS(err, "Failed listing pods", "labelSelector", labelSelector, "namespace", klog.KRef("", namespace)) return nil, fmt.Errorf("failed listing pods: %w", err) } + podList, ok := podsObj.(*corev1.PodList) + if !ok { + return nil, errors.New("failed to convert obj to PodList") + } + var runtimeObjs []runtime.Object + for _, pod := range podList.Items { + runtimeObjs = append(runtimeObjs, &pod) + } - partialPods := make([]runtime.Object, 0, len(pods)) - for _, obj := range pods { + partialPods := make([]runtime.Object, 0, len(runtimeObjs)) + for _, obj := range runtimeObjs { var partialObj *metav1.PartialObjectMetadata switch t := obj.(type) { case *metav1.PartialObjectMetadata: @@ -144,7 +155,7 @@ func (m *PodMetrics) pods(ctx context.Context, options *metainternalversion.List func (m *PodMetrics) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) { namespace := genericapirequest.NamespaceValue(ctx) - obj, err := m.podLister.ByNamespace(namespace).Get(name) + obj, err := m.podLister.ByNamespace(namespace).Get(ctx, name, *opts) if err != nil { if apierrors.IsNotFound(err) { // return not-found errors directly diff --git a/pkg/registry/hpaaggregator/metrics/resource_metrics.go b/pkg/registry/hpaaggregator/metrics/resource_metrics.go index 579ace9e..7486e3d6 100644 --- a/pkg/registry/hpaaggregator/metrics/resource_metrics.go +++ b/pkg/registry/hpaaggregator/metrics/resource_metrics.go @@ -17,13 +17,13 @@ limitations under the License. package metrics import ( + "github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apiserver/pkg/registry/rest" genericapiserver "k8s.io/apiserver/pkg/server" corev1listers "k8s.io/client-go/listers/core/v1" - "k8s.io/client-go/tools/cache" "k8s.io/metrics/pkg/apis/metrics" metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" @@ -36,7 +36,7 @@ func BuildResourceMetrics( parameterCodec runtime.ParameterCodec, codecs serializer.CodecFactory, m resource.MetricsGetter, - podMetadataLister cache.GenericLister, + podMetadataLister aggregatedlister.AggregatedLister, nodeLister corev1listers.NodeLister, nodeSelector []labels.Requirement, ) genericapiserver.APIGroupInfo { diff --git a/pkg/util/aggregatedlister/global_resource_version.go b/pkg/util/aggregatedlister/global_resource_version.go new file mode 100644 index 00000000..4c4f5c27 --- /dev/null +++ b/pkg/util/aggregatedlister/global_resource_version.go @@ -0,0 +1,137 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregatedlister + +import ( + "encoding/base64" + "encoding/json" + "sort" + "sync" +) + +type GlobalResourceVersion struct { + sync.RWMutex + + clusterRVs map[string]string + isZero bool +} + +// NewGlobalResourceVersionWithCapacity Only used for Get Verb +func NewGlobalResourceVersionWithCapacity(capacity int) *GlobalResourceVersion { + return &GlobalResourceVersion{ + clusterRVs: make(map[string]string, capacity), + } +} + +func NewGlobalResourceVersionFromString(s string) *GlobalResourceVersion { + g := &GlobalResourceVersion{ + clusterRVs: map[string]string{}, + } + if s == "" { + return g + } + if s == "0" { + g.isZero = true + return g + } + + decoded, err := base64.RawURLEncoding.DecodeString(s) + if err != nil { + return g + } + _ = json.Unmarshal(decoded, &g.clusterRVs) + return g +} + +func (g *GlobalResourceVersion) Set(cluster, rv string) { + g.Lock() + defer g.Unlock() + g.clusterRVs[cluster] = rv + if rv != "0" { + g.isZero = false + } +} + +func (g *GlobalResourceVersion) Get(cluster string) string { + g.RLock() + defer g.RUnlock() + if g.isZero { + return "0" + } + return g.clusterRVs[cluster] +} + +func (g *GlobalResourceVersion) String() string { + if g.isZero { + return "0" + } + if len(g.clusterRVs) == 0 { + return "" + } + buf := marshalRvs(g.clusterRVs) + return base64.RawURLEncoding.EncodeToString(buf) +} + +func (g *GlobalResourceVersion) Clone() *GlobalResourceVersion { + ret := &GlobalResourceVersion{ + isZero: g.isZero, + clusterRVs: make(map[string]string, len(g.clusterRVs)), + } + for k, v := range g.clusterRVs { + ret.clusterRVs[k] = v + } + return ret +} + +func marshalRvs(rvs map[string]string) []byte { + if len(rvs) == 0 { + return nil + } + + type onWireRvs struct { + Cluster string + ResourceVersion string + } + + slice := make([]onWireRvs, 0, len(rvs)) + + for clusterName, version := range rvs { + obj := onWireRvs{clusterName, version} + slice = append(slice, obj) + } + + sort.Slice(slice, func(i, j int) bool { + return slice[i].Cluster < slice[j].Cluster + }) + + var encoded = make([]byte, 0) + encoded = append(encoded, '{') + for i, n := 0, len(slice); i < n; i++ { + encoded = append(encoded, '"') + encoded = append(encoded, slice[i].Cluster...) + encoded = append(encoded, `":"`...) + encoded = append(encoded, slice[i].ResourceVersion...) + encoded = append(encoded, '"') + + if i != n-1 { + encoded = append(encoded, ',') + } + } + encoded = append(encoded, '}') + + return encoded +} diff --git a/pkg/util/aggregatedlister/interface.go b/pkg/util/aggregatedlister/interface.go new file mode 100644 index 00000000..52a96ea6 --- /dev/null +++ b/pkg/util/aggregatedlister/interface.go @@ -0,0 +1,33 @@ +/* +Copyright 2025 The KubeAdmiral Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package aggregatedlister + +import ( + "context" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" +) + +type AggregatedLister interface { + ByNamespace(namespace string) AggregatedNamespaceLister +} + +type AggregatedNamespaceLister interface { + Get(ctx context.Context, name string, opts metav1.GetOptions) (runtime.Object, error) + List(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) +} diff --git a/pkg/util/aggregatedlister/pod.go b/pkg/util/aggregatedlister/pod.go index c697675f..7d76305e 100644 --- a/pkg/util/aggregatedlister/pod.go +++ b/pkg/util/aggregatedlister/pod.go @@ -17,17 +17,15 @@ limitations under the License. package aggregatedlister import ( - "fmt" - "strings" + "context" + "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" + "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/tools/cache" - - "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" - "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" ) type PodLister struct { @@ -40,92 +38,105 @@ type PodNamespaceLister struct { federatedInformerManager informermanager.FederatedInformerManager } -var ( - _ cache.GenericLister = &PodLister{} - _ cache.GenericNamespaceLister = &PodNamespaceLister{} -) - func NewPodLister(informer informermanager.FederatedInformerManager) *PodLister { return &PodLister{federatedInformerManager: informer} } -func (p *PodLister) List(selector labels.Selector) (ret []runtime.Object, err error) { +func (p *PodLister) ByNamespace(namespace string) AggregatedNamespaceLister { + return &PodNamespaceLister{federatedInformerManager: p.federatedInformerManager, namespace: namespace} +} + +func (p *PodNamespaceLister) List(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + grv := NewGlobalResourceVersionFromString(opts.ResourceVersion) + retGrv := grv.Clone() clusters, err := p.federatedInformerManager.GetReadyClusters() if err != nil { return nil, err } + var resultObject runtime.Object + items := make([]runtime.Object, 0) for _, cluster := range clusters { - podLister, podsSynced, exists := p.federatedInformerManager.GetPodLister(cluster.Name) - if !exists || !podsSynced() { + client, exists := p.federatedInformerManager.GetClusterKubeClient(cluster.Name) + if !exists { continue } - pods, err := podLister.List(selector) + + podList, err := client.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{ + LabelSelector: opts.LabelSelector, + FieldSelector: opts.FieldSelector, + ResourceVersion: grv.Get(cluster.Name), + }) if err != nil { continue } - for i := range pods { - pod := pods[i].DeepCopy() - clusterobject.MakePodUnique(pod, cluster.Name) - ret = append(ret, pod) + pods := podList.Items + + list, err := meta.ListAccessor(podList) + if err != nil { + continue } - } - return ret, nil -} -func (p *PodLister) Get(name string) (runtime.Object, error) { - items := strings.Split(name, "/") - if len(items) != 2 { - return nil, apierrors.NewBadRequest(fmt.Sprintf("invalid name %q", name)) + if resultObject == nil { + resultObject = podList + } + + for _, pod := range pods { + clusterobject.MakePodUnique(&pod, cluster.Name) + podObj := pod.DeepCopyObject() + items = append(items, podObj) + } + + retGrv.Set(cluster.Name, list.GetResourceVersion()) } - return p.ByNamespace(items[0]).Get(items[1]) -} -func (p *PodLister) ByNamespace(namespace string) cache.GenericNamespaceLister { - return &PodNamespaceLister{federatedInformerManager: p.federatedInformerManager, namespace: namespace} -} + if resultObject == nil { + resultObject = &metav1.List{ + TypeMeta: metav1.TypeMeta{ + APIVersion: "v1", + Kind: "List", + }, + ListMeta: metav1.ListMeta{}, + Items: []runtime.RawExtension{}, + } + } -func (p *PodNamespaceLister) List(selector labels.Selector) (ret []runtime.Object, err error) { - clusters, err := p.federatedInformerManager.GetReadyClusters() + err = meta.SetList(resultObject, items) if err != nil { return nil, err } - for _, cluster := range clusters { - podLister, podsSynced, exists := p.federatedInformerManager.GetPodLister(cluster.Name) - if !exists || !podsSynced() { - continue - } - pods, err := podLister.Pods(p.namespace).List(selector) - if err != nil { - continue - } - for i := range pods { - pod := pods[i].DeepCopy() - clusterobject.MakePodUnique(pod, cluster.Name) - ret = append(ret, pod) - } + accessor, err := meta.ListAccessor(resultObject) + if err != nil { + return nil, err } - return ret, nil + accessor.SetResourceVersion(retGrv.String()) + return resultObject, nil } -func (p *PodNamespaceLister) Get(name string) (runtime.Object, error) { +func (p *PodNamespaceLister) Get(ctx context.Context, name string, opts metav1.GetOptions) (runtime.Object, error) { clusters, err := p.federatedInformerManager.GetReadyClusters() if err != nil { return nil, err } for _, cluster := range clusterobject.GetPossibleClusters(clusters, name) { - podLister, podsSynced, exists := p.federatedInformerManager.GetPodLister(cluster) - if !exists || !podsSynced() { + client, exists := p.federatedInformerManager.GetClusterKubeClient(cluster) + if !exists { continue } - pods, err := podLister.Pods(p.namespace).List(labels.Everything()) + + podList, err := client.CoreV1().Pods(p.namespace).List(ctx, metav1.ListOptions{}) if err != nil { continue } + pods := podList.Items + for i := range pods { if name == clusterobject.GenUniqueName(cluster, pods[i].Name) { pod := pods[i].DeepCopy() clusterobject.MakePodUnique(pod, cluster) + grv := NewGlobalResourceVersionWithCapacity(1) + grv.Set(cluster, pod.GetResourceVersion()) + pod.SetResourceVersion(grv.String()) return pod, nil } } diff --git a/pkg/util/aggregatedlister/pod_test.go b/pkg/util/aggregatedlister/pod_test.go deleted file mode 100644 index d3bf0469..00000000 --- a/pkg/util/aggregatedlister/pod_test.go +++ /dev/null @@ -1,395 +0,0 @@ -/* -Copyright 2023 The KubeAdmiral Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package aggregatedlister - -import ( - "fmt" - "testing" - - "github.com/stretchr/testify/assert" - corev1 "k8s.io/api/core/v1" - apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" - "k8s.io/apimachinery/pkg/runtime" - corev1listers "k8s.io/client-go/listers/core/v1" - - fedcorev1a1 "github.com/kubewharf/kubeadmiral/pkg/apis/core/v1alpha1" - "github.com/kubewharf/kubeadmiral/pkg/util/clusterobject" - "github.com/kubewharf/kubeadmiral/pkg/util/informermanager" - fakeinformermanager "github.com/kubewharf/kubeadmiral/pkg/util/informermanager/fake" -) - -func TestPodLister_Get(t *testing.T) { - type fields struct { - federatedInformerManager informermanager.FederatedInformerManager - } - type args struct { - name string - } - tests := []struct { - name string - fields fields - args args - want runtime.Object - wantErr assert.ErrorAssertionFunc - }{ - { - name: "bad request", - fields: fields{}, - args: args{ - name: "aa", - }, - want: nil, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return apierrors.IsBadRequest(err) }, - }, - { - name: "normal", - fields: fields{ - federatedInformerManager: &fakeinformermanager.FakeFederatedInformerManager{}, - }, - args: args{ - name: "a/b", - }, - want: nil, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := NewPodLister(tt.fields.federatedInformerManager) - got, err := p.Get(tt.args.name) - if !tt.wantErr(t, err, fmt.Sprintf("Get(%v)", tt.args.name)) { - return - } - assert.Equalf(t, tt.want, got, "Get(%v)", tt.args.name) - }) - } -} - -func TestPodLister_List(t *testing.T) { - clusters, listers := newClustersWithPodListers(3) - informer := &fakeinformermanager.FakeFederatedInformerManager{ - ReadyClusters: clusters, - PodListers: listers, - } - pods := make([]*corev1.Pod, 3) - for i := range pods { - name := fmt.Sprintf("cluster-%d", i) - pod := newPod("default", name) - clusterobject.MakePodUnique(pod, name) - pods[i] = pod - } - - type fields struct { - federatedInformerManager informermanager.FederatedInformerManager - } - type args struct { - selector labels.Selector - } - tests := []struct { - name string - fields fields - args args - wantRet []runtime.Object - wantErr assert.ErrorAssertionFunc - }{ - { - name: "list all", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - selector: labels.Everything(), - }, - wantRet: []runtime.Object{pods[0], pods[1], pods[2]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list fake pod", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"fake": "pod"}), - }, - wantRet: []runtime.Object{pods[0], pods[1], pods[2]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list one pod", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"pod": "cluster-0"}), - }, - wantRet: []runtime.Object{pods[0]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list not found", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}), - }, - wantRet: nil, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &PodLister{ - federatedInformerManager: tt.fields.federatedInformerManager, - } - gotRet, err := p.List(tt.args.selector) - if !tt.wantErr(t, err, fmt.Sprintf("List(%v)", tt.args.selector)) { - return - } - assert.Equalf(t, tt.wantRet, gotRet, "List(%v)", tt.args.selector) - }) - } -} - -func TestPodNamespaceLister_Get(t *testing.T) { - clusters, listers := newClustersWithPodListers(3) - informer := &fakeinformermanager.FakeFederatedInformerManager{ - ReadyClusters: clusters, - PodListers: listers, - } - pods := make([]*corev1.Pod, 3) - for i := range pods { - name := fmt.Sprintf("cluster-%d", i) - pod := newPod("default", name) - clusterobject.MakePodUnique(pod, name) - pods[i] = pod - } - - type fields struct { - namespace string - federatedInformerManager informermanager.FederatedInformerManager - } - type args struct { - name string - } - tests := []struct { - name string - fields fields - args args - want runtime.Object - wantErr assert.ErrorAssertionFunc - }{ - { - name: "get pod", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - name: pods[1].Name, - }, - want: pods[1], - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "pod not found", - fields: fields{ - federatedInformerManager: informer, - }, - args: args{ - name: "fake", - }, - want: nil, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return apierrors.IsNotFound(err) }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &PodNamespaceLister{ - namespace: tt.fields.namespace, - federatedInformerManager: tt.fields.federatedInformerManager, - } - got, err := p.Get(tt.args.name) - if !tt.wantErr(t, err, fmt.Sprintf("Get(%v)", tt.args.name)) { - return - } - assert.Equalf(t, tt.want, got, "Get(%v)", tt.args.name) - }) - } -} - -func TestPodNamespaceLister_List(t *testing.T) { - clusters, listers := newClustersWithPodListers(3) - informer := &fakeinformermanager.FakeFederatedInformerManager{ - ReadyClusters: clusters, - PodListers: listers, - } - pods := make([]*corev1.Pod, 3) - for i := range pods { - name := fmt.Sprintf("cluster-%d", i) - pod := newPod("default", name) - clusterobject.MakePodUnique(pod, name) - pods[i] = pod - } - - type fields struct { - namespace string - federatedInformerManager informermanager.FederatedInformerManager - } - type args struct { - selector labels.Selector - } - tests := []struct { - name string - fields fields - args args - wantRet []runtime.Object - wantErr assert.ErrorAssertionFunc - }{ - { - name: "list all", - fields: fields{ - namespace: "default", - federatedInformerManager: informer, - }, - args: args{ - selector: labels.Everything(), - }, - wantRet: []runtime.Object{pods[0], pods[1], pods[2]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list fake pod", - fields: fields{ - namespace: "default", - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"fake": "pod"}), - }, - wantRet: []runtime.Object{pods[0], pods[1], pods[2]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list one pod", - fields: fields{ - namespace: "default", - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"pod": "cluster-0"}), - }, - wantRet: []runtime.Object{pods[0]}, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - { - name: "list not found", - fields: fields{ - namespace: "default", - federatedInformerManager: informer, - }, - args: args{ - selector: labels.SelectorFromSet(map[string]string{"foo": "bar"}), - }, - wantRet: nil, - wantErr: func(t assert.TestingT, err error, i ...interface{}) bool { return err == nil }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - p := &PodNamespaceLister{ - namespace: tt.fields.namespace, - federatedInformerManager: tt.fields.federatedInformerManager, - } - gotRet, err := p.List(tt.args.selector) - if !tt.wantErr(t, err, fmt.Sprintf("List(%v)", tt.args.selector)) { - return - } - assert.Equalf(t, tt.wantRet, gotRet, "List(%v)", tt.args.selector) - }) - } -} - -func newPod(ns, name string) *corev1.Pod { - return &corev1.Pod{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: ns, - Labels: map[string]string{ - "fake": "pod", - "pod": name, - }, - }, - } -} - -func newClustersWithPodListers(num int) ([]*fedcorev1a1.FederatedCluster, map[string]fakeinformermanager.FakeLister) { - clusters := make([]*fedcorev1a1.FederatedCluster, num) - listers := make(map[string]fakeinformermanager.FakeLister) - ns := "default" - - for i := range clusters { - name := fmt.Sprintf("cluster-%d", i) - clusters[i] = &fedcorev1a1.FederatedCluster{ - ObjectMeta: metav1.ObjectMeta{ - Name: name, - }, - } - - listers[name] = fakeinformermanager.FakeLister{ - PodLister: fakePodLister{data: []*corev1.Pod{newPod(ns, name)}}, - Synced: true, - } - } - return clusters, listers -} - -// fakes both PodLister and PodNamespaceLister at once -type fakePodLister struct { - data []*corev1.Pod - err error -} - -func (pl fakePodLister) List(selector labels.Selector) (ret []*corev1.Pod, err error) { - if pl.err != nil { - return nil, pl.err - } - res := []*corev1.Pod{} - for _, pod := range pl.data { - if selector.Matches(labels.Set(pod.Labels)) { - res = append(res, pod) - } - } - return res, nil -} - -func (pl fakePodLister) Get(name string) (*corev1.Pod, error) { - if pl.err != nil { - return nil, pl.err - } - for _, pod := range pl.data { - if pod.Name == name { - return pod, nil - } - } - return nil, nil -} - -func (pl fakePodLister) Pods(namespace string) corev1listers.PodNamespaceLister { - return pl -}