From bf1ff45ebb912b42f26a615caf41dd6083aa557b Mon Sep 17 00:00:00 2001 From: Daniel Hu Date: Thu, 17 Aug 2023 15:49:32 +0800 Subject: [PATCH] Add configurable task queue size to GoPool - Updated README.md and README_zh.md to reflect the new feature of configurable task queue size. - Added a new field 'taskQueueSize' in the 'goPool' struct to store the size of the task queue. - Modified the 'NewGoPool' function to initialize the task queue with the specified size. - Added a new function 'GetTaskQueueSize' to return the size of the task queue. - Updated the test file 'gopool_test.go' to include tests for the new feature. - Added a new option 'WithTaskQueueSize' in 'option.go' to allow users to set the size of the task queue when creating the pool. Signed-off-by: Daniel Hu --- README.md | 31 ++++++++++++++++++++++++++++++- README_zh.md | 31 ++++++++++++++++++++++++++++++- gopool.go | 15 ++++++++++++++- gopool_test.go | 10 ++++++++++ option.go | 7 +++++++ 5 files changed, 91 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index f223c0e..bdea344 100644 --- a/README.md +++ b/README.md @@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s -- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. +- [x] **Task Queue**: GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured. - [x] **Concurrency Control**: GoPool can control the number of concurrent tasks to prevent system overload. @@ -163,6 +163,35 @@ func main() { } ``` +## Configurable Task Queue Size + +GoPool uses a thread-safe task queue to store tasks waiting to be processed. Multiple workers can simultaneously fetch tasks from this queue. The size of the task queue can be configured when creating the pool using the `WithTaskQueueSize` option. + +Here is an example of how to use GoPool with a configurable task queue size: + +```go +package main + +import ( + "time" + + "github.com/devchat-ai/gopool" +) + +func main() { + pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000)) + defer pool.Release() + + for i := 0; i < 1000; i++ { + pool.AddTask(func() (interface{}, error){ + time.Sleep(10 * time.Millisecond) + return nil, nil + }) + } + pool.Wait() +} +``` + ## Dynamic Worker Adjustment GoPool supports dynamic worker adjustment. This means that the number of workers in the pool can increase or decrease based on the number of tasks in the queue. This feature can be enabled by setting the MinWorkers option when creating the pool. diff --git a/README_zh.md b/README_zh.md index e30f87f..84825c0 100644 --- a/README_zh.md +++ b/README_zh.md @@ -81,7 +81,7 @@ ok github.com/devchat-ai/gopool 3.946s -- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。 +- [x] **任务队列**:GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。 - [x] **并发控制**:GoPool 可以控制并发任务的数量,防止系统过载。 @@ -163,6 +163,35 @@ func main() { } ``` +## 配置任务队列大小 + +GoPool 使用一个线程安全的任务队列来存储等待处理的任务。多个工作器可以同时从这个队列中获取任务。任务队列的大小可配置。可以通过在创建池时设置 `WithQueueSize` 选项来配置任务队列的大小。 + +这是一个如何配置 GoPool 任务队列大小的示例: + +```go +package main + +import ( + "time" + + "github.com/devchat-ai/gopool" +) + +func main() { + pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(5000)) + defer pool.Release() + + for i := 0; i < 1000; i++ { + pool.AddTask(func() (interface{}, error){ + time.Sleep(10 * time.Millisecond) + return nil, nil + }) + } + pool.Wait() +} +``` + ## 动态工作器调整 GoPool 支持动态工作器调整。这意味着池中的工作器数量可以根据队列中的任务数量增加或减少。可以通过在创建池时设置 MinWorkers 选项来启用此功能。 diff --git a/gopool.go b/gopool.go index 4681dea..e96ef8f 100644 --- a/gopool.go +++ b/gopool.go @@ -18,6 +18,8 @@ type GoPool interface { Running() int // GetWorkerCount returns the number of workers. GetWorkerCount() int + // GetTaskQueueSize returns the size of the task queue. + GetTaskQueueSize() int } // task represents a function that will be executed by a worker. @@ -33,6 +35,8 @@ type goPool struct { minWorkers int // tasks are added to this channel first, then dispatched to workers. Default buffer size is 1 million. taskQueue chan task + // Set by WithTaskQueueSize(), used to set the size of the task queue. Default is 1e6. + taskQueueSize int // Set by WithRetryCount(), used to retry a task when it fails. Default is 0. retryCount int lock sync.Locker @@ -60,7 +64,8 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool { // workers and workerStack should be initialized after WithMinWorkers() is called workers: nil, workerStack: nil, - taskQueue: make(chan task, 1e6), + taskQueue: nil, + taskQueueSize: 1e6, retryCount: 0, lock: new(sync.Mutex), timeout: 0, @@ -73,6 +78,7 @@ func NewGoPool(maxWorkers int, opts ...Option) GoPool { opt(pool) } + pool.taskQueue = make(chan task, pool.taskQueueSize) pool.workers = make([]*worker, pool.minWorkers) pool.workerStack = make([]int, pool.minWorkers) @@ -213,3 +219,10 @@ func (p *goPool) GetWorkerCount() int { defer p.lock.Unlock() return len(p.workers) } + +// GetTaskQueueSize returns the size of the task queue. +func (p *goPool) GetTaskQueueSize() int { + p.lock.Lock() + defer p.lock.Unlock() + return p.taskQueueSize +} diff --git a/gopool_test.go b/gopool_test.go index 6b9fc58..9d9519d 100644 --- a/gopool_test.go +++ b/gopool_test.go @@ -129,4 +129,14 @@ var _ = Describe("Gopool", func() { Expect(pool.GetWorkerCount()).To(Equal(minWorkers)) }) }) + + Describe("With TaskQueueSize", func() { + It("should work correctly", func() { + size := 5000 + pool := gopool.NewGoPool(100, gopool.WithTaskQueueSize(size)) + defer pool.Release() + + Expect(pool.GetTaskQueueSize()).To(Equal(size)) + }) + }) }) diff --git a/option.go b/option.go index b86b64d..54c9631 100644 --- a/option.go +++ b/option.go @@ -50,3 +50,10 @@ func WithRetryCount(retryCount int) Option { p.retryCount = retryCount } } + +// WithTaskQueueSize sets the size of the task queue for the pool. +func WithTaskQueueSize(size int) Option { + return func(p *goPool) { + p.taskQueueSize = size + } +}