Skip to content

Commit

Permalink
make rate limiter configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
DanielKlt committed Feb 21, 2025
1 parent 83aade2 commit 50f6173
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 10 deletions.
24 changes: 22 additions & 2 deletions cmd/controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ func main() {
allowCRDDeletion bool
resourceGraphDefinitionConcurrentReconciles int
dynamicControllerConcurrentReconciles int
// dynamic controller rate limiter parameters
minRetryDelay time.Duration
maxRetryDelay time.Duration
rateLimit int
burstLimit int
// reconciler parameters
resyncPeriod int
queueMaxRetries int
Expand All @@ -91,7 +96,18 @@ func main() {
"dynamic-controller-concurrent-reconciles", 1,
"The number of dynamic controller reconciles to run in parallel",
)
// reconciler parametes

// rate limiter parameters
flag.DurationVar(&minRetryDelay, "dynamic-controller-rate-limiter-min-delay", 200*time.Millisecond,
"Minimum delay for the dynamic controller rate limiter, in milliseconds.")
flag.DurationVar(&maxRetryDelay, "dynamic-controller-rate-limiter-max-delay", 1000*time.Second,
"Maximum delay for the dynamic controller rate limiter, in seconds.")
flag.IntVar(&rateLimit, "dynamic-controller-rate-limiter-rate-limit", 10,
"Rate limit to control how frequently events are allowed to happen for the dynamic controller.")
flag.IntVar(&burstLimit, "dynamic-controller-rate-limiter-burst-limit", 100,
"Burst size of events for the dynamic controller rate limiter.")

// reconciler parameters
flag.IntVar(&resyncPeriod, "dynamic-controller-default-resync-period", 10,
"interval at which the controller will re list resources even with no changes, in hours")
flag.IntVar(&queueMaxRetries, "dynamic-controller-default-queue-max-retries", 20,
Expand Down Expand Up @@ -157,7 +173,11 @@ func main() {
ShutdownTimeout: time.Duration(shutdownTimeout) * time.Second,
ResyncPeriod: time.Duration(resyncPeriod) * time.Hour,
QueueMaxRetries: queueMaxRetries,
}, set.Dynamic())
}, set.Dynamic(),
minRetryDelay,
maxRetryDelay,
rateLimit,
burstLimit)

resourceGraphDefinitionGraphBuilder, err := graph.NewBuilder(
restConfig,
Expand Down
9 changes: 6 additions & 3 deletions pkg/dynamiccontroller/dynamic_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,16 +140,19 @@ func NewDynamicController(
log logr.Logger,
config Config,
kubeClient dynamic.Interface,
minRetryDelay time.Duration,
maxRetryDelay time.Duration,
rateLimit int,
burstLimit int,
) *DynamicController {
logger := log.WithName("dynamic-controller")

dc := &DynamicController{
config: config,
kubeClient: kubeClient,
// TODO(a-hilaly): Make the queue size configurable.
queue: workqueue.NewTypedRateLimitingQueueWithConfig(workqueue.NewTypedMaxOfRateLimiter(
workqueue.NewTypedItemExponentialFailureRateLimiter[ObjectIdentifiers](200*time.Millisecond, 1000*time.Second),
&workqueue.TypedBucketRateLimiter[ObjectIdentifiers]{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
workqueue.NewTypedItemExponentialFailureRateLimiter[ObjectIdentifiers](minRetryDelay, maxRetryDelay),
&workqueue.TypedBucketRateLimiter[ObjectIdentifiers]{Limiter: rate.NewLimiter(rate.Limit(rateLimit), burstLimit)},
), workqueue.TypedRateLimitingQueueConfig[ObjectIdentifiers]{Name: "dynamic-controller-queue"}),
log: logger,
// pass version and pod id from env
Expand Down
20 changes: 15 additions & 5 deletions pkg/dynamiccontroller/dynamic_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ package dynamiccontroller

import (
"context"
"io/ioutil"
"io"
"testing"
"time"

Expand All @@ -34,10 +34,17 @@ import (
// parts that need to be tested. I'll probably need to rewrite some parts of graphexec
// and dynamiccontroller to make this work.

const (
minRetryDelay = 200 * time.Millisecond
maxRetryDelay = 1000 * time.Second
rateLimit = 10
burstLimit = 100
)

func noopLogger() logr.Logger {
opts := zap.Options{
// Write to dev/null
DestWriter: ioutil.Discard,
DestWriter: io.Discard,
}
logger := zap.New(zap.UseFlagOptions(&opts))
return logger
Expand Down Expand Up @@ -65,7 +72,7 @@ func TestNewDynamicController(t *testing.T) {
ShutdownTimeout: 60 * time.Second,
}

dc := NewDynamicController(logger, config, client)
dc := NewDynamicController(logger, config, client, minRetryDelay, maxRetryDelay, rateLimit, burstLimit)

assert.NotNil(t, dc)
assert.Equal(t, config, dc.config)
Expand All @@ -76,13 +83,15 @@ func TestNewDynamicController(t *testing.T) {
func TestRegisterAndUnregisterGVK(t *testing.T) {
logger := noopLogger()
client := setupFakeClient()

config := Config{
Workers: 1,
ResyncPeriod: 1 * time.Second,
QueueMaxRetries: 5,
ShutdownTimeout: 5 * time.Second,
}
dc := NewDynamicController(logger, config, client)

dc := NewDynamicController(logger, config, client, minRetryDelay, maxRetryDelay, rateLimit, burstLimit)

gvr := schema.GroupVersionResource{Group: "test", Version: "v1", Resource: "tests"}

Expand Down Expand Up @@ -127,7 +136,8 @@ func TestRegisterAndUnregisterGVK(t *testing.T) {
func TestEnqueueObject(t *testing.T) {
logger := noopLogger()
client := setupFakeClient()
dc := NewDynamicController(logger, Config{}, client)

dc := NewDynamicController(logger, Config{}, client, minRetryDelay, maxRetryDelay, rateLimit, burstLimit)

obj := &unstructured.Unstructured{}
obj.SetName("test-object")
Expand Down
5 changes: 5 additions & 0 deletions test/integration/environment/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,12 @@ func (e *Environment) setupController() error {
ShutdownTimeout: 60 * time.Second,
},
e.ClientSet.Dynamic(),
200*time.Millisecond,
1000*time.Second,
10,
100,
)

go func() {
err := dc.Run(e.context)
if err != nil {
Expand Down

0 comments on commit 50f6173

Please sign in to comment.