From 10fa48363581b8c6cdf367d2947903f98722cd0c Mon Sep 17 00:00:00 2001 From: Mikhail Scherba Date: Mon, 17 Feb 2025 12:49:10 +0300 Subject: [PATCH] refactor queueset locks Signed-off-by: Mikhail Scherba --- pkg/shell-operator/manager_events_handler.go | 3 +-- pkg/task/queue/queue_set.go | 27 +++++++++++++++----- 2 files changed, 22 insertions(+), 8 deletions(-) diff --git a/pkg/shell-operator/manager_events_handler.go b/pkg/shell-operator/manager_events_handler.go index e0df9a51..5d790edf 100644 --- a/pkg/shell-operator/manager_events_handler.go +++ b/pkg/shell-operator/manager_events_handler.go @@ -88,8 +88,7 @@ func (m *ManagerEventsHandler) Start() { m.taskQueues.DoWithLock(func(tqs *queue.TaskQueueSet) { for _, resTask := range tailTasks { - q := tqs.GetByName(resTask.GetQueueName()) - if q == nil { + if q := tqs.Queues[resTask.GetQueueName()]; q == nil { log.Error("Possible bug!!! Got task for queue but queue is not created yet.", slog.String("queueName", resTask.GetQueueName()), slog.String("description", resTask.GetDescription())) diff --git a/pkg/task/queue/queue_set.go b/pkg/task/queue/queue_set.go index 00113eea..ff756f1a 100644 --- a/pkg/task/queue/queue_set.go +++ b/pkg/task/queue/queue_set.go @@ -20,14 +20,14 @@ type TaskQueueSet struct { ctx context.Context cancel context.CancelFunc - m sync.Mutex + m *sync.RWMutex Queues map[string]*TaskQueue } func NewTaskQueueSet() *TaskQueueSet { return &TaskQueueSet{ Queues: make(map[string]*TaskQueue), - m: sync.Mutex{}, + m: new(sync.RWMutex), MainName: MainQueueName, } } @@ -45,9 +45,12 @@ func (tqs *TaskQueueSet) WithMetricStorage(mstor *metricstorage.MetricStorage) { } func (tqs *TaskQueueSet) Stop() { + tqs.m.RLock() if tqs.cancel != nil { tqs.cancel() } + + tqs.m.RUnlock() } func (tqs *TaskQueueSet) StartMain() { @@ -55,13 +58,18 @@ func (tqs *TaskQueueSet) StartMain() { } func (tqs *TaskQueueSet) Start() { + tqs.m.RLock() for _, q := range tqs.Queues { q.Start() } + + tqs.m.RUnlock() } func (tqs *TaskQueueSet) Add(queue *TaskQueue) { + tqs.m.Lock() tqs.Queues[queue.Name] = queue + tqs.m.Unlock() } func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(task.Task) TaskResult) { @@ -76,6 +84,8 @@ func (tqs *TaskQueueSet) NewNamedQueue(name string, handler func(task.Task) Task } func (tqs *TaskQueueSet) GetByName(name string) *TaskQueue { + tqs.m.RLock() + defer tqs.m.RUnlock() ts, exists := tqs.Queues[name] if exists { return ts @@ -105,8 +115,6 @@ func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue)) { if doFn == nil { return } - tqs.m.Lock() - defer tqs.m.Unlock() if len(tqs.Queues) == 0 { return @@ -118,21 +126,25 @@ func (tqs *TaskQueueSet) Iterate(doFn func(queue *TaskQueue)) { } // TODO sort names + tqs.m.RLock() for _, q := range tqs.Queues { if q.Name != tqs.MainName { doFn(q) } } + + tqs.m.RUnlock() } func (tqs *TaskQueueSet) Remove(name string) { + tqs.m.Lock() ts, exists := tqs.Queues[name] if exists { ts.Stop() } - tqs.m.Lock() - defer tqs.m.Unlock() + delete(tqs.Queues, name) + tqs.m.Unlock() } func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration) { @@ -145,15 +157,18 @@ func (tqs *TaskQueueSet) WaitStopWithTimeout(timeout time.Duration) { select { case <-checkTick.C: stopped := true + tqs.m.RLock() for _, q := range tqs.Queues { if q.Status != "stop" { stopped = false break } } + tqs.m.RUnlock() if stopped { return } + case <-timeoutTick.C: return }