diff --git a/main.go b/main.go index 868cea5..dace603 100644 --- a/main.go +++ b/main.go @@ -3,9 +3,14 @@ package main import ( + "context" "fmt" "log" + "net/http" "os" + "os/signal" + "syscall" + "time" "github.com/asynkron/protoactor-go/actor" @@ -25,20 +30,44 @@ func main() { props := actor.PropsFromProducer(func() actor.Actor { return &check.KorcenActor{} }) - korcenPID := system.Root.Spawn(props) config := middleware.MiddlewareConfig{ Capacity: 100, // 최대 토큰 수 - RefillRate: 100.0 / 60.0, // 초당 리필 속도 (1분당 100개) + RefillRate: 100.0 / 60.0, // 초당 리필 속도 (1분당 100회) } r := router.SetupRouter(system, korcenPID, config) r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler)) - if err := r.Run(":7777"); err != nil { - log.Println("Failed to start server:", err) - os.Exit(1) + srv := &http.Server{ + Addr: ":7777", + Handler: r, + } + + go func() { + if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed { + log.Fatalf("Failed to start server: %v\n", err) + } + }() + log.Println("Server is running on port 7777.") + + quit := make(chan os.Signal, 1) + signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM) + <-quit + log.Println("Shutdown signal received...") + + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := srv.Shutdown(ctx); err != nil { + log.Fatalf("Server forced to shutdown: %v", err) } + + log.Println("Shutting down worker pool...") + check.ShutdownWorkerPool() + log.Println("Worker pool has been shut down successfully.") + + log.Println("Server exiting.") } diff --git a/pkg/check/korcen.go b/pkg/check/korcen.go index 497d85e..eb1e253 100644 --- a/pkg/check/korcen.go +++ b/pkg/check/korcen.go @@ -40,8 +40,14 @@ func freeKorcenResult(result *KorcenResult) { korcenPool.Put(result) } +var workerPool = NewWorkerPool(10) + +func ShutdownWorkerPool() { + workerPool.Shutdown() +} + var ( - globalLRU = NewShardedLRUCache(16, 64) + globalLRU = NewShardedLRUCache(16, 64, workerPool) ) func Korcen(header *Header) (*Respond, error) { @@ -119,6 +125,11 @@ type KorcenResponse struct { Err error } +type KorcenResponseMessage struct { + Respond *Respond + Err error +} + type KorcenActor struct{} func (k *KorcenActor) Receive(context actor.Context) { @@ -133,18 +144,18 @@ func (k *KorcenActor) Receive(context actor.Context) { return } - resp, err := Korcen(msg.Header) - if err != nil { - context.Respond(&KorcenResponse{ - Respond: nil, + workerPool.Submit(func() { + resp, err := Korcen(msg.Header) + context.Send(context.Self(), &KorcenResponseMessage{ + Respond: resp, Err: err, }) - return - } + }) + case *KorcenResponseMessage: context.Respond(&KorcenResponse{ - Respond: resp, - Err: nil, + Respond: msg.Respond, + Err: msg.Err, }) default: diff --git a/pkg/check/sharded_lru.go b/pkg/check/sharded_lru.go index 113f41e..601f17c 100644 --- a/pkg/check/sharded_lru.go +++ b/pkg/check/sharded_lru.go @@ -70,47 +70,6 @@ func putEntry(e *entry) { entryPool.Put(e) } -type GoroutineManager struct { - numWorkers int - jobChan chan func() - stopChan chan struct{} - wg sync.WaitGroup -} - -func NewGoroutineManager(workerCount int) *GoroutineManager { - manager := &GoroutineManager{ - numWorkers: workerCount, - jobChan: make(chan func(), 1000), - stopChan: make(chan struct{}), - } - - manager.wg.Add(workerCount) - for i := 0; i < workerCount; i++ { - go func() { - defer manager.wg.Done() - for { - select { - case job := <-manager.jobChan: - job() - case <-manager.stopChan: - return - } - } - }() - } - - return manager -} - -func (gm *GoroutineManager) Submit(job func()) { - gm.jobChan <- job -} - -func (gm *GoroutineManager) Shutdown() { - close(gm.stopChan) - gm.wg.Wait() -} - type UsageStats struct { Hits uint64 Misses uint64 @@ -236,11 +195,11 @@ func (c *LRUCache) removeOldest() { type ShardedLRUCache struct { shards []*TrackableLRUCache shardCount int - manager *GoroutineManager + workerPool *WorkerPool allocator *SlabAllocator } -func NewShardedLRUCache(shardCount int, perShardCapacity int) *ShardedLRUCache { +func NewShardedLRUCache(shardCount int, perShardCapacity int, workerPool *WorkerPool) *ShardedLRUCache { if shardCount <= 0 || (shardCount&(shardCount-1)) != 0 { panic("shardCount must be a power of two") } @@ -255,12 +214,10 @@ func NewShardedLRUCache(shardCount int, perShardCapacity int) *ShardedLRUCache { shards[i] = trackedLRU } - manager := NewGoroutineManager(shardCount) - return &ShardedLRUCache{ shards: shards, shardCount: shardCount, - manager: manager, + workerPool: workerPool, allocator: allocator, } } @@ -279,9 +236,9 @@ func (c *ShardedLRUCache) Set(key string, value *KorcenResult) error { shard := c.getShard(key) done := make(chan error, 1) - c.manager.Submit(func() { - err := shard.Set(key, value) - done <- err + c.workerPool.Submit(func() { + shard.Set(key, value) + done <- nil }) err := <-done @@ -309,5 +266,5 @@ func (c *ShardedLRUCache) GetStats() UsageStats { } func (c *ShardedLRUCache) Stop() { - c.manager.Shutdown() + c.workerPool.Shutdown() }