From a141becdaa2e6354290bba7b06e3d752e1a85f82 Mon Sep 17 00:00:00 2001 From: Andriy Semenets Date: Tue, 14 Jan 2025 12:50:39 +0100 Subject: [PATCH] Fix an issue with the concurrent read/write to the cache --- pkg/sources/cache.go | 11 +++++++---- pkg/sources/fetch.go | 22 +++++++++++++++------- 2 files changed, 22 insertions(+), 11 deletions(-) diff --git a/pkg/sources/cache.go b/pkg/sources/cache.go index bc55fc3..acd7643 100644 --- a/pkg/sources/cache.go +++ b/pkg/sources/cache.go @@ -21,7 +21,7 @@ type CacheItem struct { type FileCache struct { filename string - mutex sync.RWMutex + mutex sync.Mutex data map[string]CacheItem } @@ -73,8 +73,8 @@ func (c *FileCache) Set(key string, value any, expiration time.Duration) error { // Get retrieves a value from the cache and unmarshals it into the provided destination func (c *FileCache) Get(key string, dest any) (bool, error) { - c.mutex.RLock() - defer c.mutex.RUnlock() + c.mutex.Lock() + defer c.mutex.Unlock() item, exists := c.data[key] if !exists { @@ -84,7 +84,10 @@ func (c *FileCache) Get(key string, dest any) (bool, error) { // Check if item has expired if !item.ExpiresAt.IsZero() && time.Now().After(item.ExpiresAt) { delete(c.data, key) - c.save() + err := c.save() + if err != nil { + return false, err + } return false, nil } diff --git a/pkg/sources/fetch.go b/pkg/sources/fetch.go index 0d79960..f6ba9d1 100644 --- a/pkg/sources/fetch.go +++ b/pkg/sources/fetch.go @@ -3,6 +3,7 @@ package sources import ( "context" "fmt" + "sync" "time" "github.com/depshubhq/depshub/pkg/sources/crates" @@ -35,7 +36,6 @@ func (f fetcher) Fetch(uniqueDependencies []types.Dependency) (types.PackagesInf pypiSource := pypi.PyPISource{} background := context.Background() - activeRequests := 0 // Use a semaphore to limit concurrent requests sem := make(chan struct{}, MaxConcurrent) @@ -45,10 +45,13 @@ func (f fetcher) Fetch(uniqueDependencies []types.Dependency) (types.PackagesInf return nil, err } + var wg sync.WaitGroup for _, dep := range uniqueDependencies { - activeRequests++ + wg.Add(1) + + go func() { + defer wg.Done() - go func(dep types.Dependency) { sem <- struct{}{} // Acquire semaphore defer func() { <-sem // Release semaphore @@ -80,7 +83,7 @@ func (f fetcher) Fetch(uniqueDependencies []types.Dependency) (types.PackagesInf if err != nil { fmt.Printf("Error fetching package data: %s\n", err) } else { - c.Set(key, packageInfo, 24*time.Hour) + c.Set(key, packageInfo, 48*time.Hour) } } @@ -88,14 +91,19 @@ func (f fetcher) Fetch(uniqueDependencies []types.Dependency) (types.PackagesInf pkg: packageInfo, err: err, } - }(dep) + }() } + // Start a goroutine to close resultChan after all workers are done + go func() { + wg.Wait() + close(resultChan) + }() + // Collect results var packagesData = make(types.PackagesInfo) - for range activeRequests { - result := <-resultChan + for result := range resultChan { if result.err != nil { fmt.Printf("Error fetching package data: %s\n", result.err) continue