Skip to content

Commit

Permalink
gonfidel/implement-weighted-queues
Browse files Browse the repository at this point in the history
  • Loading branch information
Gonfidel committed Feb 14, 2025
1 parent bae0389 commit 6ddba44
Show file tree
Hide file tree
Showing 9 changed files with 188 additions and 33 deletions.
7 changes: 7 additions & 0 deletions api/v1alpha1/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,13 @@ type ResourceGraphDefinitionSpec struct {
//
// +kubebuilder:validation:Optional
DefaultServiceAccounts map[string]string `json:"defaultServiceAccounts,omitempty"`
// The weight used to determine priority when queuing reconciliation.
//
// +kubebuilder:default=100
// +kubebuilder:validation:Maximum=1000
// +kubebuilder:validation:Minimum=1
// +kubebuilder:validation:Optional
Weight int `json:"weight,omitempty"`
}

// Schema represents the attributes that define an instance of
Expand Down
6 changes: 6 additions & 0 deletions config/crd/bases/kro.run_resourcegraphdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ spec:
- apiVersion
- kind
type: object
weight:
default: 100
description: The weight used to determine priority when queuing reconciliation.
maximum: 1000
minimum: 1
type: integer
required:
- schema
type: object
Expand Down
5 changes: 3 additions & 2 deletions examples/kubernetes/webapp/rg.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ kind: ResourceGraphDefinition
metadata:
name: webapp.kro.run
spec:
weight: 150
schema:
apiVersion: v1alpha1
kind: WebApp
Expand Down Expand Up @@ -68,7 +69,7 @@ spec:

- id: service
includeWhen:
- ${schema.spec.service.enabled}
- ${schema.spec.service.enabled}
template:
apiVersion: v1
kind: Service
Expand All @@ -86,7 +87,7 @@ spec:

- id: ingress
includeWhen:
- ${schema.spec.ingress.enabled}
- ${schema.spec.ingress.enabled}
template:
apiVersion: networking.k8s.io/v1
kind: Ingress
Expand Down
6 changes: 6 additions & 0 deletions helm/crds/kro.run_resourcegraphdefinitions.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ spec:
- apiVersion
- kind
type: object
weight:
default: 100
description: The weight used to determine priority when queuing reconciliation.
maximum: 1000
minimum: 1
type: integer
required:
- schema
type: object
Expand Down
3 changes: 3 additions & 0 deletions pkg/controller/instance/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type ReconcileConfig struct {
// TODO(a-hilaly): need to define the different deletion policies we need to
// support.
DeletionPolicy string

// Instance controller events are queued in a weighted queue.
Weight int
}

// Controller manages the reconciliation of a single instance of a ResourceGraphDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinition(ctx
controller := r.setupMicroController(gvr, processedRGD, rgd.Spec.DefaultServiceAccounts, graphExecLabeler)

log.V(1).Info("reconciling resource graph definition micro controller")
if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile); err != nil {
if err := r.reconcileResourceGraphDefinitionMicroController(ctx, &gvr, controller.Reconcile, rgd.Spec.Weight); err != nil {
return processedRGD.TopologicalOrder, resourcesInfo, err
}

Expand Down Expand Up @@ -141,8 +141,8 @@ func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionCRD(
}

// reconcileResourceGraphDefinitionMicroController starts the microcontroller for handling the resources
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler) error {
err := r.dynamicController.StartServingGVK(ctx, *gvr, handler)
func (r *ResourceGraphDefinitionReconciler) reconcileResourceGraphDefinitionMicroController(ctx context.Context, gvr *schema.GroupVersionResource, handler dynamiccontroller.Handler, weight int) error {
err := r.dynamicController.StartServingGVK(ctx, *gvr, handler, weight)
if err != nil {
return newMicroControllerError(err)
}
Expand Down
174 changes: 152 additions & 22 deletions pkg/dynamiccontroller/dynamic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,15 @@ type Config struct {
ShutdownTimeout time.Duration
}

// WeightedQueue is a rate-limited workqueue tagged with a weight, together with the
// set of GVRs assigned to it. The DynamicController (DC) manages multiple GVRs and
// needs a way to prioritize them; weighted queues let it do so based on their weights.
type WeightedQueue struct {
weight int
queue workqueue.RateLimitingInterface

Check failure on line 106 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: workqueue.RateLimitingInterface is deprecated: Use TypedRateLimitingInterface instead. (staticcheck)
gvrSet map[schema.GroupVersionResource]struct{}
}

// DynamicController (DC) is a single controller capable of managing multiple different
// kubernetes resources (GVRs) in parallel. It can safely start watching new
// resources and stop watching others at runtime - hence the term "dynamic". This
Expand All @@ -122,10 +131,14 @@ type DynamicController struct {
// handler is responsible for managing a specific GVR.
handlers sync.Map

// queue is the workqueue used to process items
queue workqueue.RateLimitingInterface
// weightedQueues holds the workqueues used to process items, keyed by weight
weightedQueues map[int]*WeightedQueue

// weight of gvr when queued, between 1 and 1000
gvrWeights map[schema.GroupVersionResource]int

log logr.Logger
mu sync.RWMutex
}

type Handler func(ctx context.Context, req ctrl.Request) error
Expand All @@ -146,18 +159,77 @@ func NewDynamicController(
dc := &DynamicController{
config: config,
kubeClient: kubeClient,
// TODO(a-hilaly): Make the queue size configurable.
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "dynamic-controller-queue"),

weightedQueues: make(map[int]*WeightedQueue),
gvrWeights: make(map[schema.GroupVersionResource]int),

Check failure on line 164 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

File is not properly formatted (gofmt)
log: logger,
// pass version and pod id from env
}

dc.ensureWeightedQueue(100, schema.GroupVersionResource{})

return dc
}

func (dc *DynamicController) getWeightByGVR(gvr schema.GroupVersionResource) int {
dc.mu.RLock()
defer dc.mu.RUnlock()
return int(dc.gvrWeights[gvr])

Check failure on line 177 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)
}

// setWeightByGVR records weight as the queue weight for gvr, replacing any
// previously recorded value. The write is performed under the controller's
// write lock.
func (dc *DynamicController) setWeightByGVR(gvr schema.GroupVersionResource, weight int) {
dc.mu.Lock()
defer dc.mu.Unlock()
dc.gvrWeights[gvr] = weight
}

// deleteWeightByGVR removes the recorded queue weight for gvr, if any. The
// removal is performed under the controller's write lock.
func (dc *DynamicController) deleteWeightByGVR(gvr schema.GroupVersionResource) {
dc.mu.Lock()
defer dc.mu.Unlock()
delete(dc.gvrWeights, gvr)
}

// deleteGVRFromQueue removes gvr from the weighted queue registered under
// weight. When that queue no longer serves any GVR it is shut down and removed
// from the controller. It is a no-op when no queue exists for weight, for
// example when the GVR was never registered and its weight resolved to 0.
func (dc *DynamicController) deleteGVRFromQueue(weight int, gvr schema.GroupVersionResource) {
	dc.mu.Lock()
	defer dc.mu.Unlock()

	wq, ok := dc.weightedQueues[weight]
	if !ok {
		// Nothing to clean up; indexing the missing entry would dereference nil.
		return
	}

	delete(wq.gvrSet, gvr)
	if len(wq.gvrSet) == 0 {
		wq.queue.ShutDown()
		delete(dc.weightedQueues, weight)
	}
}

// Ensure weighted queue exists with the specified weight
func (dc *DynamicController) ensureWeightedQueue(weight int, gvr schema.GroupVersionResource) bool {
if weight < 1 || weight > 1000 {
dc.log.Error(nil, "weight should be between 1 and 1000", "weight", weight)
return false
}

if _, ok := dc.weightedQueues[weight]; !ok {
dc.mu.Lock()
defer dc.mu.Unlock()
// TODO(a-hilaly): Make the queue size configurable.
dc.weightedQueues[weight] = &WeightedQueue{
weight: weight,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(

Check failure on line 215 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: workqueue.NewMaxOfRateLimiter is deprecated: NewMaxOfRateLimiter is deprecated, use NewTypedMaxOfRateLimiter instead. (staticcheck)
workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),

Check failure on line 216 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: workqueue.NewItemExponentialFailureRateLimiter is deprecated: NewItemExponentialFailureRateLimiter is deprecated, use NewTypedItemExponentialFailureRateLimiter instead. (staticcheck)
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},

Check failure on line 217 in pkg/dynamiccontroller/dynamic_controller.go

View workflow job for this annotation

GitHub Actions / lint

SA1019: workqueue.BucketRateLimiter is deprecated: BucketRateLimiter is deprecated, use TypedBucketRateLimiter instead. (staticcheck)
), fmt.Sprintf("weight-%d-queue", weight)),
gvrSet: map[schema.GroupVersionResource]struct{}{},
}
dc.weightedQueues[weight].gvrSet[gvr] = struct{}{}
} else {
dc.mu.Lock()
defer dc.mu.Unlock()
if _, ok := dc.weightedQueues[weight].gvrSet[gvr]; !ok {
dc.weightedQueues[weight].gvrSet[gvr] = struct{}{}
}
}

return true
}

// AllInformerHaveSynced checks if all registered informers have synced, returns
// true if they have.
func (dc *DynamicController) AllInformerHaveSynced() bool {
Expand Down Expand Up @@ -200,10 +272,18 @@ func (dc *DynamicController) WaitForInformersSync(stopCh <-chan struct{}) bool {
return cache.WaitForCacheSync(stopCh, dc.AllInformerHaveSynced)
}

// shutdownQueues shuts down every weighted queue currently registered with the
// controller.
func (dc *DynamicController) shutdownQueues() {
	dc.log.Info("Shutting down dynamic controller queues")

	// Other methods add and remove queues under dc.mu, so hold the read lock
	// while iterating the map.
	dc.mu.RLock()
	defer dc.mu.RUnlock()

	for weight, wq := range dc.weightedQueues {
		// logr expects key/value pairs, not printf-style format arguments.
		dc.log.Info("Shutting down weighted queue", "weight", weight)
		wq.queue.ShutDown()
	}
}

// Run starts the DynamicController.
func (dc *DynamicController) Run(ctx context.Context) error {
defer utilruntime.HandleCrash()
defer dc.queue.ShutDown()
defer dc.shutdownQueues()

dc.log.Info("Starting dynamic controller")
defer dc.log.Info("Shutting down dynamic controller")
Expand All @@ -230,26 +310,60 @@ func (dc *DynamicController) worker(ctx context.Context) {
}
}

// selectQueueByWeight picks the weighted queue the next work item should be
// taken from. Among the queues that currently hold items it returns the one
// with the smallest pending-items-to-weight ratio, so a larger weight makes a
// queue more likely to be chosen. Empty queues are skipped, otherwise their
// ratio of 0 would always win and the caller would block on a queue with
// nothing to do while other queues have a backlog.
//
// When every queue is empty an arbitrary queue is returned so the caller can
// block on it. nil is returned only when no queue is registered.
// TODO(n-george): review this logic
func (dc *DynamicController) selectQueueByWeight() *WeightedQueue {
	// Queues are added and removed under dc.mu, so hold the read lock while
	// iterating the map.
	dc.mu.RLock()
	defer dc.mu.RUnlock()

	var (
		selected *WeightedQueue // best non-empty queue seen so far
		fallback *WeightedQueue // first queue seen, used when all are empty
		minRatio float64        // ratio of selected; meaningful once selected != nil
	)

	for _, wq := range dc.weightedQueues {
		if fallback == nil {
			fallback = wq
		}

		pending := wq.queue.Len()
		if pending == 0 {
			continue
		}

		ratio := float64(pending) / float64(wq.weight)
		if selected == nil || ratio < minRatio {
			selected, minRatio = wq, ratio
		}
	}

	if selected == nil {
		return fallback
	}
	return selected
}

// processNextWorkItem processes a single item from the queue.
func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := dc.queue.Get()
weightedQueue := dc.selectQueueByWeight()
dc.log.V(1).Info("Processing item with specified weight", "weight", weightedQueue.weight)

obj, shutdown := weightedQueue.queue.Get()
if shutdown {
return false
}
defer dc.queue.Done(obj)
defer weightedQueue.queue.Done(obj)

queueLength.Set(float64(dc.queue.Len()))
queueLength.Set(float64(weightedQueue.queue.Len()))

item, ok := obj.(ObjectIdentifiers)
if !ok {
dc.log.Error(fmt.Errorf("expected ObjectIdentifiers in queue but got %#v", obj), "Invalid item in queue")
dc.queue.Forget(obj)
weightedQueue.queue.Forget(obj)
return true
}

err := dc.syncFunc(ctx, item)
if err == nil || apierrors.IsNotFound(err) {
dc.queue.Forget(obj)
weightedQueue.queue.Forget(obj)
return true
}

Expand All @@ -260,25 +374,25 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
case *requeue.NoRequeue:
dc.log.Error(typedErr, "Error syncing item, not requeuing", "item", item)
requeueTotal.WithLabelValues(gvrKey, "no_requeue").Inc()
dc.queue.Forget(obj)
weightedQueue.queue.Forget(obj)
case *requeue.RequeueNeeded:
dc.log.V(1).Info("Requeue needed", "item", item, "error", typedErr)
requeueTotal.WithLabelValues(gvrKey, "requeue").Inc()
dc.queue.Add(obj) // Add without rate limiting
weightedQueue.queue.Add(obj) // Add without rate limiting
case *requeue.RequeueNeededAfter:
dc.log.V(1).Info("Requeue needed after delay", "item", item, "error", typedErr, "delay", typedErr.Duration())
requeueTotal.WithLabelValues(gvrKey, "requeue_after").Inc()
dc.queue.AddAfter(obj, typedErr.Duration())
weightedQueue.queue.AddAfter(obj, typedErr.Duration())
default:
// Arriving here means we have an unexpected error, we should requeue the item
// with rate limiting.
requeueTotal.WithLabelValues(gvrKey, "rate_limited").Inc()
if dc.queue.NumRequeues(obj) < dc.config.QueueMaxRetries {
if weightedQueue.queue.NumRequeues(obj) < dc.config.QueueMaxRetries {
dc.log.Error(err, "Error syncing item, requeuing with rate limit", "item", item)
dc.queue.AddRateLimited(obj)
weightedQueue.queue.AddRateLimited(obj)
} else {
dc.log.Error(err, "Dropping item from queue after max retries", "item", item)
dc.queue.Forget(obj)
weightedQueue.queue.Forget(obj)
}
}

Expand Down Expand Up @@ -413,18 +527,28 @@ func (dc *DynamicController) enqueueObject(obj interface{}, eventType string) {
GVR: gvr,
}

weight := dc.getWeightByGVR(gvr)

dc.log.V(1).Info("Enqueueing object",
"objectIdentifiers", objectIdentifiers,
"eventType", eventType)
"eventType", eventType,
"weight", weight)

informerEventsTotal.WithLabelValues(gvr.String(), eventType).Inc()
dc.queue.Add(objectIdentifiers)

dc.weightedQueues[weight].queue.Add(objectIdentifiers)
}

// StartServingGVK registers a new GVK to the informers map safely.
func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler) error {
func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.GroupVersionResource, handler Handler, queueWeight int) error {
dc.log.V(1).Info("Registering new GVK", "gvr", gvr)

// Set the weight for the GVR and ensure the queue exists for that weight class
dc.setWeightByGVR(gvr, queueWeight)
if ok := dc.ensureWeightedQueue(queueWeight, gvr); !ok {
return fmt.Errorf("Failed to create or get weighted queue with weight: %d", queueWeight)
}

_, exists := dc.informers.Load(gvr)
if exists {
// Even though the informer is already registered, we should still
Expand Down Expand Up @@ -494,6 +618,11 @@ func (dc *DynamicController) StartServingGVK(ctx context.Context, gvr schema.Gro
func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.GroupVersionResource) error {
dc.log.Info("Unregistering GVK", "gvr", gvr)

// Remove gvr from weight map
weight := dc.getWeightByGVR(gvr)
dc.deleteWeightByGVR(gvr)
dc.deleteGVRFromQueue(weight, gvr)

// Retrieve the informer
informerObj, ok := dc.informers.Load(gvr)
if !ok {
Expand All @@ -520,6 +649,7 @@ func (dc *DynamicController) StopServiceGVK(ctx context.Context, gvr schema.Grou
// Unregister the handler if any
dc.handlers.Delete(gvr)


gvrCount.Dec()
// Clean up any pending items in the queue for this GVR
// NOTE(a-hilaly): This is a bit heavy.. maybe we can find a better way to do this.
Expand Down
Loading

0 comments on commit 6ddba44

Please sign in to comment.