From 3dfd94cbc3b7a1cd6b0d796eeee315cd80886311 Mon Sep 17 00:00:00 2001 From: Viktor Kramarenko Date: Tue, 24 Sep 2024 17:33:27 +0300 Subject: [PATCH] first draft Signed-off-by: Viktor Kramarenko --- .../src/pkg/cache/cache.go | 60 ++++++++++++++++--- .../src/pkg/controller/lvg_watcher_cache.go | 9 +-- 2 files changed, 54 insertions(+), 15 deletions(-) diff --git a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go index 16954fdb..60f169dc 100644 --- a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go +++ b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "sync" + "time" snc "github.com/deckhouse/sds-node-configurator/api/v1alpha1" v1 "k8s.io/api/core/v1" @@ -21,10 +22,11 @@ const ( ) type Cache struct { - lvgs sync.Map // map[string]*lvgCache - pvcLVGs sync.Map // map[string][]string - nodeLVGs sync.Map // map[string][]string - log logger.Logger + lvgs sync.Map // map[string]*lvgCache + pvcLVGs sync.Map // map[string][]string + nodeLVGs sync.Map // map[string][]string + log logger.Logger + expiredDuration time.Duration } type lvgCache struct { @@ -44,17 +46,57 @@ type pvcCache struct { // NewCache initialize new cache. func NewCache(logger logger.Logger) *Cache { - return &Cache{ - log: logger, + ch := &Cache{ + log: logger, + expiredDuration: 30 * time.Second, } + + go func() { + timer := time.NewTimer(ch.expiredDuration) + + for { + select { + case <-timer.C: + ch.clearBoundExpiredPVC() + timer.Reset(ch.expiredDuration) + } + } + }() + return ch +} + +func (c *Cache) clearBoundExpiredPVC() { + c.log.Debug("[clearBoundExpiredPVC] starts to clear expired PVC") + c.lvgs.Range(func(lvgName, _ any) bool { + pvcs, err := c.GetAllPVCForLVG(lvgName.(string)) + if err != nil { + c.log.Error(err, fmt.Sprintf("[clearBoundExpiredPVC] unable to get PVCs for the LVMVolumeGroup %s", lvgName.(string))) + return false + } + + for _, pvc := range pvcs { + if pvc.Status.Phase != v1.ClaimBound { + c.log.Trace(fmt.Sprintf("[clearBoundExpiredPVC] PVC %s is not in a Bound state", pvc.Name)) + continue + } + + if time.Now().Sub(pvc.CreationTimestamp.Time) > c.expiredDuration { + c.log.Warning(fmt.Sprintf("[clearBoundExpiredPVC] PVC %s is in a Bound state and expired, remove it from the cache", pvc.Name)) + c.RemovePVCFromTheCache(pvc) + } else { + c.log.Trace(fmt.Sprintf("[clearBoundExpiredPVC] PVC %s is in a Bound state but not expired yet.", pvc.Name)) + } + } + + return true + }) + c.log.Debug("[clearBoundExpiredPVC] finished the expired PVC clearing") } // AddLVG adds selected LVMVolumeGroup resource to the cache. If it is already stored, does nothing. func (c *Cache) AddLVG(lvg *snc.LVMVolumeGroup) { _, loaded := c.lvgs.LoadOrStore(lvg.Name, &lvgCache{ - lvg: lvg, - thickPVCs: sync.Map{}, - thinPools: sync.Map{}, + lvg: lvg, }) if loaded { c.log.Debug(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s has been already added to the cache", lvg.Name)) diff --git a/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go index 7c6e4ea3..7d7b31f5 100644 --- a/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go @@ -3,20 +3,18 @@ package controller import ( "context" "fmt" - "reflect" - snc "github.com/deckhouse/sds-node-configurator/api/v1alpha1" v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" + "reflect" + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/logger" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" - - "sds-local-volume-scheduler-extender/pkg/cache" - "sds-local-volume-scheduler-extender/pkg/logger" ) const ( @@ -90,7 +88,6 @@ func RunLVGWatcherCacheController( log.Info(fmt.Sprintf("[RunCacheWatcherController] UpdateFunc starts the cache reconciliation for the LVMVolumeGroup %s", e.ObjectNew.GetName())) oldLvg := e.ObjectOld newLvg := e.ObjectNew - err := cache.UpdateLVG(newLvg) if err != nil { log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to update the LVMVolumeGroup %s cache", newLvg.Name))