Skip to content

Commit

Permalink
Merge pull request #316 from DanielKlt/main
Browse files Browse the repository at this point in the history
  • Loading branch information
a-hilaly authored Feb 18, 2025
2 parents d964cc0 + b8fb380 commit 494a593
Showing 1 changed file with 14 additions and 21 deletions.
35 changes: 14 additions & 21 deletions pkg/dynamiccontroller/dynamic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ type DynamicController struct {
handlers sync.Map

// queue is the workqueue used to process items
queue workqueue.RateLimitingInterface
queue workqueue.TypedRateLimitingInterface[ObjectIdentifiers]

log logr.Logger
}
Expand All @@ -147,10 +147,10 @@ func NewDynamicController(
config: config,
kubeClient: kubeClient,
// TODO(a-hilaly): Make the queue size configurable.
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(200*time.Millisecond, 1000*time.Second),
&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), "dynamic-controller-queue"),
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[ObjectIdentifiers](200*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[ObjectIdentifiers]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
), workqueue.TypedRateLimitingQueueConfig[ObjectIdentifiers]{Name: "dynamic-controller-queue"}),
log: logger,
// pass version and pod id from env
}
Expand Down Expand Up @@ -232,24 +232,17 @@ func (dc *DynamicController) worker(ctx context.Context) {

// processNextWorkItem processes a single item from the queue.
func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := dc.queue.Get()
item, shutdown := dc.queue.Get()
if shutdown {
return false
}
defer dc.queue.Done(obj)
defer dc.queue.Done(item)

queueLength.Set(float64(dc.queue.Len()))

item, ok := obj.(ObjectIdentifiers)
if !ok {
dc.log.Error(fmt.Errorf("expected ObjectIdentifiers in queue but got %#v", obj), "Invalid item in queue")
dc.queue.Forget(obj)
return true
}

err := dc.syncFunc(ctx, item)
if err == nil || apierrors.IsNotFound(err) {
dc.queue.Forget(obj)
dc.queue.Forget(item)
return true
}

Expand All @@ -260,25 +253,25 @@ func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
case *requeue.NoRequeue:
dc.log.Error(typedErr, "Error syncing item, not requeuing", "item", item)
requeueTotal.WithLabelValues(gvrKey, "no_requeue").Inc()
dc.queue.Forget(obj)
dc.queue.Forget(item)
case *requeue.RequeueNeeded:
dc.log.V(1).Info("Requeue needed", "item", item, "error", typedErr)
requeueTotal.WithLabelValues(gvrKey, "requeue").Inc()
dc.queue.Add(obj) // Add without rate limiting
dc.queue.Add(item) // Add without rate limiting
case *requeue.RequeueNeededAfter:
dc.log.V(1).Info("Requeue needed after delay", "item", item, "error", typedErr, "delay", typedErr.Duration())
requeueTotal.WithLabelValues(gvrKey, "requeue_after").Inc()
dc.queue.AddAfter(obj, typedErr.Duration())
dc.queue.AddAfter(item, typedErr.Duration())
default:
// Arriving here means we have an unexpected error, we should requeue the item
// with rate limiting.
requeueTotal.WithLabelValues(gvrKey, "rate_limited").Inc()
if dc.queue.NumRequeues(obj) < dc.config.QueueMaxRetries {
if dc.queue.NumRequeues(item) < dc.config.QueueMaxRetries {
dc.log.Error(err, "Error syncing item, requeuing with rate limit", "item", item)
dc.queue.AddRateLimited(obj)
dc.queue.AddRateLimited(item)
} else {
dc.log.Error(err, "Dropping item from queue after max retries", "item", item)
dc.queue.Forget(obj)
dc.queue.Forget(item)
}
}

Expand Down

0 comments on commit 494a593

Please sign in to comment.