Skip to content

Commit

Permalink
Use Worker Pool
Browse files Browse the repository at this point in the history
  • Loading branch information
강보원 authored and 강보원 committed Jan 22, 2025
1 parent 9e826ec commit 7b02ed4
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 64 deletions.
39 changes: 34 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@
package main

import (
"context"
"fmt"
"log"
"net/http"
"os"
"os/signal"
"syscall"
"time"

"github.com/asynkron/protoactor-go/actor"

Expand All @@ -25,20 +30,44 @@ func main() {
props := actor.PropsFromProducer(func() actor.Actor {
return &check.KorcenActor{}
})

korcenPID := system.Root.Spawn(props)

config := middleware.MiddlewareConfig{
Capacity: 100, // 최대 토큰 수
RefillRate: 100.0 / 60.0, // 초당 리필 속도 (1분당 100개)
RefillRate: 100.0 / 60.0, // 초당 리필 속도 (1분당 100회)
}

r := router.SetupRouter(system, korcenPID, config)

r.GET("/swagger/*any", ginSwagger.WrapHandler(swaggerFiles.Handler))

if err := r.Run(":7777"); err != nil {
log.Println("Failed to start server:", err)
os.Exit(1)
srv := &http.Server{
Addr: ":7777",
Handler: r,
}

go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("Failed to start server: %v\n", err)
}
}()
log.Println("Server is running on port 7777.")

quit := make(chan os.Signal, 1)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
<-quit
log.Println("Shutdown signal received...")

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := srv.Shutdown(ctx); err != nil {
log.Fatalf("Server forced to shutdown: %v", err)
}

log.Println("Shutting down worker pool...")
check.ShutdownWorkerPool()
log.Println("Worker pool has been shut down successfully.")

log.Println("Server exiting.")
}
29 changes: 20 additions & 9 deletions pkg/check/korcen.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,14 @@ func freeKorcenResult(result *KorcenResult) {
korcenPool.Put(result)
}

var workerPool = NewWorkerPool(10)

func ShutdownWorkerPool() {
workerPool.Shutdown()
}

var (
globalLRU = NewShardedLRUCache(16, 64)
globalLRU = NewShardedLRUCache(16, 64, workerPool)
)

func Korcen(header *Header) (*Respond, error) {
Expand Down Expand Up @@ -119,6 +125,11 @@ type KorcenResponse struct {
Err error
}

type KorcenResponseMessage struct {
Respond *Respond
Err error
}

type KorcenActor struct{}

func (k *KorcenActor) Receive(context actor.Context) {
Expand All @@ -133,18 +144,18 @@ func (k *KorcenActor) Receive(context actor.Context) {
return
}

resp, err := Korcen(msg.Header)
if err != nil {
context.Respond(&KorcenResponse{
Respond: nil,
workerPool.Submit(func() {
resp, err := Korcen(msg.Header)
context.Send(context.Self(), &KorcenResponseMessage{
Respond: resp,
Err: err,
})
return
}
})

case *KorcenResponseMessage:
context.Respond(&KorcenResponse{
Respond: resp,
Err: nil,
Respond: msg.Respond,
Err: msg.Err,
})

default:
Expand Down
57 changes: 7 additions & 50 deletions pkg/check/sharded_lru.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,47 +70,6 @@ func putEntry(e *entry) {
entryPool.Put(e)
}

type GoroutineManager struct {
numWorkers int
jobChan chan func()
stopChan chan struct{}
wg sync.WaitGroup
}

func NewGoroutineManager(workerCount int) *GoroutineManager {
manager := &GoroutineManager{
numWorkers: workerCount,
jobChan: make(chan func(), 1000),
stopChan: make(chan struct{}),
}

manager.wg.Add(workerCount)
for i := 0; i < workerCount; i++ {
go func() {
defer manager.wg.Done()
for {
select {
case job := <-manager.jobChan:
job()
case <-manager.stopChan:
return
}
}
}()
}

return manager
}

func (gm *GoroutineManager) Submit(job func()) {
gm.jobChan <- job
}

func (gm *GoroutineManager) Shutdown() {
close(gm.stopChan)
gm.wg.Wait()
}

type UsageStats struct {
Hits uint64
Misses uint64
Expand Down Expand Up @@ -236,11 +195,11 @@ func (c *LRUCache) removeOldest() {
type ShardedLRUCache struct {
shards []*TrackableLRUCache
shardCount int
manager *GoroutineManager
workerPool *WorkerPool
allocator *SlabAllocator
}

func NewShardedLRUCache(shardCount int, perShardCapacity int) *ShardedLRUCache {
func NewShardedLRUCache(shardCount int, perShardCapacity int, workerPool *WorkerPool) *ShardedLRUCache {
if shardCount <= 0 || (shardCount&(shardCount-1)) != 0 {
panic("shardCount must be a power of two")
}
Expand All @@ -255,12 +214,10 @@ func NewShardedLRUCache(shardCount int, perShardCapacity int) *ShardedLRUCache {
shards[i] = trackedLRU
}

manager := NewGoroutineManager(shardCount)

return &ShardedLRUCache{
shards: shards,
shardCount: shardCount,
manager: manager,
workerPool: workerPool,
allocator: allocator,
}
}
Expand All @@ -279,9 +236,9 @@ func (c *ShardedLRUCache) Set(key string, value *KorcenResult) error {
shard := c.getShard(key)
done := make(chan error, 1)

c.manager.Submit(func() {
err := shard.Set(key, value)
done <- err
c.workerPool.Submit(func() {
shard.Set(key, value)
done <- nil
})

err := <-done
Expand Down Expand Up @@ -309,5 +266,5 @@ func (c *ShardedLRUCache) GetStats() UsageStats {
}

func (c *ShardedLRUCache) Stop() {
c.manager.Shutdown()
c.workerPool.Shutdown()
}

0 comments on commit 7b02ed4

Please sign in to comment.