Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement weighted queues #307

Open
wants to merge 11 commits into
base: main
Choose a base branch
from

Conversation

Gonfidel
Copy link
Contributor

@Gonfidel Gonfidel commented Feb 15, 2025

Fixes #198

// TODO(a-hilaly): Make the queue size configurable.
dc.weightedQueues[weight] = &WeightedQueue{
weight: weight,
queue: workqueue.NewNamedRateLimitingQueue(
Copy link
Contributor Author

@Gonfidel Gonfidel Feb 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SAME COMMENT AS ABOVE: The old queue rate limiting types were failing linter tests due to being deprecated. These new limiters support type specific limiting. I believe we want these configured as any but feel free to provide feedback

// queue items will be prioritized based on GVR weight, between 1 and 1000
weight int
// queues are the workqueue used to process items
queue workqueue.TypedRateLimitingInterface[any]
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SAME COMMENT AS BELOW: The old queue rate limiting types were failing linter tests due to being deprecated. These new limiters support type specific limiting. I believe we want these configured as any but feel free to provide feedback

// processNextWorkItem processes a single item from the queue.
func (dc *DynamicController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := dc.queue.Get()
weightedQueue := dc.selectQueueByWeight()
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function performs an RLock and might be contentious due it's call frequency. Any thoughts or feedback on how best to improve this?

if shutdown {
return false
return true
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this returned false previously to exist the processing loop when our queue was shutting down because we only had one. Now that we have multiple, I think returning true is what we want, allowing us to short-circuit the processing event but not break the processing loop. Does that make the most sense here?

Copy link
Contributor

@n3wscott n3wscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is the direction the weighted queue should go. We should continue to have a discussion in the linked issue.

defer dc.mu.Unlock()

dc.gvrWeights[gvr] = weight
dc.weightedQueues[weight].gvrSet[gvr] = struct{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand why the gvr set is being set to an anonymous struct. If the value does not matter, why not a bool?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a WIP and it was set this way before adding the safety check for queue deletion. If a queue isn't managing any gvrs, we should delete it. The controller fails without at least one queue. This was a work around that can be removed now that we protect again deleting the default queue


// Ensure weighted queue exists with the specified weight
func (dc *DynamicController) ensureWeightedQueue(weight int) bool {
if weight < 1 || weight > 1000 {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this kind of protection should happen at RGD creation and should be bounds set inside of the DC or the env, not magic values inside this method. What happens if we want to add tightening the bounds of the range? Or expanding the range? Does it matter to the implementation inside of DC that the weight is a 1-1000 range at all? It seems it would just work if it treated it as a number genetically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably don't need it. It's already defaulted and set on the CRD when we reconcile the RGD. It was just being extra cautious. We can definitely remove it

@Gonfidel
Copy link
Contributor Author

@n3wscott I'm totally cool with taking it a different direction. This was a first pass and gave us some insight into potential hiccups. Happy to continue discussing until we can come up with a pattern that works for us 🙏

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for weight based queue management in dynamiccontroller
2 participants