Add support for weight based queue management in dynamiccontroller #198

Open
a-hilaly opened this issue Jan 11, 2025 · 6 comments · May be fixed by #307

Comments

@a-hilaly
Member

Feature Description

Currently, the DynamicController uses a single queue for all GVRs, which doesn't allow for prioritization of resource processing based on user requirements. This can lead to resource starvation and unpredictable processing times for important resources.

One of the optimizations we can work on is a weight-based queue system where resources are processed according to weights specified in ResourceGroup specs. Instead of maintaining a queue per GVR, we would maintain a queue per unique weight value.

We can add a spec.weight field to the ResourceGroup CRD to allow users to specify the processing priority of their resources.

Thoughts: the weight will be an integer value between 1 and 1000, with a default value of 100 if not specified. Resources with higher weights will receive proportionally more processing time and workers from the controller.

Sketching:

type WeightedQueue struct {
    weight int
    queue  workqueue.RateLimitingInterface
    gvrSet map[schema.GroupVersionResource]struct{} // GVRs currently assigned to this weight
}

type DynamicController struct {
    weightedQueues map[int]*WeightedQueue              // one queue per unique weight value
    gvrWeights     map[schema.GroupVersionResource]int // weight configured for each GVR
    mu             sync.RWMutex
}
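
One way to read "proportionally more processing time and workers" (purely illustrative; allocateWorkers and the fixed worker budget below are assumptions, not part of the proposal) is to split a worker budget across the weighted queues by weight:

// allocateWorkers (hypothetical sketch, not part of the proposal): split a
// fixed worker budget across the weighted queues in proportion to their weight.
func (dc *DynamicController) allocateWorkers(totalWorkers int) map[int]int {
    dc.mu.RLock()
    defer dc.mu.RUnlock()

    weightSum := 0
    for weight := range dc.weightedQueues {
        weightSum += weight
    }

    allocation := make(map[int]int, len(dc.weightedQueues))
    if weightSum == 0 {
        return allocation // no queues registered yet
    }
    for weight := range dc.weightedQueues {
        workers := totalWorkers * weight / weightSum
        if workers == 0 {
            workers = 1 // keep at least one worker per queue so low weights aren't starved
        }
        allocation[weight] = workers
    }
    return allocation
}
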
@a-hilaly a-hilaly changed the title Add Support for weight based queue management in dynamiccontroller Add support for weight based queue management in dynamiccontroller Jan 11, 2025
@Gonfidel
Contributor

Gonfidel commented Feb 10, 2025

@a-hilaly I'm interested in reviewing this. I want to clarify: the added spec.weight would only be set on the RGD, essentially weighting all instances of a given CRD the same. This means we would not provide a weight value to individual instances. Is that correct?

@Gonfidel
Contributor

I've opened a PR (#307) that's ready for review. Please let me know if you have any questions and whether this meets the requirements.

@a-hilaly
Member Author

This means we would not provide a weight value to individual instances. Is that correct?

That's a good point, I never thought of that... should we? Weights per namespace?

@Gonfidel
Contributor

Gonfidel commented Feb 16, 2025

I've compiled a list of thoughts related to the comments I left on the PR and in this thread.

What should the developer experience be like?

I'm not convinced we need weights per namespace. My initial thought is that we should support a weight on the RGD that configures the default weight of its RGD-Is. Later, when creating an RGD-I, we should have the option to specify a weight for that specific instance instead of using the default RGD weight.

An example might be the examples/kubernetes/webapp/rg.yaml file. Perhaps I have a few basic applications deployed using this RGD, but only one of them is a public-facing, customer-driven application considered critical. The flexibility of determining the priority for that instance specifically has a lot of merit, I think.

Proposed workflow example:

  1. Apply the RGD
apiVersion: kro.run/v1alpha1
kind: ResourceGraphDefinition
metadata:
  name: webapp.kro.run
spec:
  weight: 200  # This value becomes the default in the generated CRD; if omitted, it defaults to 100
  schema:
    apiVersion: v1alpha1
    kind: WebApp
    spec:
      name: string
      namespace: string | default=default
      image: string | default=nginx
    status:
      ...
  resources:
  - id: deployment
    template:
      apiVersion: apps/v1
      kind: Deployment
      ...
  2. The RGD-I CRD is generated with a default weight set
apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
  name: webapps.kro.run
spec:
  conversion:
    strategy: None
  group: kro.run
  names:
    kind: WebApp
    ...
  scope: Namespaced
  versions:
  - name: v1alpha1
    schema:
      openAPIV3Schema:
        properties:
          apiVersion:
            type: string
          kind:
            type: string
          metadata:
            type: object
          spec:
            properties:
              weight:
                default: 200
                description: The weight used to determine priority when queuing reconciliation.
                maximum: 1000
                minimum: 1
                type: integer
  ...
  3. We create an instance of the RGD-I without specifying the weight and it uses the default for that RGD
apiVersion: kro.run/v1alpha1
kind: WebApp
metadata:
  name: test-app
spec:
  name: test-app
# weight will default to 200 per the generated CRD
  4. Now we decide to create another RGD-I that is more critical and should be prioritized (see the controller-side sketch after this workflow).
apiVersion: kro.run/v1alpha1
kind: WebApp
metadata:
  name: production-app
spec:
  name: production-app
  weight: 999
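
As a follow-up to steps 3 and 4, a minimal sketch of how the dynamic controller could resolve the effective weight for an enqueued instance (the helper name and fallback behaviour are assumptions, not part of any PR; unstructured comes from k8s.io/apimachinery/pkg/apis/meta/v1/unstructured):

// effectiveWeight is a hypothetical helper: prefer the instance's own
// spec.weight, then the default registered for its GVR, then the global default.
func (dc *DynamicController) effectiveWeight(gvr schema.GroupVersionResource, obj *unstructured.Unstructured) int {
	if w, found, err := unstructured.NestedInt64(obj.Object, "spec", "weight"); err == nil && found {
		return int(w)
	}
	dc.mu.RLock()
	defer dc.mu.RUnlock()
	if w, ok := dc.gvrWeights[gvr]; ok {
		return w // default weight propagated from the RGD / generated CRD
	}
	return 100 // global default when nothing is specified
}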

Potential problems with the current PR and recommended solutions

Mutex locking on queue selection:
When calling queue.Get(), either an item is returned or the call blocks until an item is present in the queue. We try to avoid this by eliminating empty queues from selection in the selectQueueByWeight() function, which performs an RLock and can create lock contention when processing events. If we choose not to take the RLock, there is a chance two workers try to pull from the same queue at the same time. If the queue only has one item, the second worker will block until an item appears in the queue that was selected at the beginning of processNextWorkItem(). Depending on how many items use that weight, an entire worker could hang for a while.
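
For reference, a simplified sketch of the selection under the controller-wide read lock (not the exact PR code; the weighted-random pick over effectiveWeight = weight × queue length is just one way it could work, and rand is math/rand):

// selectQueueByWeight (sketch): scan every queue under the controller-wide
// read lock, skip empty queues so workers never block on Get(), and pick one
// at random in proportion to effectiveWeight = weight * queue length.
func (dc *DynamicController) selectQueueByWeight() *WeightedQueue {
	dc.mu.RLock()
	defer dc.mu.RUnlock()

	total := 0
	candidates := make([]*WeightedQueue, 0, len(dc.weightedQueues))
	for _, wq := range dc.weightedQueues {
		if wq.queue.Len() == 0 {
			continue
		}
		total += wq.weight * wq.queue.Len()
		candidates = append(candidates, wq)
	}
	if len(candidates) == 0 {
		return nil // nothing to do; the caller has to handle this
	}

	pick := rand.Intn(total)
	for _, wq := range candidates {
		pick -= wq.weight * wq.queue.Len()
		if pick < 0 {
			return wq
		}
	}
	return candidates[len(candidates)-1]
}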

Potential solutions

  1. Initially I thought we might be able to use TypedInterface queues from client-go, but I think this would fall into the same problem. It also creates some problems for supporting instance-level weights, so I don't think it would be a great solution at first glance.
  2. We could review a new implementation using a weight-aware queue. This adds complexity to the item-selection logic in the queue but only requires us to manage a single queue, removing the risk of selecting an empty queue. This solution still suffers from only one worker being able to pull an item at a time, albeit with extremely short locks. I think the queue implementation will be more complex than handling the empty-queue edge cases, and I need to spend more time thinking about how it could be implemented because it would fundamentally change how we track default weights per RGD.
  3. Adding a mutex to the weighted queues themselves. We still need to compute the effectiveWeight of ALL non-empty queues by multiplying the weight and the length of each queue, which means that while determining the selected queue we would manage multiple locks instead of one. Not sure that's better. (Although this concept, combined with option 5, would be a strong option for the reason listed in that section.)
  4. Leave the lock during the queue selection process. This process is relatively quick and, in a small environment, was on the order of 50μs on my local machine during testing. It's difficult to say what the impact would look like for larger environments with a large number of workers.
  5. ⭐ Add a mutex to the weighted queue and move the lock into a function named getQueueItem() that takes the WeightedQueue as an argument. This call only locks a single queue long enough to fetch an item, instead of locking the entire controller's queue map during the queue selection process (similar to how client_go#queue does it). With this approach we limit lock time to roughly match queue.Get() and limit contention, because different workers won't always be working on the same queue. I believe this will be the most performant option, and it is only necessary because queue.Get() waits when the queue is empty, requiring us to length-check the queue before calling Get(). If we still have concerns, we could take it a step further and manage a queue per GVR-weight combination, which makes lock contention less likely but queue selection more expensive.
type WeightedQueue struct {
	weight int
	queue  workqueue.TypedRateLimitingInterface[any]
	gvrSet map[schema.GroupVersionResource]struct{}
	mu     sync.Mutex // serializes the length check and Get() on this queue
}

func (dc *DynamicController) getQueueItem(wq *WeightedQueue) (item any, shutdown bool, ok bool) {
	if wq == nil {
		return nil, false, false
	}
	// Hold the queue's own lock so the length check and Get() are atomic;
	// otherwise two workers could both pass the check and one would block on Get().
	wq.mu.Lock()
	defer wq.mu.Unlock()
	if wq.queue.Len() == 0 {
		return nil, false, false
	}

	obj, shutdown := wq.queue.Get()
	return obj, shutdown, true
}

func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
	weightedQueue := dc.selectQueueByWeight()
	obj, shutdown, ok := dc.getQueueItem(weightedQueue)
	if shutdown {
		// The queue is shutting down; stop this worker.
		return false
	}
	if !ok {
		// We selected a queue that turned out to be empty; skip this iteration.
		return true
	}
	...
}

TL;DR: I lean towards option 5, with the option to have RGD-I-specific weights.

Thanks in advance for any feedback you might have 👋

@n3wscott
Contributor

n3wscott commented Feb 18, 2025

The question of when to put something in spec vs when to put it in an annotation is interesting. Here is my thinking:

Something goes in the spec if you would like the user to influence the resulting work. Something goes in annotations if you want to influence how the work happens. A really good case for annotations is signalling which reconciliation strategy to use, or which networking implementation to use, or, in the case of Knative Serving, the upper and lower bounds of scaling to use.

Thinking about why Knative chose an annotation to control scale is easy to settle with the above rubric for spec vs. metadata: the scale of the service has to do with how we reconcile the load of a service and react to it, rather than something created directly from knowing the bounds of scale ahead of time. An annotation also remains easily editable if the spec is immutable.

So in the case of adding priority queues, I think this is asking to control how we reconcile, not what we reconcile, so it does not belong in the spec. The ideas around where overrides happen are interesting and valid: for example, priority (or a mix of priority and weight) could be an annotation, and the controller implementation could choose to respect annotations found in this order: CRD -- Namespace -- Instance, whereas another implementation could choose not to respect the priority or weights at all because it has some kind of learned queuing system. Putting it in the spec does not leave room for different ideas about how to reconcile the population differently later.
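
To make that concrete, a minimal sketch of resolving a priority annotation in that order, assuming the most specific level wins (the annotation key, the helper, and the precedence are illustrative only, not anything kro defines today; metav1 is k8s.io/apimachinery/pkg/apis/meta/v1):

// priorityAnnotation is a hypothetical key; nothing defines it today.
const priorityAnnotation = "kro.run/priority"

// resolvePriority is a sketch of the CRD -> Namespace -> Instance order
// described above, with later (more specific) levels overriding earlier ones.
func resolvePriority(crd, ns, instance metav1.Object, def int) int {
	priority := def
	for _, obj := range []metav1.Object{crd, ns, instance} {
		if obj == nil {
			continue
		}
		if v, ok := obj.GetAnnotations()[priorityAnnotation]; ok {
			if p, err := strconv.Atoi(v); err == nil {
				priority = p // more specific level wins
			}
		}
	}
	return priority
}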

FYI, here is Knative's implementation of a "two lane queue" for work that needs to happen ASAP and jumps the normal line.

--- update

Reading the proposed PR, I think the current weighted-queue idea is not going to work long term, and the plan needs to account for a more dynamic queuing strategy. Pretend for a moment that KRO takes off and there is a cluster with 10k instances of 10k kinds of CRDs created from RGDs. We are within the limits of etcd, but just barely.

Rather than attempt to solve reconciling all the meta types from RGDs with heterogeneous work queues, I think we need to start with the assumption that all of the work queues will not fit on a single instance of the controller. If we further assume KRO follows the normal pattern of a work queue per Kind, there would ideally be 10k work queues supporting the population of instances.

The weighted queue does not solve starvation in the 10k×10k world because it does not address how to shard the key space across more than one controller. If KRO instead solves that issue out of the gate, the questions become how to:

  1. Assign a work queue to a specific controller instance.
  2. Shard an instance of a work queue across multiple controller instances.
  3. Consider assigning weights or priorities to those queues to reconcile some kinds of KRO-RGD-CRD Instances more than, or on top of, others. Though how this is implemented could be an input into the method we use to shard the key spaces of the kinds, rather than cooperative work queues.

@Gonfidel
Contributor

Gonfidel commented Feb 22, 2025

@n3wscott thanks for your feedback! You've raised some great discussion points. I have a couple of thoughts and questions that I'll try to organize without digressing too heavily.

I think this is asking to control how we reconcile, not what we reconcile, so it does not belong in the spec. The ideas around where overrides happen are interesting and valid: for example, priority (or a mix of priority and weight) could be an annotation, and the controller implementation could choose to respect annotations found in this order: CRD -- Namespace -- Instance

I like the idea of storing the weight/priority in an annotation. I particularly resonate with the spec immutability concern you raised. I originally leaned into CRD -> Instance because of the ease of propagating weight/priority through CRD default values. I'd like to better understand your vision for annotations. Assuming we used annotations, how do you imagine we handle propagation to the instance? Passing the annotations from the RGD -> CRD -> Instance could be clean. Checking namespace annotations on every enqueue could get expensive at scale. Caching them in the informer might help—do you see a better way?

Rather than attempt to solve reconciling all the meta types from RGDs with heterogeneous work queues, I think we need to start with the assumption that all of the work queues will not fit on a single instance of the controller. If we further assume KRO follows the normal patterns of a work queue per Kind, we will see that there ideally would be 10k work queues supporting the population of instances.

I think this is a good point. To support that scale, do you think we would need to change our informer strategy as well? Running all informers on the leader could bottleneck event distribution, and running all informers on every controller could overwhelm the API server with 10k kinds. Sharding informers across controllers might balance the load better and would keep informer->queue localization while avoiding duplicate work. What do you think? If so, it might be a strong argument for Kind-Queues.

The downside of using Kind-Queues instead of Weight-Queues is that surfacing critical items during queue selection becomes more difficult with a large number of queues. I've provided an example below (please tell me if I'm misunderstanding your point here).

We have the following queues:

queue <=> Kind:WebApp <=> Priority:High
queue <=> Kind:InternalApp <=> Priority:Low

Create Instance of InternalApp with Priority:Critical

In this case, if we performed queue selection, we might not land on the InternalApp queue for a while. With 10k kinds, the InternalApp queue might not be selected until thousands of other queues are processed, delaying critical reconciliation despite its priority.

It might make sense to still use Weight-Queues but shard on a weight/GVR hash (or something similar). Since each GVR is already mapped to a weight, we keep deterministic queue selection while distributing shards across controllers. The downside is that controllers would overlap on informers. 🤔 Any thoughts on mitigating this, or other preferred sharding strategies?
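
A rough sketch of the weight/GVR hash idea (shard count, hash choice, and the ownsGVR name are assumptions for illustration):

// ownsGVR is a hypothetical check: hash the GVR together with its weight and
// keep it only if it lands on this controller instance's shard.
func ownsGVR(gvr schema.GroupVersionResource, weight, shardID, totalShards int) bool {
	h := fnv.New32a() // hash/fnv
	fmt.Fprintf(h, "%s/%s/%s/%d", gvr.Group, gvr.Version, gvr.Resource, weight)
	return int(h.Sum32())%totalShards == shardID
}

Each controller instance would then only start informers and weighted queues for the GVRs it owns, which is one way to reduce the informer overlap mentioned above.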

Thanks again for your detailed feedback
