diff --git a/api/v1alpha1/types.go b/api/v1alpha1/types.go index 77aa01f7..7a55582e 100644 --- a/api/v1alpha1/types.go +++ b/api/v1alpha1/types.go @@ -43,6 +43,13 @@ type ResourceGraphDefinitionSpec struct { // // +kubebuilder:validation:Optional DefaultServiceAccounts map[string]string `json:"defaultServiceAccounts,omitempty"` + // The weight used to determine priority when queuing reconciliation. + // + // +kubebuilder:default=100 + // +kubebuilder:validation:Maximum=1000 + // +kubebuilder:validation:Minimum=1 + // +kubebuilder:validation:Optional + Weight int `json:"weight,omitempty"` } // Schema represents the attributes that define an instance of diff --git a/config/crd/bases/kro.run_resourcegraphdefinitions.yaml b/config/crd/bases/kro.run_resourcegraphdefinitions.yaml index b26d4d63..9cd49d92 100644 --- a/config/crd/bases/kro.run_resourcegraphdefinitions.yaml +++ b/config/crd/bases/kro.run_resourcegraphdefinitions.yaml @@ -145,6 +145,12 @@ spec: - apiVersion - kind type: object + weight: + default: 100 + description: The weight used to determine priority when queuing reconciliation. + maximum: 1000 + minimum: 1 + type: integer required: - schema type: object diff --git a/examples/kubernetes/webapp/rg.yaml b/examples/kubernetes/webapp/rg.yaml index dbe28d1d..8556f241 100644 --- a/examples/kubernetes/webapp/rg.yaml +++ b/examples/kubernetes/webapp/rg.yaml @@ -3,6 +3,7 @@ kind: ResourceGraphDefinition metadata: name: webapp.kro.run spec: + weight: 150 schema: apiVersion: v1alpha1 kind: WebApp @@ -68,7 +69,7 @@ spec: - id: service includeWhen: - - ${schema.spec.service.enabled} + - ${schema.spec.service.enabled} template: apiVersion: v1 kind: Service @@ -86,7 +87,7 @@ spec: - id: ingress includeWhen: - - ${schema.spec.ingress.enabled} + - ${schema.spec.ingress.enabled} template: apiVersion: networking.k8s.io/v1 kind: Ingress diff --git a/helm/crds/kro.run_resourcegraphdefinitions.yaml b/helm/crds/kro.run_resourcegraphdefinitions.yaml index b26d4d63..9cd49d92 100644 --- a/helm/crds/kro.run_resourcegraphdefinitions.yaml +++ b/helm/crds/kro.run_resourcegraphdefinitions.yaml @@ -145,6 +145,12 @@ spec: - apiVersion - kind type: object + weight: + default: 100 + description: The weight used to determine priority when queuing reconciliation. + maximum: 1000 + minimum: 1 + type: integer required: - schema type: object diff --git a/pkg/controller/instance/controller.go b/pkg/controller/instance/controller.go index b032a160..b333b902 100644 --- a/pkg/controller/instance/controller.go +++ b/pkg/controller/instance/controller.go @@ -47,6 +47,9 @@ type ReconcileConfig struct { // TODO(a-hilaly): need to define think the different deletion policies we need to // support. DeletionPolicy string + + // Instance controller events are queued in a weighted queue. + Weight int } // Controller manages the reconciliation of a single instance of a ResourceGraphDefinition, diff --git a/pkg/controller/resourcegraphdefinition/controller_reconcile.go b/pkg/controller/resourcegraphdefinition/controller_reconcile.go index b4a11902..763308df 100644 --- a/pkg/controller/resourcegraphdefinition/controller_reconcile.go +++ b/pkg/controller/resourcegraphdefinition/controller_reconcile.go @@ -63,7 +63,7 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx controller := r.setupMicroController(gvr, processedRGD, rgd.Spec.DefaultServiceAccounts, graphExecLabeler) log.V(1).Info("reconciling resource graph definition micro controller") - if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile); err != nil { + if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile, rgd.Spec.Weight); err != nil { return processedRGD.TopologicalOrder, resourcesInfo, err } @@ -141,8 +141,8 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD( } // reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources -func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler) error { - err := r.dynamicController.StartServingGVK(ctx, *gvr, handler) +func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler, weight int) error { + err := r.dynamicController.StartServingGVK(ctx, *gvr, handler, weight) if err != nil { return newMicroControllerError(err) } diff --git a/pkg/dynamiccontroller/dynamic_controller.go b/pkg/dynamiccontroller/dynamic_controller.go index ec6fa821..d842c893 100644 --- a/pkg/dynamiccontroller/dynamic_controller.go +++ b/pkg/dynamiccontroller/dynamic_controller.go @@ -98,6 +98,18 @@ type Config struct { ShutdownTimeout time.Duration } +// DynamicController (DC) manages multiple GVRs and needs a way to prioritize the importance +// of each GVR. Weighted queues provide a flexible way for the DynamicController (DC) to prioritize +// GVRs based on their weights. +type WeightedQueue struct { + // queue items will be prioritized based on GVR weight, between 1 and 1000 + weight int + // queues are the workqueue used to process items + queue workqueue.TypedRateLimitingInterface[any] + // a set of each gvr associated with the queue + gvrSet map[schema.GroupVersionResource]struct{} +} + // DynamicController (DC) is a single controller capable of managing multiple different // kubernetes resources (GVRs) in parallel. It can safely start watching new // resources and stop watching others at runtime - hence the term "dynamic". This @@ -122,8 +134,14 @@ type DynamicController struct { // handler is responsible for managing a specific GVR. handlers sync.Map - // queue is the workqueue used to process items - queue workqueue.RateLimitingInterface + // queues are the workqueue used to process items + weightedQueues map[int]*WeightedQueue + + // weight of gvr when queued, between 1 and 1000 + gvrWeights map[schema.GroupVersionResource]int + + // mutex for weightedQueues and gvrWeights + mu sync.RWMutex log logr.Logger } @@ -135,6 +153,10 @@ type informerWrapper struct { shutdown func() } +var ( + defaultQueueWeight = 100 +) + // NewDynamicController creates a new DynamicController instance. func NewDynamicController( log logr.Logger, @@ -146,18 +168,88 @@ func NewDynamicController( dc := &DynamicController{ config: config, kubeClient: kubeClient, - // TODO(a-hilaly): Make the queue size configurable. - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( - workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second), - &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, - ), "dynamic-controller-queue"), - log: logger, + + weightedQueues: make(map[int]*WeightedQueue), + gvrWeights: make(map[schema.GroupVersionResource]int), + log: logger, // pass version and pod id from env } + dc.ensureWeightedQueue(defaultQueueWeight) + dc.setGVRWeight(schema.GroupVersionResource{}, defaultQueueWeight) + return dc } +// getGVRWeight retrieves the weight for a given GroupVersionResource (GVR). If the GVR +// exists in the `gvrWeights` map, its associated weight is returned. Otherwise, a default +// weight is returned. +func (dc *DynamicController) getGVRWeight(gvr schema.GroupVersionResource) int { + dc.mu.RLock() + defer dc.mu.RUnlock() + + if weight, exists := dc.gvrWeights[gvr]; exists { + return weight + } + + return defaultQueueWeight +} + +// setGVRWeight sets the weight for a given GroupVersionResource (GVR) and updates the +// corresponding weighted queue. +func (dc *DynamicController) setGVRWeight(gvr schema.GroupVersionResource, weight int) { + dc.mu.Lock() + defer dc.mu.Unlock() + + dc.gvrWeights[gvr] = weight + dc.weightedQueues[weight].gvrSet[gvr] = struct{}{} +} + +// deleteGVRWeight removes the specified GroupVersionResource (GVR) from the +// DynamicController's weight mapping and queues gvr set. If the queue has no +// remaining GVRs registered, the queue is removed +func (dc *DynamicController) deleteGVRWeight(gvr schema.GroupVersionResource) { + dc.mu.Lock() + defer dc.mu.Unlock() + + weight := dc.gvrWeights[gvr] + wq := dc.weightedQueues[weight] + + delete(dc.gvrWeights, gvr) + delete(wq.gvrSet, gvr) + if len(wq.gvrSet) < 1 { + wq.queue.ShutDown() + delete(dc.weightedQueues, weight) + } +} + +// Ensure weighted queue exists with the specified weight +func (dc *DynamicController) ensureWeightedQueue(weight int) bool { + if weight < 1 || weight > 1000 { + dc.log.Error(nil, "weight should be between 1 and 1000", "weight", weight) + return false + } + + if _, exists := dc.weightedQueues[weight]; !exists { + dc.mu.Lock() + defer dc.mu.Unlock() + // TODO(a-hilaly): Make the queue size configurable. + dc.weightedQueues[weight] = &WeightedQueue{ + weight: weight, + queue: workqueue.NewNamedRateLimitingQueue( + workqueue.NewTypedMaxOfRateLimiter( + workqueue.NewTypedItemExponentialFailureRateLimiter[any](200*time.Millisecond, 1000*time.Second), + &workqueue.TypedBucketRateLimiter[any]{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), + fmt.Sprintf("weight-%d-queue", weight), + ), + gvrSet: map[schema.GroupVersionResource]struct{}{}, + } + } + + return true +} + // AllInformerHaveSynced checks if all registered informers have synced, returns // true if they have. func (dc *DynamicController) AllInformerHaveSynced() bool { @@ -200,10 +292,19 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool { return cache.WaitForCacheSync(stopCh, dc.AllInformerHaveSynced) } +// shutdownQueues shuts down all the weighted queues managed by the DynamicController. +func (dc *DynamicController) shutdownQueues() { + dc.log.Info("Shutting down dynamic controller queues") + for key := range dc.weightedQueues { + dc.log.Info("Shutting down weighted queue", "key", key) + dc.weightedQueues[key].queue.ShutDown() + } +} + // Run starts the DynamicController. func (dc *DynamicController) Run(ctx context.Context) error { defer utilruntime.HandleCrash() - defer dc.queue.ShutDown() + defer dc.shutdownQueues() dc.log.Info("Starting dynamic controller") defer dc.log.Info("Shutting down dynamic controller") @@ -230,26 +331,65 @@ func (dc *DynamicController) worker(ctx context.Context) { } } +// selectQueueByWeight select a queue based on weight * length proportionally. +// A queue with a weight of 200 will be selected twice as often assuming an even +// number of events are distributed between the queues +func (dc *DynamicController) selectQueueByWeight() *WeightedQueue { + var ( + totalWeight int = 0 + maxWeight int = 0 + selectedQueue *WeightedQueue + activeQueues = make([]*WeightedQueue, 0, len(dc.weightedQueues)) + ) + + for _, wq := range dc.weightedQueues { + if wq.queue.Len() > 0 { + totalWeight += wq.weight + activeQueues = append(activeQueues, wq) + } + } + + if totalWeight == 0 || len(activeQueues) == 0 { + return dc.weightedQueues[defaultQueueWeight] + } + + for _, q := range activeQueues { + effectiveWeight := q.weight * q.queue.Len() + if effectiveWeight > maxWeight { + maxWeight = effectiveWeight + selectedQueue = q + continue + } + if effectiveWeight == maxWeight && q.weight > selectedQueue.weight { + selectedQueue = q + } + } + + return selectedQueue +} + // processNextWorkItem processes a single item from the queue. func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { - obj, shutdown := dc.queue.Get() + weightedQueue := dc.selectQueueByWeight() + + obj, shutdown := weightedQueue.queue.Get() if shutdown { return false } - defer dc.queue.Done(obj) + defer weightedQueue.queue.Done(obj) - queueLength.Set(float64(dc.queue.Len())) + queueLength.Set(float64(weightedQueue.queue.Len())) item, ok := obj.(ObjectIdentifiers) if !ok { dc.log.Error(fmt.Errorf("expected ObjectIdentifiers in queue but got %#v", obj), "Invalid item in queue") - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) return true } err := dc.syncFunc(ctx, item) if err == nil || apierrors.IsNotFound(err) { - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) return true } @@ -260,25 +400,25 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool { case *requeue.NoRequeue: dc.log.Error(typedErr, "Error syncing item, not requeuing", "item", item) requeueTotal.WithLabelValues(gvrKey, "no_requeue").Inc() - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) case *requeue.RequeueNeeded: dc.log.V(1).Info("Requeue needed", "item", item, "error", typedErr) requeueTotal.WithLabelValues(gvrKey, "requeue").Inc() - dc.queue.Add(obj) // Add without rate limiting + weightedQueue.queue.Add(obj) // Add without rate limiting case *requeue.RequeueNeededAfter: dc.log.V(1).Info("Requeue needed after delay", "item", item, "error", typedErr, "delay", typedErr.Duration()) requeueTotal.WithLabelValues(gvrKey, "requeue_after").Inc() - dc.queue.AddAfter(obj, typedErr.Duration()) + weightedQueue.queue.AddAfter(obj, typedErr.Duration()) default: // Arriving here means we have an unexpected error, we should requeue the item // with rate limiting. requeueTotal.WithLabelValues(gvrKey, "rate_limited").Inc() - if dc.queue.NumRequeues(obj) < dc.config.QueueMaxRetries { + if weightedQueue.queue.NumRequeues(obj) < dc.config.QueueMaxRetries { dc.log.Error(err, "Error syncing item, requeuing with rate limit", "item", item) - dc.queue.AddRateLimited(obj) + weightedQueue.queue.AddRateLimited(obj) } else { dc.log.Error(err, "Dropping item from queue after max retries", "item", item) - dc.queue.Forget(obj) + weightedQueue.queue.Forget(obj) } } @@ -413,18 +553,28 @@ func (dc *DynamicController) enqueueObject(obj interface{}, eventType string) { GVR: gvr, } + weight := dc.getGVRWeight(gvr) + dc.log.V(1).Info("Enqueueing object", "objectIdentifiers", objectIdentifiers, - "eventType", eventType) + "eventType", eventType, + "weight", weight) informerEventsTotal.WithLabelValues(gvr.String(), eventType).Inc() - dc.queue.Add(objectIdentifiers) + + dc.weightedQueues[weight].queue.Add(objectIdentifiers) } // StartServingGVK registers a new GVK to the informers map safely. -func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler) error { +func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler, queueWeight int) error { dc.log.V(1).Info("Registering new GVK", "gvr", gvr) + // Set the weight for the GVR and ensure the queue exists for that weight class + if ok := dc.ensureWeightedQueue(queueWeight); !ok { + return fmt.Errorf("failed to create or get weighted queue with weight: %d", queueWeight) + } + dc.setGVRWeight(gvr, queueWeight) + _, exists := dc.informers.Load(gvr) if exists { // Even thought the informer is already registered, we should still @@ -494,6 +644,9 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.GroupVersionResource) error { dc.log.Info("Unregistering GVK", "gvr", gvr) + // Remove gvr from weight map and cleanup queue if no GVRs are assigned + dc.deleteGVRWeight(gvr) + // Retrieve the informer informerObj, ok := dc.informers.Load(gvr) if !ok { diff --git a/pkg/dynamiccontroller/dynamic_controller_test.go b/pkg/dynamiccontroller/dynamic_controller_test.go index 556d99df..d6e8fb90 100644 --- a/pkg/dynamiccontroller/dynamic_controller_test.go +++ b/pkg/dynamiccontroller/dynamic_controller_test.go @@ -69,7 +69,8 @@ func TestNewDynamicController(t *testing.T) { assert.NotNil(t, dc) assert.Equal(t, config, dc.config) - assert.NotNil(t, dc.queue) + assert.NotNil(t, dc.weightedQueues) + assert.NotNil(t, dc.weightedQueues[defaultQueueWeight].queue) assert.NotNil(t, dc.kubeClient) } @@ -104,14 +105,14 @@ func TestRegisterAndUnregisterGVK(t *testing.T) { }) // Register GVK - err := dc.StartServingGVK(context.Background(), gvr, handlerFunc) + err := dc.StartServingGVK(context.Background(), gvr, handlerFunc, defaultQueueWeight) require.NoError(t, err) _, exists := dc.informers.Load(gvr) assert.True(t, exists) // Try to register again (should not fail) - err = dc.StartServingGVK(context.Background(), gvr, handlerFunc) + err = dc.StartServingGVK(context.Background(), gvr, handlerFunc, defaultQueueWeight) assert.NoError(t, err) // Unregister GVK @@ -132,9 +133,10 @@ func TestEnqueueObject(t *testing.T) { obj := &unstructured.Unstructured{} obj.SetName("test-object") obj.SetNamespace("default") - obj.SetGroupVersionKind(schema.GroupVersionKind{Group: "test", Version: "v1", Kind: "Test"}) + err := unstructured.SetNestedField(obj.Object, int64(defaultQueueWeight), "spec", "weight") + require.NoError(t, err) dc.enqueueObject(obj, "add") - assert.Equal(t, 1, dc.queue.Len()) + assert.Equal(t, 1, dc.weightedQueues[defaultQueueWeight].queue.Len()) } diff --git a/pkg/metadata/groupversion.go b/pkg/metadata/groupversion.go index 458aa94c..89c0f004 100644 --- a/pkg/metadata/groupversion.go +++ b/pkg/metadata/groupversion.go @@ -71,7 +71,7 @@ func GetResourceGraphDefinitionInstanceGVK(group, apiVersion, kind string) schem func GetResourceGraphDefinitionInstanceGVR(group, apiVersion, kind string) schema.GroupVersionResource { pluralKind := flect.Pluralize(strings.ToLower(kind)) return schema.GroupVersionResource{ - Group: fmt.Sprintf("%s.%s", pluralKind, group), + Group: group, Version: apiVersion, Resource: pluralKind, }