From 6ddba442fb639be6197a03a0fdb21713792bb3ff Mon Sep 17 00:00:00 2001 From: Nic George Date: Fri, 14 Feb 2025 08:59:23 -0700 Subject: [PATCH 01/10] gonfidel/implement-weighted-queues --- api/v1alpha1/types.go | 7 + .../kro.run_resourcegraphdefinitions.yaml | 6 + examples/kubernetes/webapp/rg.yaml | 5 +- .../kro.run_resourcegraphdefinitions.yaml | 6 + pkg/controller/instance/controller.go | 3 + .../controller_reconcile.go | 6 +- pkg/dynamiccontroller/dynamic_controller.go | 174 +++++++++++++++--- .../dynamic_controller_test.go | 12 +- pkg/metadata/groupversion.go | 2 +- 9 files changed, 188 insertions(+), 33 deletions(-) diff --git a/api/v1alpha1/types.go b/api/v1alpha1/types.go index 77aa01f7..7a55582e 100644 --- a/api/v1alpha1/types.go +++ b/api/v1alpha1/types.go @@ -43,6 +43,13 @@ type ResourceGraphDefinitionSpec struct { // // +kubebuilder:validation:Optional DefaultServiceAccounts map[string]string `json:"defaultServiceAccounts,omitempty"` + // The weight used to determine priority when queuing reconciliation. + // + // +kubebuilder:default=100 + // +kubebuilder:validation:Maximum=1000 + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Optional + Weight int `json:"weight,omitempty"` } // Schema represents the attributes that define an instance of diff --git a/config/crd/bases/kro.run_resourcegraphdefinitions.yaml b/config/crd/bases/kro.run_resourcegraphdefinitions.yaml index b26d4d63..9cd49d92 100644 --- a/config/crd/bases/kro.run_resourcegraphdefinitions.yaml +++ b/config/crd/bases/kro.run_resourcegraphdefinitions.yaml @@ -145,6 +145,12 @@ spec: - apiVersion - kind type: object + weight: + default: 100 + description: The weight used to determine priority when queuing reconciliation. + maximum: 1000 + minimum: 1 + type: integer required: - schema type: object diff --git a/examples/kubernetes/webapp/rg.yaml b/examples/kubernetes/webapp/rg.yaml index dbe28d1d..8556f241 100644 --- a/examples/kubernetes/webapp/rg.yaml +++ b/examples/kubernetes/webapp/rg.yaml @@ -3,6 +3,7 @@ kind: ResourceGraphDefinition metadata: name: webapp.kro.run spec: + weight: 150 schema: apiVersion: v1alpha1 kind: WebApp @@ -68,7 +69,7 @@ spec: - id: service includeWhen: - - ${schema.spec.service.enabled} + - ${schema.spec.service.enabled} template: apiVersion: v1 kind: Service @@ -86,7 +87,7 @@ spec: - id: ingress includeWhen: - - ${schema.spec.ingress.enabled} + - ${schema.spec.ingress.enabled} template: apiVersion: networking.k8s.io/v1 kind: Ingress diff --git a/helm/crds/kro.run_resourcegraphdefinitions.yaml b/helm/crds/kro.run_resourcegraphdefinitions.yaml index b26d4d63..9cd49d92 100644 --- a/helm/crds/kro.run_resourcegraphdefinitions.yaml +++ b/helm/crds/kro.run_resourcegraphdefinitions.yaml @@ -145,6 +145,12 @@ spec: - apiVersion - kind type: object + weight: + default: 100 + description: The weight used to determine priority when queuing reconciliation. + maximum: 1000 + minimum: 1 + type: integer required: - schema type: object diff --git a/pkg/controller/instance/controller.go b/pkg/controller/instance/controller.go index b032a160..b333b902 100644 --- a/pkg/controller/instance/controller.go +++ b/pkg/controller/instance/controller.go @@ -47,6 +47,9 @@ type ReconcileConfig struct { // TODO(a-hilaly): need to define think the different deletion policies we need to // support. DeletionPolicy string + + // Instance controller events are queued in a weighted queue. + Weight int } // Controller manages the reconciliation of a single instance of a ResourceGraphDefinition, diff --git a/pkg/controller/resourcegraphdefinition/controller_reconcile.go b/pkg/controller/resourcegraphdefinition/controller_reconcile.go index b4a11902..763308df 100644 --- a/pkg/controller/resourcegraphdefinition/controller_reconcile.go +++ b/pkg/controller/resourcegraphdefinition/controller_reconcile.go @@ -63,7 +63,7 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx controller := r.setupMicroController(gvr, processedRGD, rgd.Spec.DefaultServiceAccounts, graphExecLabeler) log.V(1).Info("reconciling resource graph definition micro controller") - if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile); err != nil { + if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile, rgd.Spec.Weight); err != nil { return processedRGD.TopologicalOrder, resourcesInfo, err } @@ -141,8 +141,8 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD( } // reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources -func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler) error { - err := r.dynamicController.StartServingGVK(ctx, *gvr, handler) +func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler, weight int) error { + err := r.dynamicController.StartServingGVK(ctx, *gvr, handler, weight) if err != nil { return newMicroControllerError(err) } diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index ec6fa821..1dc1531c 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -98,6 +98,15 @@ type Config struct { ShutdownTimeout time.Duration } +// DynamicController (DC) manages multiple GVRs and needs a way to prioritize the importance +// of each GVR. Weighted queues provide a flexible way for the DynamicController (DC) to prioritize +// GVRs based on their weights. +type WeightedQueue struct { + weight int + queue workqueue.RateLimitingInterface + gvrSet map[schema.GroupVersionResource]struct{} +} + // DynamicController (DC) is a single controller capable of managing multiple different // kubernetes resources (GVRs) in parallel. It can safely start watching new // resources and stop watching others at runtime - hence the term "dynamic". This @@ -122,10 +131,14 @@ type DynamicController struct { // handler is responsible for managing a specific GVR. handlers sync.Map - // queue is the workqueue used to process items - queue workqueue.RateLimitingInterface + // queues are the workqueue used to process items + weightedQueues map[int]*WeightedQueue + + // weight of gvr when queued, between 1 and 1000 + gvrWeights map[schema.GroupVersionResource]int log logr.Logger + mu sync.RWMutex } type Handler func(ctx context.Context, req ctrl.Request) error @@ -146,18 +159,77 @@ func NewDynamicController( dc := &DynamicController{ config: config, kubeClient: kubeClient, - // TODO(a-hilaly): Make the queue size configurable. - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), "dynamic-controller-queue"), + + weightedQueues: make(map[int]*WeightedQueue), + gvrWeights: make(map[schema.GroupVersionResource]int), log: logger, // pass version and pod id from env } + dc.ensureWeightedQueue(100, schema.GroupVersionResource{}) + return dc } +func (dc *DynamicController) getWeightByGVR(gvr schema.GroupVersionResource) int { + dc.mu.RLock() + defer dc.mu.RUnlock() + return int(dc.gvrWeights[gvr]) +} + +func (dc *DynamicController) setWeightByGVR(gvr schema.GroupVersionResource, weight int) { + dc.mu.Lock() + defer dc.mu.Unlock() + dc.gvrWeights[gvr] = weight +} + +func (dc *DynamicController) deleteWeightByGVR(gvr schema.GroupVersionResource) { + dc.mu.Lock() + defer dc.mu.Unlock() + delete(dc.gvrWeights, gvr) +} + +func (dc *DynamicController) deleteGVRFromQueue(weight int, gvr schema.GroupVersionResource) { + dc.mu.Lock() + defer dc.mu.Unlock() + delete(dc.weightedQueues[weight].gvrSet, gvr) + if len(dc.weightedQueues[weight].gvrSet) < 1 { + dc.weightedQueues[weight].queue.ShutDown() + delete(dc.weightedQueues, weight) + } +} + +// Ensure weighted queue exists with the specified weight +func (dc *DynamicController) ensureWeightedQueue(weight int, gvr schema.GroupVersionResource) bool { + if weight < 1 || weight > 1000 { + dc.log.Error(nil, "weight should be between 1 and 1000", "weight", weight) + return false + } + + if _, ok := dc.weightedQueues[weight]; !ok { + dc.mu.Lock() + defer dc.mu.Unlock() + // TODO(a-hilaly): Make the queue size configurable. + dc.weightedQueues[weight] = &WeightedQueue{ + weight: weight, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), + &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), fmt.Sprintf("weight-%d-queue", weight)), + gvrSet: map[schema.GroupVersionResource]struct{}{}, + } + dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} + } else { + dc.mu.Lock() + defer dc.mu.Unlock() + if _, ok := dc.weightedQueues[weight].gvrSet[gvr]; !ok { + dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} + } + } + + return true +} + // AllInformerHaveSynced checks if all registered informers have synced, returns // true if they have. func (dc *DynamicController) AllInformerHaveSynced() bool { @@ -200,10 +272,18 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { return cache.WaitForCacheSync(stopCh, dc.AllInformerHaveSynced) } +func (dc *DynamicController) shutdownQueues() { + dc.log.Info("Shutting down dynamic controller queues") + for key, _ := range dc.weightedQueues { + dc.log.Info("Shutting down weighted queue: %d", key) + dc.weightedQueues[key].queue.ShutDown() + } +} + // Run starts the DynamicController. func (dc *DynamicController) Run(ctx context.Context) error { defer utilruntime.HandleCrash() - defer dc.queue.ShutDown() + defer dc.shutdownQueues() dc.log.Info("Starting dynamic controller") defer dc.log.Info("Shutting down dynamic controller") @@ -230,26 +310,60 @@ func (dc *DynamicController) worker(ctx context.Context) { } } +// Select queue propotionally based on weight +// TODO(n-george): review this logic +func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { + var totalWeight int + var minQueue *WeightedQueue + + for _, wq := range dc.weightedQueues { + totalWeight += wq.weight + if minQueue == nil || wq.weight < minQueue.weight { + minQueue = wq + } + } + + if totalWeight == 0 { + return nil + } + + var selectedQueue *WeightedQueue + minRatio := float64(^uint(0)) + + for _, wq := range dc.weightedQueues { + queueRatio := float64(wq.queue.Len()) / float64(wq.weight) + if queueRatio < minRatio { + minRatio = queueRatio + selectedQueue = wq + } + } + + return selectedQueue +} + // processNextWorkItem processes a single item from the queue. func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := dc.queue.Get() + weightedQueue := dc.selectQueueByWeight() + dc.log.V(1).Info("Processing item with specified weight", "weight", weightedQueue.weight) + + obj, shutdown := weightedQueue.queue.Get() if shutdown { return false } - defer dc.queue.Done(obj) + defer weightedQueue.queue.Done(obj) - queueLength.Set(float64(dc.queue.Len())) + queueLength.Set(float64(weightedQueue.queue.Len())) item, ok := obj.(ObjectIdentifiers) if !ok { dc.log.Error(fmt.Errorf("expected ObjectIdentifiers in queue but got %#v", obj), "Invalid item in queue") - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) return true } err := dc.syncFunc(ctx, item) if err == nil || apierrors.IsNotFound(err) { - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) return true } @@ -260,25 +374,25 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { case *requeue.NoRequeue: dc.log.Error(typedErr, "Error syncing item, not requeuing", "item", item) requeueTotal.WithLabelValues(gvrKey, "no_requeue").Inc() - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) case *requeue.RequeueNeeded: dc.log.V(1).Info("Requeue needed", "item", item, "error", typedErr) requeueTotal.WithLabelValues(gvrKey, "requeue").Inc() - dc.queue.Add(obj) // Add without rate limiting + weightedQueue.queue.Add(obj) // Add without rate limiting case *requeue.RequeueNeededAfter: dc.log.V(1).Info("Requeue needed after delay", "item", item, "error", typedErr, "delay", typedErr.Duration()) requeueTotal.WithLabelValues(gvrKey, "requeue_after").Inc() - dc.queue.AddAfter(obj, typedErr.Duration()) + weightedQueue.queue.AddAfter(obj, typedErr.Duration()) default: // Arriving here means we have an unexpected error, we should requeue the item // with rate limiting. requeueTotal.WithLabelValues(gvrKey, "rate_limited").Inc() - if dc.queue.NumRequeues(obj) < dc.config.QueueMaxRetries { + if weightedQueue.queue.NumRequeues(obj) < dc.config.QueueMaxRetries { dc.log.Error(err, "Error syncing item, requeuing with rate limit", "item", item) - dc.queue.AddRateLimited(obj) + weightedQueue.queue.AddRateLimited(obj) } else { dc.log.Error(err, "Dropping item from queue after max retries", "item", item) - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) } } @@ -413,18 +527,28 @@ func (dc *DynamicController) enqueueObject(obj interface{}, eventType string) { GVR: gvr, } + weight := dc.getWeightByGVR(gvr) + dc.log.V(1).Info("Enqueueing object", "objectIdentifiers", objectIdentifiers, - "eventType", eventType) + "eventType", eventType, + "weight", weight) informerEventsTotal.WithLabelValues(gvr.String(), eventType).Inc() - dc.queue.Add(objectIdentifiers) + + dc.weightedQueues[weight].queue.Add(objectIdentifiers) } // StartServingGVK registers a new GVK to the informers map safely. -func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler) error { +func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler, queueWeight int) error { dc.log.V(1).Info("Registering new GVK", "gvr", gvr) + // Set the weight for the GVR and ensure the queue exists for that weight class + dc.setWeightByGVR(gvr, queueWeight) + if ok := dc.ensureWeightedQueue(queueWeight, gvr); !ok { + return fmt.Errorf("Failed to create or get weighted queue with weight: %d", queueWeight) + } + _, exists := dc.informers.Load(gvr) if exists { // Even thought the informer is already registered, we should still @@ -494,6 +618,11 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.GroupVersionResource) error { dc.log.Info("Unregistering GVK", "gvr", gvr) + // Remove gvr from weight map + weight := dc.getWeightByGVR(gvr) + dc.deleteWeightByGVR(gvr) + dc.deleteGVRFromQueue(weight, gvr) + // Retrieve the informer informerObj, ok := dc.informers.Load(gvr) if !ok { @@ -520,6 +649,7 @@ func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.Grou // Unregister the handler if any dc.handlers.Delete(gvr) + gvrCount.Dec() // Clean up any pending items in the queue for this GVR // NOTE(a-hilaly): This is a bit heavy.. maybe we can find a better way to do this. diff --git a/pkg/dynamiccontroller/dynamic_controller_test.go b/pkg/dynamiccontroller/dynamic_controller_test.go index 556d99df..ee24da30 100644 --- a/pkg/dynamiccontroller/dynamic_controller_test.go +++ b/pkg/dynamiccontroller/dynamic_controller_test.go @@ -69,7 +69,7 @@ func TestNewDynamicController(t *testing.T) { assert.NotNil(t, dc) assert.Equal(t, config, dc.config) - assert.NotNil(t, dc.queue) + assert.NotNil(t, dc.weightedQueues) assert.NotNil(t, dc.kubeClient) } @@ -104,14 +104,14 @@ func TestRegisterAndUnregisterGVK(t *testing.T) { }) // Register GVK - err := dc.StartServingGVK(context.Background(), gvr, handlerFunc) + err := dc.StartServingGVK(context.Background(), gvr, handlerFunc, 100) require.NoError(t, err) _, exists := dc.informers.Load(gvr) assert.True(t, exists) // Try to register again (should not fail) - err = dc.StartServingGVK(context.Background(), gvr, handlerFunc) + err = dc.StartServingGVK(context.Background(), gvr, handlerFunc, 100) assert.NoError(t, err) // Unregister GVK @@ -125,6 +125,7 @@ func TestRegisterAndUnregisterGVK(t *testing.T) { } func TestEnqueueObject(t *testing.T) { + queueWeight := 100 logger := noopLogger() client := setupFakeClient() dc := NewDynamicController(logger, Config{}, client) @@ -132,9 +133,10 @@ func TestEnqueueObject(t *testing.T) { obj := &unstructured.Unstructured{} obj.SetName("test-object") obj.SetNamespace("default") - obj.SetGroupVersionKind(schema.GroupVersionKind{Group: "test", Version: "v1", Kind: "Test"}) + err := unstructured.SetNestedField(obj.Object, int64(queueWeight), "spec", "weight") + require.NoError(t, err) dc.enqueueObject(obj, "add") - assert.Equal(t, 1, dc.queue.Len()) + assert.Equal(t, 1, dc.weightedQueues[queueWeight].queue.Len()) } diff --git a/pkg/metadata/groupversion.go b/pkg/metadata/groupversion.go index 458aa94c..89c0f004 100644 --- a/pkg/metadata/groupversion.go +++ b/pkg/metadata/groupversion.go @@ -71,7 +71,7 @@ func GetResourceGraphDefinitionInstanceGVK(group, apiVersion, kind string) schem func GetResourceGraphDefinitionInstanceGVR(group, apiVersion, kind string) schema.GroupVersionResource { pluralKind := flect.Pluralize(strings.ToLower(kind)) return schema.GroupVersionResource{ - Group: fmt.Sprintf("%s.%s", pluralKind, group), + Group: group, Version: apiVersion, Resource: pluralKind, } From 9fec75fb5d466d1d04b61fbc1d7e4270e1fc0d9f Mon Sep 17 00:00:00 2001 From: Nic George Date: Fri, 14 Feb 2025 09:03:56 -0700 Subject: [PATCH 02/10] lint fixes --- pkg/dynamiccontroller/dynamic_controller.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index 1dc1531c..84d94fe8 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -161,8 +161,8 @@ func NewDynamicController( kubeClient: kubeClient, weightedQueues: make(map[int]*WeightedQueue), - gvrWeights: make(map[schema.GroupVersionResource]int), - log: logger, + gvrWeights: make(map[schema.GroupVersionResource]int), + log: logger, // pass version and pod id from env } @@ -649,7 +649,6 @@ func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.Grou // Unregister the handler if any dc.handlers.Delete(gvr) - gvrCount.Dec() // Clean up any pending items in the queue for this GVR // NOTE(a-hilaly): This is a bit heavy.. maybe we can find a better way to do this. From 722e02941b460044ab20f0a09e9ef46e84cb9cf5 Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 04:21:36 +0000 Subject: [PATCH 03/10] merge main --- .devcontainer/devcontainer.json | 38 +++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 .devcontainer/devcontainer.json diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json new file mode 100644 index 00000000..91225f95 --- /dev/null +++ b/.devcontainer/devcontainer.json @@ -0,0 +1,38 @@ +// For format details, see https://aka.ms/devcontainer.json. For config options, see the +// README at: https://github.com/devcontainers/templates/tree/main/src/go +{ + "name": "Go", + // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile + "image": "mcr.microsoft.com/devcontainers/go:1-1.23-bookworm", + "features": { + "ghcr.io/devcontainers/features/docker-in-docker": { + "moby": "false" + }, + "ghcr.io/devcontainers/features/kubectl-helm-minikube:1": {}, + "ghcr.io/rio/features/k3d:1": {}, + "ghcr.io/rio/features/kustomize:1": {}, + "ghcr.io/audacioustux/devcontainers/k9s:1": {}, + "ghcr.io/audacioustux/devcontainers/kubebuilder:1": {}, + "ghcr.io/xfrancois/devcontainers-features/velero:1": {}, + "ghcr.io/devcontainers-extra/features/argo-cd:1": {}, + "ghcr.io/devcontainers-extra/features/argo-workflows:1": {}, + "ghcr.io/devcontainers-extra/features/kubectx-kubens:1": {}, + "ghcr.io/eitsupi/devcontainer-features/nushell:0": {} + }, + "postCreateCommand": "k3d cluster create internal" + + // Features to add to the dev container. More info: https://containers.dev/features. + // "features": {}, + + // Use 'forwardPorts' to make a list of ports inside the container available locally. + // "forwardPorts": [], + + // Use 'postCreateCommand' to run commands after the container is created. + // "postCreateCommand": "go version", + + // Configure tool-specific properties. + // "customizations": {}, + + // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. + // "remoteUser": "root" +} From f2c8c4d24b5a039ac05f4c84db45eec4de81649e Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Fri, 14 Feb 2025 22:05:36 -0700 Subject: [PATCH 04/10] testing completed --- .devcontainer/devcontainer.json | 38 ------------------- pkg/dynamiccontroller/dynamic_controller.go | 25 +++++++++--- .../dynamic_controller_test.go | 9 ++--- 3 files changed, 23 insertions(+), 49 deletions(-) delete mode 100644 .devcontainer/devcontainer.json diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json deleted file mode 100644 index 91225f95..00000000 --- a/.devcontainer/devcontainer.json +++ /dev/null @@ -1,38 +0,0 @@ -// For format details, see https://aka.ms/devcontainer.json. For config options, see the -// README at: https://github.com/devcontainers/templates/tree/main/src/go -{ - "name": "Go", - // Or use a Dockerfile or Docker Compose file. More info: https://containers.dev/guide/dockerfile - "image": "mcr.microsoft.com/devcontainers/go:1-1.23-bookworm", - "features": { - "ghcr.io/devcontainers/features/docker-in-docker": { - "moby": "false" - }, - "ghcr.io/devcontainers/features/kubectl-helm-minikube:1": {}, - "ghcr.io/rio/features/k3d:1": {}, - "ghcr.io/rio/features/kustomize:1": {}, - "ghcr.io/audacioustux/devcontainers/k9s:1": {}, - "ghcr.io/audacioustux/devcontainers/kubebuilder:1": {}, - "ghcr.io/xfrancois/devcontainers-features/velero:1": {}, - "ghcr.io/devcontainers-extra/features/argo-cd:1": {}, - "ghcr.io/devcontainers-extra/features/argo-workflows:1": {}, - "ghcr.io/devcontainers-extra/features/kubectx-kubens:1": {}, - "ghcr.io/eitsupi/devcontainer-features/nushell:0": {} - }, - "postCreateCommand": "k3d cluster create internal" - - // Features to add to the dev container. More info: https://containers.dev/features. - // "features": {}, - - // Use 'forwardPorts' to make a list of ports inside the container available locally. - // "forwardPorts": [], - - // Use 'postCreateCommand' to run commands after the container is created. - // "postCreateCommand": "go version", - - // Configure tool-specific properties. - // "customizations": {}, - - // Uncomment to connect as root instead. More info: https://aka.ms/dev-containers-non-root. - // "remoteUser": "root" -} diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index 84d94fe8..c303902a 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -102,8 +102,11 @@ type Config struct { // of each GVR. Weighted queues provide a flexible way for the DynamicController (DC) to prioritize // GVRs based on their weights. type WeightedQueue struct { + // queue items will be prioritized based on GVR weight, between 1 and 1000 weight int - queue workqueue.RateLimitingInterface + // queues are the workqueue used to process items + queue workqueue.RateLimitingInterface + // a set of each gvr associated with the queue gvrSet map[schema.GroupVersionResource]struct{} } @@ -137,8 +140,10 @@ type DynamicController struct { // weight of gvr when queued, between 1 and 1000 gvrWeights map[schema.GroupVersionResource]int + // mutex for weightedQueues and gvrWeights + mu sync.RWMutex + log logr.Logger - mu sync.RWMutex } type Handler func(ctx context.Context, req ctrl.Request) error @@ -148,6 +153,10 @@ type informerWrapper struct { shutdown func() } +var ( + defaultQueueWeight = 100 +) + // NewDynamicController creates a new DynamicController instance. func NewDynamicController( log logr.Logger, @@ -166,7 +175,7 @@ func NewDynamicController( // pass version and pod id from env } - dc.ensureWeightedQueue(100, schema.GroupVersionResource{}) + dc.ensureWeightedQueue(defaultQueueWeight, schema.GroupVersionResource{}) return dc } @@ -174,7 +183,11 @@ func NewDynamicController( func (dc *DynamicController) getWeightByGVR(gvr schema.GroupVersionResource) int { dc.mu.RLock() defer dc.mu.RUnlock() - return int(dc.gvrWeights[gvr]) + if weight, ok := dc.gvrWeights[gvr]; ok { + return weight + } + + return defaultQueueWeight } func (dc *DynamicController) setWeightByGVR(gvr schema.GroupVersionResource, weight int) { @@ -274,7 +287,7 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { func (dc *DynamicController) shutdownQueues() { dc.log.Info("Shutting down dynamic controller queues") - for key, _ := range dc.weightedQueues { + for key := range dc.weightedQueues { dc.log.Info("Shutting down weighted queue: %d", key) dc.weightedQueues[key].queue.ShutDown() } @@ -618,7 +631,7 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.GroupVersionResource) error { dc.log.Info("Unregistering GVK", "gvr", gvr) - // Remove gvr from weight map + // Remove gvr from weight map and cleanup queue if no GVRs are assigned weight := dc.getWeightByGVR(gvr) dc.deleteWeightByGVR(gvr) dc.deleteGVRFromQueue(weight, gvr) diff --git a/pkg/dynamiccontroller/dynamic_controller_test.go b/pkg/dynamiccontroller/dynamic_controller_test.go index ee24da30..79d9abe9 100644 --- a/pkg/dynamiccontroller/dynamic_controller_test.go +++ b/pkg/dynamiccontroller/dynamic_controller_test.go @@ -104,14 +104,14 @@ func TestRegisterAndUnregisterGVK(t *testing.T) { }) // Register GVK - err := dc.StartServingGVK(context.Background(), gvr, handlerFunc, 100) + err := dc.StartServingGVK(context.Background(), gvr, handlerFunc, defaultQueueWeight) require.NoError(t, err) _, exists := dc.informers.Load(gvr) assert.True(t, exists) // Try to register again (should not fail) - err = dc.StartServingGVK(context.Background(), gvr, handlerFunc, 100) + err = dc.StartServingGVK(context.Background(), gvr, handlerFunc, defaultQueueWeight) assert.NoError(t, err) // Unregister GVK @@ -125,7 +125,6 @@ func TestRegisterAndUnregisterGVK(t *testing.T) { } func TestEnqueueObject(t *testing.T) { - queueWeight := 100 logger := noopLogger() client := setupFakeClient() dc := NewDynamicController(logger, Config{}, client) @@ -133,10 +132,10 @@ func TestEnqueueObject(t *testing.T) { obj := &unstructured.Unstructured{} obj.SetName("test-object") obj.SetNamespace("default") - err := unstructured.SetNestedField(obj.Object, int64(queueWeight), "spec", "weight") + err := unstructured.SetNestedField(obj.Object, int64(defaultQueueWeight), "spec", "weight") require.NoError(t, err) dc.enqueueObject(obj, "add") - assert.Equal(t, 1, dc.weightedQueues[queueWeight].queue.Len()) + assert.Equal(t, 1, dc.weightedQueues[defaultQueueWeight].queue.Len()) } From 4f05ca5d6eb61f8ee4b6d36acee05b9623b1d506 Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 00:26:42 -0700 Subject: [PATCH 05/10] resolve item processing bug --- pkg/dynamiccontroller/dynamic_controller.go | 104 +++++++++++--------- 1 file changed, 56 insertions(+), 48 deletions(-) diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index c303902a..5f834e93 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -175,51 +175,62 @@ func NewDynamicController( // pass version and pod id from env } - dc.ensureWeightedQueue(defaultQueueWeight, schema.GroupVersionResource{}) + dc.ensureWeightedQueue(defaultQueueWeight) + dc.setGVRWeight(schema.GroupVersionResource{}, defaultQueueWeight) return dc } -func (dc *DynamicController) getWeightByGVR(gvr schema.GroupVersionResource) int { +// getGVRWeight retrieves the weight for a given GroupVersionResource (GVR). If the GVR +// exists in the `gvrWeights` map, its associated weight is returned. Otherwise, a default +// weight is returned. +func (dc *DynamicController) getGVRWeight(gvr schema.GroupVersionResource) int { dc.mu.RLock() defer dc.mu.RUnlock() - if weight, ok := dc.gvrWeights[gvr]; ok { + + if weight, exists := dc.gvrWeights[gvr]; exists { return weight } return defaultQueueWeight } -func (dc *DynamicController) setWeightByGVR(gvr schema.GroupVersionResource, weight int) { +// setGVRWeight sets the weight for a given GroupVersionResource (GVR) and updates the +// corresponding weighted queue. +func (dc *DynamicController) setGVRWeight(gvr schema.GroupVersionResource, weight int) { dc.mu.Lock() defer dc.mu.Unlock() + dc.gvrWeights[gvr] = weight + dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} } -func (dc *DynamicController) deleteWeightByGVR(gvr schema.GroupVersionResource) { +// deleteGVRWeight removes the specified GroupVersionResource (GVR) from the +// DynamicController's weight mapping and queues gvr set. If the queue has no +// remaining GVRs registered, the queue is removed +func (dc *DynamicController) deleteGVRWeight(gvr schema.GroupVersionResource) { dc.mu.Lock() defer dc.mu.Unlock() - delete(dc.gvrWeights, gvr) -} -func (dc *DynamicController) deleteGVRFromQueue(weight int, gvr schema.GroupVersionResource) { - dc.mu.Lock() - defer dc.mu.Unlock() - delete(dc.weightedQueues[weight].gvrSet, gvr) - if len(dc.weightedQueues[weight].gvrSet) < 1 { - dc.weightedQueues[weight].queue.ShutDown() + weight := dc.gvrWeights[gvr] + wq := dc.weightedQueues[weight] + + delete(dc.gvrWeights, gvr) + delete(wq.gvrSet, gvr) + if len(wq.gvrSet) < 1 { + wq.queue.ShutDown() delete(dc.weightedQueues, weight) } } // Ensure weighted queue exists with the specified weight -func (dc *DynamicController) ensureWeightedQueue(weight int, gvr schema.GroupVersionResource) bool { +func (dc *DynamicController) ensureWeightedQueue(weight int) bool { if weight < 1 || weight > 1000 { dc.log.Error(nil, "weight should be between 1 and 1000", "weight", weight) return false } - if _, ok := dc.weightedQueues[weight]; !ok { + if _, exists := dc.weightedQueues[weight]; !exists { dc.mu.Lock() defer dc.mu.Unlock() // TODO(a-hilaly): Make the queue size configurable. @@ -231,13 +242,6 @@ func (dc *DynamicController) ensureWeightedQueue(weight int, gvr schema.GroupVer ), fmt.Sprintf("weight-%d-queue", weight)), gvrSet: map[schema.GroupVersionResource]struct{}{}, } - dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} - } else { - dc.mu.Lock() - defer dc.mu.Unlock() - if _, ok := dc.weightedQueues[weight].gvrSet[gvr]; !ok { - dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} - } } return true @@ -285,6 +289,7 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { return cache.WaitForCacheSync(stopCh, dc.AllInformerHaveSynced) } +// shutdownQueues shuts down all the weighted queues managed by the DynamicController. func (dc *DynamicController) shutdownQueues() { dc.log.Info("Shutting down dynamic controller queues") for key := range dc.weightedQueues { @@ -323,33 +328,39 @@ func (dc *DynamicController) worker(ctx context.Context) { } } -// Select queue propotionally based on weight -// TODO(n-george): review this logic +// selectQueueByWeight select a queue based on weight * length proportionally. +// A queue with a weight of 200 will be selected twice as often assuming an even +// number of events are distributed between the queues func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { - var totalWeight int - var minQueue *WeightedQueue + var ( + totalWeight int = 0 + maxWeight int = 0 + selectedQueue *WeightedQueue + activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues)) + ) for _, wq := range dc.weightedQueues { - totalWeight += wq.weight - if minQueue == nil || wq.weight < minQueue.weight { - minQueue = wq + if wq.queue.Len() > 0 { + totalWeight += wq.weight + activeQueues = append(activeQueues, wq) } } - if totalWeight == 0 { - return nil + if totalWeight == 0 || len(activeQueues) == 0 { + return dc.weightedQueues[defaultQueueWeight] } - var selectedQueue *WeightedQueue - minRatio := float64(^uint(0)) - - for _, wq := range dc.weightedQueues { - queueRatio := float64(wq.queue.Len()) / float64(wq.weight) - if queueRatio < minRatio { - minRatio = queueRatio - selectedQueue = wq + for _, q := range activeQueues { + effectiveWeight := q.weight * q.queue.Len() + if effectiveWeight > maxWeight { + maxWeight = effectiveWeight + selectedQueue = q + continue } - } + if effectiveWeight == maxWeight && q.weight > selectedQueue.weight { + selectedQueue = q + } + } return selectedQueue } @@ -357,7 +368,6 @@ func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { // processNextWorkItem processes a single item from the queue. func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { weightedQueue := dc.selectQueueByWeight() - dc.log.V(1).Info("Processing item with specified weight", "weight", weightedQueue.weight) obj, shutdown := weightedQueue.queue.Get() if shutdown { @@ -540,7 +550,7 @@ func (dc *DynamicController) enqueueObject(obj interface{}, eventType string) { GVR: gvr, } - weight := dc.getWeightByGVR(gvr) + weight := dc.getGVRWeight(gvr) dc.log.V(1).Info("Enqueueing object", "objectIdentifiers", objectIdentifiers, @@ -557,10 +567,10 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro dc.log.V(1).Info("Registering new GVK", "gvr", gvr) // Set the weight for the GVR and ensure the queue exists for that weight class - dc.setWeightByGVR(gvr, queueWeight) - if ok := dc.ensureWeightedQueue(queueWeight, gvr); !ok { - return fmt.Errorf("Failed to create or get weighted queue with weight: %d", queueWeight) + if ok := dc.ensureWeightedQueue(queueWeight); !ok { + return fmt.Errorf("failed to create or get weighted queue with weight: %d", queueWeight) } + dc.setGVRWeight(gvr, queueWeight) _, exists := dc.informers.Load(gvr) if exists { @@ -632,9 +642,7 @@ func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.Grou dc.log.Info("Unregistering GVK", "gvr", gvr) // Remove gvr from weight map and cleanup queue if no GVRs are assigned - weight := dc.getWeightByGVR(gvr) - dc.deleteWeightByGVR(gvr) - dc.deleteGVRFromQueue(weight, gvr) + dc.deleteGVRWeight(gvr) // Retrieve the informer informerObj, ok := dc.informers.Load(gvr) From 0af07f6c400d4d98f712c5a01755318af7593de2 Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 00:28:39 -0700 Subject: [PATCH 06/10] readd missing test logic --- pkg/dynamiccontroller/dynamic_controller_test.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/dynamiccontroller/dynamic_controller_test.go b/pkg/dynamiccontroller/dynamic_controller_test.go index 79d9abe9..d6e8fb90 100644 --- a/pkg/dynamiccontroller/dynamic_controller_test.go +++ b/pkg/dynamiccontroller/dynamic_controller_test.go @@ -70,6 +70,7 @@ func TestNewDynamicController(t *testing.T) { assert.NotNil(t, dc) assert.Equal(t, config, dc.config) assert.NotNil(t, dc.weightedQueues) + assert.NotNil(t, dc.weightedQueues[defaultQueueWeight].queue) assert.NotNil(t, dc.kubeClient) } From 9eb6e14282ddd3f24c6c9342f8d4f88829b851de Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 00:51:05 -0700 Subject: [PATCH 07/10] remediate linter errors related to workqueue deprecations --- pkg/dynamiccontroller/dynamic_controller.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index 5f834e93..f65197ef 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -105,7 +105,7 @@ type WeightedQueue struct { // queue items will be prioritized based on GVR weight, between 1 and 1000 weight int // queues are the workqueue used to process items - queue workqueue.RateLimitingInterface + queue workqueue.TypedRateLimitingInterface[any] // a set of each gvr associated with the queue gvrSet map[schema.GroupVersionResource]struct{} } @@ -236,10 +236,13 @@ func (dc *DynamicController) ensureWeightedQueue(weight int) bool { // TODO(a-hilaly): Make the queue size configurable. dc.weightedQueues[weight] = &WeightedQueue{ weight: weight, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), fmt.Sprintf("weight-%d-queue", weight)), + queue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[any](200*time.Millisecond, 1000*time.Second), + &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + fmt.Sprintf("weight-%d-queue", weight), + ), gvrSet: map[schema.GroupVersionResource]struct{}{}, } } @@ -333,10 +336,10 @@ func (dc *DynamicController) worker(ctx context.Context) { // number of events are distributed between the queues func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { var ( - totalWeight int = 0 - maxWeight int = 0 + totalWeight int = 0 + maxWeight int = 0 selectedQueue *WeightedQueue - activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues)) + activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues)) ) for _, wq := range dc.weightedQueues { @@ -360,7 +363,7 @@ func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { if effectiveWeight == maxWeight && q.weight > selectedQueue.weight { selectedQueue = q } - } + } return selectedQueue } From 7f70991a5fd222d57b78de19f16f003c802f6641 Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 00:58:23 -0700 Subject: [PATCH 08/10] test shutdown logging --- pkg/dynamiccontroller/dynamic_controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index f65197ef..d842c893 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -296,7 +296,7 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { func (dc *DynamicController) shutdownQueues() { dc.log.Info("Shutting down dynamic controller queues") for key := range dc.weightedQueues { - dc.log.Info("Shutting down weighted queue: %d", key) + dc.log.Info("Shutting down weighted queue", "key", key) dc.weightedQueues[key].queue.ShutDown() } } From 6d8f1ff7ca117861061b16a8d303cfd1c34a8e9f Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 23:36:06 +0000 Subject: [PATCH 09/10] add read lock to process item for concurrent workers, protect default queue --- pkg/dynamiccontroller/dynamic_controller.go | 25 +++++++++++++++------ pkg/metadata/labels.go | 4 ++-- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index d842c893..80ae9d14 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -175,7 +175,6 @@ func NewDynamicController( // pass version and pod id from env } - dc.ensureWeightedQueue(defaultQueueWeight) dc.setGVRWeight(schema.GroupVersionResource{}, defaultQueueWeight) return dc @@ -198,6 +197,8 @@ func (dc *DynamicController) getGVRWeight(gvr schema.GroupVersionResource) int { // setGVRWeight sets the weight for a given GroupVersionResource (GVR) and updates the // corresponding weighted queue. func (dc *DynamicController) setGVRWeight(gvr schema.GroupVersionResource, weight int) { + dc.ensureWeightedQueue(weight) + dc.mu.Lock() defer dc.mu.Unlock() @@ -215,6 +216,11 @@ func (dc *DynamicController) deleteGVRWeight(gvr schema.GroupVersionResource) { weight := dc.gvrWeights[gvr] wq := dc.weightedQueues[weight] + if weight == defaultQueueWeight { + dc.log.Error(nil, "cannot delete default queue", "weight", weight) + return + } + delete(dc.gvrWeights, gvr) delete(wq.gvrSet, gvr) if len(wq.gvrSet) < 1 { @@ -295,9 +301,9 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { // shutdownQueues shuts down all the weighted queues managed by the DynamicController. func (dc *DynamicController) shutdownQueues() { dc.log.Info("Shutting down dynamic controller queues") - for key := range dc.weightedQueues { + for key, wq := range dc.weightedQueues { dc.log.Info("Shutting down weighted queue", "key", key) - dc.weightedQueues[key].queue.ShutDown() + wq.queue.ShutDown() } } @@ -335,6 +341,8 @@ func (dc *DynamicController) worker(ctx context.Context) { // A queue with a weight of 200 will be selected twice as often assuming an even // number of events are distributed between the queues func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { + startTime := time.Now() + var ( totalWeight int = 0 maxWeight int = 0 @@ -342,6 +350,9 @@ func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues)) ) + dc.mu.RLock() + defer dc.mu.RUnlock() + for _, wq := range dc.weightedQueues { if wq.queue.Len() > 0 { totalWeight += wq.weight @@ -365,6 +376,9 @@ func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { } } + duration := time.Since(startTime) + dc.log.V(1).Info("Time to select queue", "duration", duration.String(), "weight", selectedQueue.weight) + return selectedQueue } @@ -374,7 +388,7 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { obj, shutdown := weightedQueue.queue.Get() if shutdown { - return false + return true } defer weightedQueue.queue.Done(obj) @@ -570,9 +584,6 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro dc.log.V(1).Info("Registering new GVK", "gvr", gvr) // Set the weight for the GVR and ensure the queue exists for that weight class - if ok := dc.ensureWeightedQueue(queueWeight); !ok { - return fmt.Errorf("failed to create or get weighted queue with weight: %d", queueWeight) - } dc.setGVRWeight(gvr, queueWeight) _, exists := dc.informers.Load(gvr) diff --git a/pkg/metadata/labels.go b/pkg/metadata/labels.go index 34b106ef..8cb4911f 100644 --- a/pkg/metadata/labels.go +++ b/pkg/metadata/labels.go @@ -115,8 +115,8 @@ func (gl GenericLabeler) Copy() map[string]string { // ResourceGraphDefinitionLabel and ResourceGraphDefinitionIDLabel labels on a resource. func NewResourceGraphDefinitionLabeler(rgMeta metav1.Object) GenericLabeler { return map[string]string{ - ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()), - ResourceGraphDefinitionNameLabel: rgMeta.GetName(), + ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()), + ResourceGraphDefinitionNameLabel: rgMeta.GetName(), } } From d6a83b7829c3844bb0aceabc80cfdd13c87af717 Mon Sep 17 00:00:00 2001 From: Gonfidel Date: Sat, 15 Feb 2025 23:42:38 +0000 Subject: [PATCH 10/10] fmt --- pkg/metadata/labels.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/metadata/labels.go b/pkg/metadata/labels.go index 8cb4911f..34b106ef 100644 --- a/pkg/metadata/labels.go +++ b/pkg/metadata/labels.go @@ -115,8 +115,8 @@ func (gl GenericLabeler) Copy() map[string]string { // ResourceGraphDefinitionLabel and ResourceGraphDefinitionIDLabel labels on a resource. func NewResourceGraphDefinitionLabeler(rgMeta metav1.Object) GenericLabeler { return map[string]string{ - ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()), - ResourceGraphDefinitionNameLabel: rgMeta.GetName(), + ResourceGraphDefinitionIDLabel: string(rgMeta.GetUID()), + ResourceGraphDefinitionNameLabel: rgMeta.GetName(), } }