Skip to content

Commit

Permalink
feat:add resourceVersion for pod
Browse files Browse the repository at this point in the history
  • Loading branch information
Poor12 committed Jan 9, 2025
1 parent 37ee4ad commit 83b9df2
Show file tree
Hide file tree
Showing 9 changed files with 276 additions and 802 deletions.
84 changes: 19 additions & 65 deletions pkg/registry/hpaaggregator/aggregation/forward/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,21 @@ import (
"context"
"fmt"
"net/http"
"strconv"
"sync"
"time"

"github.com/kubewharf/kubeadmiral/pkg/util/aggregatedlister"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/endpoints/handlers"
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/klog/v2"
"k8s.io/utils/pointer"

Expand All @@ -51,7 +48,7 @@ type PodHandler interface {
}

type PodREST struct {
podLister cache.GenericLister
podLister aggregatedlister.AggregatedLister
federatedInformerManager informermanager.FederatedInformerManager
minRequestTimeout time.Duration
}
Expand All @@ -65,7 +62,7 @@ var (

func NewPodREST(
f informermanager.FederatedInformerManager,
podLister cache.GenericLister,
podLister aggregatedlister.AggregatedLister,
minRequestTimeout time.Duration,
) *PodREST {
return &PodREST{
Expand All @@ -92,7 +89,7 @@ func (p *PodREST) Handler(requestInfo *genericapirequest.RequestInfo) (http.Hand
// Get ...
func (p *PodREST) Get(ctx context.Context, name string, opts *metav1.GetOptions) (runtime.Object, error) {
namespace := genericapirequest.NamespaceValue(ctx)
obj, err := p.podLister.ByNamespace(namespace).Get(name)
obj, err := p.podLister.ByNamespace(namespace).Get(ctx, name, *opts)
if err != nil {
if apierrors.IsNotFound(err) {
// return not-found errors directly
Expand All @@ -110,31 +107,26 @@ func (p *PodREST) NewList() runtime.Object {
}

func (p *PodREST) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
}

namespace := genericapirequest.NamespaceValue(ctx)
objs, err := p.podLister.ByNamespace(namespace).List(label)
objs, err := p.podLister.ByNamespace(namespace).List(ctx, metav1.ListOptions{
LabelSelector: options.LabelSelector.String(),
FieldSelector: options.FieldSelector.String(),
ResourceVersion: options.ResourceVersion,
})
if err != nil {
klog.ErrorS(err, "Failed listing pods", "labelSelector", label, "namespace", klog.KRef("", namespace))
klog.ErrorS(err, "Failed listing pods", "namespace", klog.KRef("", namespace))
return nil, fmt.Errorf("failed listing pods: %w", err)
}

field := fields.Everything()
if options != nil && options.FieldSelector != nil {
field = options.FieldSelector
}
pods := convertAndFilterPodObject(objs, field)
return &corev1.PodList{Items: pods}, nil
return objs, nil
}

func (p *PodREST) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) {
return tableConvertor.ConvertToTable(ctx, object, tableOptions)
}

func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOptions) (watch.Interface, error) {
grv := aggregatedlister.NewGlobalResourceVersionFromString(options.ResourceVersion)
retGrv := grv.Clone()
label := labels.Everything()
if options != nil && options.LabelSelector != nil {
label = options.LabelSelector
Expand Down Expand Up @@ -166,9 +158,10 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
continue
}
watcher, err := client.CoreV1().Pods(namespace).Watch(ctx, metav1.ListOptions{
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
TimeoutSeconds: pointer.Int64(1200),
LabelSelector: label.String(),
FieldSelector: options.FieldSelector.String(),
TimeoutSeconds: pointer.Int64(1200),
ResourceVersion: grv.Get(clusters[i].Name),
})
if err != nil {
logger.Error(err, "Failed watching pods")
Expand Down Expand Up @@ -197,6 +190,8 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
}
if pod, ok := event.Object.(*corev1.Pod); ok {
clusterobject.MakePodUnique(pod, cluster)
retGrv.Set(cluster, pod.ResourceVersion)
pod.SetResourceVersion(retGrv.String())
event.Object = pod
}

Expand All @@ -211,44 +206,3 @@ func (p *PodREST) Watch(ctx context.Context, options *metainternalversion.ListOp
}
return proxyWatcher, nil
}

func convertAndFilterPodObject(objs []runtime.Object, selector fields.Selector) []corev1.Pod {
newObjs := make([]corev1.Pod, 0, len(objs))
for _, obj := range objs {
pod, ok := obj.(*corev1.Pod)
if !ok {
continue
}
fields := ToSelectableFields(pod)
if !selector.Matches(fields) {
continue
}

newObjs = append(newObjs, *pod)
}
return newObjs
}

// ToSelectableFields returns a field set that represents the object
// TODO: fields are not labels, and the validation rules for them do not apply.
func ToSelectableFields(pod *corev1.Pod) fields.Set {
// The purpose of allocation with a given number of elements is to reduce
// amount of allocations needed to create the fields.Set. If you add any
// field here or the number of object-meta related fields changes, this should
// be adjusted.
podSpecificFieldsSet := make(fields.Set, 10)
podSpecificFieldsSet["spec.nodeName"] = pod.Spec.NodeName
podSpecificFieldsSet["spec.restartPolicy"] = string(pod.Spec.RestartPolicy)
podSpecificFieldsSet["spec.schedulerName"] = pod.Spec.SchedulerName
podSpecificFieldsSet["spec.serviceAccountName"] = pod.Spec.ServiceAccountName
podSpecificFieldsSet["spec.hostNetwork"] = strconv.FormatBool(pod.Spec.HostNetwork)
podSpecificFieldsSet["status.phase"] = string(pod.Status.Phase)
// TODO: add podIPs as a downward API value(s) with proper format
podIP := ""
if len(pod.Status.PodIPs) > 0 {
podIP = pod.Status.PodIPs[0].IP
}
podSpecificFieldsSet["status.podIP"] = podIP
podSpecificFieldsSet["status.nominatedNodeName"] = pod.Status.NominatedNodeName
return generic.AddObjectMetaFieldsSet(podSpecificFieldsSet, &pod.ObjectMeta, true)
}
Loading

0 comments on commit 83b9df2

Please sign in to comment.