From bcc10bb76427cfbcd24eb55c4509b7e3dea8d5b5 Mon Sep 17 00:00:00 2001 From: Viktor Kramarenko Date: Thu, 26 Sep 2024 13:56:16 +0300 Subject: [PATCH] final refactoring Signed-off-by: Viktor Kramarenko --- .../src/cmd/main.go | 30 ++-- .../src/config/config.go | 54 ------ .../src/pkg/cache/cache.go | 95 +++++------ .../src/pkg/cache/cache_test.go | 156 ++++++++++++++++-- .../src/pkg/controller/lvg_watcher_cache.go | 8 +- 5 files changed, 214 insertions(+), 129 deletions(-) delete mode 100644 images/sds-local-volume-scheduler-extender/src/config/config.go diff --git a/images/sds-local-volume-scheduler-extender/src/cmd/main.go b/images/sds-local-volume-scheduler-extender/src/cmd/main.go index 68ced09d..0550266c 100644 --- a/images/sds-local-volume-scheduler-extender/src/cmd/main.go +++ b/images/sds-local-volume-scheduler-extender/src/cmd/main.go @@ -54,13 +54,14 @@ const ( ) type Config struct { - ListenAddr string `json:"listen"` - DefaultDivisor float64 `json:"default-divisor"` - LogLevel string `json:"log-level"` - CacheSize int `json:"cache-size"` - HealthProbeBindAddress string `json:"health-probe-bind-address"` - CertFile string `json:"cert-file"` - KeyFile string `json:"key-file"` + ListenAddr string `json:"listen"` + DefaultDivisor float64 `json:"default-divisor"` + LogLevel string `json:"log-level"` + CacheSize int `json:"cache-size"` + HealthProbeBindAddress string `json:"health-probe-bind-address"` + CertFile string `json:"cert-file"` + KeyFile string `json:"key-file"` + PVCExpiredDurationSec time.Duration `json:"pvc-expired-duration-sec"` } var cfgFilePath string @@ -73,12 +74,13 @@ var resourcesSchemeFuncs = []func(*runtime.Scheme) error{ } var config = &Config{ - ListenAddr: defaultListenAddr, - DefaultDivisor: defaultDivisor, - LogLevel: "2", - CacheSize: defaultCacheSize, - CertFile: defaultcertFile, - KeyFile: defaultkeyFile, + ListenAddr: defaultListenAddr, + DefaultDivisor: defaultDivisor, + LogLevel: "2", + CacheSize: defaultCacheSize, + CertFile: defaultcertFile, + KeyFile: defaultkeyFile, + PVCExpiredDurationSec: cache.DefaultPVCExpiredDurationSec * time.Second, } var rootCmd = &cobra.Command{ @@ -167,7 +169,7 @@ func subMain(ctx context.Context) error { return err } - schedulerCache := cache.NewCache(*log) + schedulerCache := cache.NewCache(*log, config.PVCExpiredDurationSec) log.Info("[subMain] scheduler cache was initialized") h, err := scheduler.NewHandler(ctx, mgr.GetClient(), *log, schedulerCache, config.DefaultDivisor) diff --git a/images/sds-local-volume-scheduler-extender/src/config/config.go b/images/sds-local-volume-scheduler-extender/src/config/config.go deleted file mode 100644 index 4d90b2aa..00000000 --- a/images/sds-local-volume-scheduler-extender/src/config/config.go +++ /dev/null @@ -1,54 +0,0 @@ -/* -Copyright 2024 Flant JSC -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "os" - - "sds-local-volume-scheduler-extender/pkg/logger" -) - -const ( - NodeName = "KUBE_NODE_NAME" - LogLevel = "LOG_LEVEL" - DefaultHealthProbeBindAddressEnvName = "HEALTH_PROBE_BIND_ADDRESS" - DefaultHealthProbeBindAddress = ":8081" -) - -type Options struct { - NodeName string - Version string - Loglevel logger.Verbosity - HealthProbeBindAddress string -} - -func NewConfig() *Options { - var opts Options - - loglevel := os.Getenv(LogLevel) - if loglevel == "" { - opts.Loglevel = logger.DebugLevel - } else { - opts.Loglevel = logger.Verbosity(loglevel) - } - - opts.HealthProbeBindAddress = os.Getenv(DefaultHealthProbeBindAddressEnvName) - if opts.HealthProbeBindAddress == "" { - opts.HealthProbeBindAddress = DefaultHealthProbeBindAddress - } - - opts.Version = "dev" - - return &opts -} diff --git a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go index 60f169dc..8ca08ae6 100644 --- a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go +++ b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go @@ -15,6 +15,8 @@ import ( ) const ( + DefaultPVCExpiredDurationSec = 30 + pvcPerLVGCount = 150 lvgsPerPVCCount = 5 lvgsPerNodeCount = 5 @@ -45,23 +47,21 @@ type pvcCache struct { } // NewCache initialize new cache. -func NewCache(logger logger.Logger) *Cache { +func NewCache(logger logger.Logger, pvcExpDurSec time.Duration) *Cache { ch := &Cache{ log: logger, - expiredDuration: 30 * time.Second, + expiredDuration: pvcExpDurSec, } go func() { timer := time.NewTimer(ch.expiredDuration) - for { - select { - case <-timer.C: - ch.clearBoundExpiredPVC() - timer.Reset(ch.expiredDuration) - } + for range timer.C { + ch.clearBoundExpiredPVC() + timer.Reset(ch.expiredDuration) } }() + return ch } @@ -80,7 +80,7 @@ func (c *Cache) clearBoundExpiredPVC() { continue } - if time.Now().Sub(pvc.CreationTimestamp.Time) > c.expiredDuration { + if time.Since(pvc.CreationTimestamp.Time) > c.expiredDuration { c.log.Warning(fmt.Sprintf("[clearBoundExpiredPVC] PVC %s is in a Bound state and expired, remove it from the cache", pvc.Name)) c.RemovePVCFromTheCache(pvc) } else { @@ -118,29 +118,30 @@ func (c *Cache) AddLVG(lvg *snc.LVMVolumeGroup) { // UpdateLVG updated selected LVMVolumeGroup resource in the cache. If such LVMVolumeGroup is not stored, returns an error. func (c *Cache) UpdateLVG(lvg *snc.LVMVolumeGroup) error { - if lvgCh, found := c.lvgs.Load(lvg.Name); found { - lvgCh.(*lvgCache).lvg = lvg - - c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes)) - for _, node := range lvg.Status.Nodes { - lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name) - if lvgsOnTheNode == nil { - lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount) - } + lvgCh, found := c.lvgs.Load(lvg.Name) + if !found { + return fmt.Errorf("the LVMVolumeGroup %s was not found in the lvgCh", lvg.Name) + } - if !slices2.Contains(lvgsOnTheNode.([]string), lvg.Name) { - lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name) - c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name)) - c.nodeLVGs.Store(node.Name, lvgsOnTheNode) - } else { - c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been already added to the node %s", lvg.Name, node.Name)) - } + lvgCh.(*lvgCache).lvg = lvg + + c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes)) + for _, node := range lvg.Status.Nodes { + lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name) + if lvgsOnTheNode == nil { + lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount) } - return nil + if !slices2.Contains(lvgsOnTheNode.([]string), lvg.Name) { + lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name) + c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name)) + c.nodeLVGs.Store(node.Name, lvgsOnTheNode) + } else { + c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been already added to the node %s", lvg.Name, node.Name)) + } } - return fmt.Errorf("the LVMVolumeGroup %s was not found in the lvgCh", lvg.Name) + return nil } // TryGetLVG returns selected LVMVolumeGroup resource if it is stored in the cache, otherwise returns nil. @@ -228,8 +229,9 @@ func (c *Cache) DeleteLVG(lvgName string) { c.nodeLVGs.Range(func(_, lvgNames any) bool { for i, lvg := range lvgNames.([]string) { if lvg == lvgName { - //nolint:gocritic + //nolint:gocritic,ineffassign lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...) + return false } } @@ -239,8 +241,9 @@ func (c *Cache) DeleteLVG(lvgName string) { c.pvcLVGs.Range(func(_, lvgNames any) bool { for i, lvg := range lvgNames.([]string) { if lvg == lvgName { - //nolint:gocritic + //nolint:gocritic,ineffassign lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...) + return false } } @@ -687,27 +690,24 @@ func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentV // RemovePVCFromTheCache completely removes selected PVC in the cache. func (c *Cache) RemovePVCFromTheCache(pvc *v1.PersistentVolumeClaim) { - targetPvcKey := configurePVCKey(pvc) - - c.log.Debug(fmt.Sprintf("[RemovePVCFromTheCache] run full cache wipe for PVC %s", targetPvcKey)) - c.pvcLVGs.Range(func(pvcKey, lvgArray any) bool { - if pvcKey == targetPvcKey { - for _, lvgName := range lvgArray.([]string) { - lvgCh, found := c.lvgs.Load(lvgName) - if found { - lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey.(string)) - lvgCh.(*lvgCache).thinPools.Range(func(_, tpCh any) bool { - tpCh.(*thinPoolCache).pvcs.Delete(pvcKey) - return true - }) - } + pvcKey := configurePVCKey(pvc) + + c.log.Debug(fmt.Sprintf("[RemovePVCFromTheCache] run full cache wipe for PVC %s", pvcKey)) + lvgSlice, ok := c.pvcLVGs.Load(pvcKey) + if ok { + for _, lvgName := range lvgSlice.([]string) { + lvgCh, found := c.lvgs.Load(lvgName) + if found { + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey) + lvgCh.(*lvgCache).thinPools.Range(func(_, tpCh any) bool { + tpCh.(*thinPoolCache).pvcs.Delete(pvcKey) + return true + }) } } + } - return true - }) - - c.pvcLVGs.Delete(targetPvcKey) + c.pvcLVGs.Delete(pvcKey) } // FindLVGForPVCBySelectedNode finds a suitable LVMVolumeGroup resource's name for selected PVC based on selected node. If no such LVMVolumeGroup found, returns empty string. @@ -730,6 +730,7 @@ func (c *Cache) FindLVGForPVCBySelectedNode(pvc *v1.PersistentVolumeClaim, nodeN for _, lvgName := range lvgsForPVC.([]string) { if slices2.Contains(lvgsOnTheNode.([]string), lvgName) { targetLVG = lvgName + break } } diff --git a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache_test.go index e3586bc4..41dc3f6e 100644 --- a/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache_test.go +++ b/images/sds-local-volume-scheduler-extender/src/pkg/cache/cache_test.go @@ -3,6 +3,7 @@ package cache import ( "fmt" "testing" + "time" snc "github.com/deckhouse/sds-node-configurator/api/v1alpha1" "github.com/stretchr/testify/assert" @@ -13,8 +14,141 @@ import ( "sds-local-volume-scheduler-extender/pkg/logger" ) +func TestCache(t *testing.T) { + log := logger.Logger{} + t.Run("clearBoundExpiredPVC", func(t *testing.T) { + const ( + thickBoundExpiredPVC = "thick-bound-expired-pvc" + thickPendingExpiredPVC = "thick-pending-expired-pvc" + thickBoundNotExpiredPVC = "thick-bound-not-expired-pvc" + + thinBoundExpiredPVC = "thin-bound-expired-pvc" + thinPendingExpiredPVC = "thin-pending-expired-pvc" + thinBoundNotExpiredPVC = "thin-bound-not-expired-pvc" + ) + ch := NewCache(log, DefaultPVCExpiredDurationSec*time.Second) + expiredTime := time.Now().Add((-DefaultPVCExpiredDurationSec - 1) * time.Second) + thickPVCs := map[string]*pvcCache{ + "/" + thickBoundExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thickBoundExpiredPVC, + CreationTimestamp: metav1.NewTime(expiredTime), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + }, + "/" + thickPendingExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thickPendingExpiredPVC, + CreationTimestamp: metav1.NewTime(expiredTime), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimPending, + }, + }, + }, + "/" + thickBoundNotExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thickBoundNotExpiredPVC, + CreationTimestamp: metav1.NewTime(time.Now()), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + }, + } + thinPVCs := map[string]*pvcCache{ + "/" + thinBoundExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thinBoundExpiredPVC, + CreationTimestamp: metav1.NewTime(expiredTime), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + }, + "/" + thinPendingExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thinPendingExpiredPVC, + CreationTimestamp: metav1.NewTime(expiredTime), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimPending, + }, + }, + }, + "/" + thinBoundNotExpiredPVC: { + pvc: &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: thinBoundNotExpiredPVC, + CreationTimestamp: metav1.NewTime(time.Now()), + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimBound, + }, + }, + }, + } + + const tpName = "thin-pool" + thinPools := map[string]*thinPoolCache{ + tpName: {}, + } + for pvcName, pvc := range thinPVCs { + thinPools[tpName].pvcs.Store(pvcName, pvc) + } + + const lvgName = "lvg-name" + lvgs := map[string]*lvgCache{ + lvgName: {}, + } + + for name, pvc := range thickPVCs { + lvgs[lvgName].thickPVCs.Store(name, pvc) + } + for name, tp := range thinPools { + lvgs[lvgName].thinPools.Store(name, tp) + } + + ch.lvgs.Store(lvgName, lvgs[lvgName]) + ch.pvcLVGs.Store("/"+thickBoundExpiredPVC, []string{lvgName}) + ch.pvcLVGs.Store("/"+thickPendingExpiredPVC, []string{lvgName}) + ch.pvcLVGs.Store("/"+thickBoundNotExpiredPVC, []string{lvgName}) + ch.pvcLVGs.Store("/"+thinBoundExpiredPVC, []string{lvgName}) + ch.pvcLVGs.Store("/"+thinBoundNotExpiredPVC, []string{lvgName}) + ch.pvcLVGs.Store("/"+thinPendingExpiredPVC, []string{lvgName}) + + ch.clearBoundExpiredPVC() + + lvgCh, _ := ch.lvgs.Load(lvgName) + _, found := lvgCh.(*lvgCache).thickPVCs.Load("/" + thickBoundExpiredPVC) + assert.False(t, found) + _, found = lvgCh.(*lvgCache).thickPVCs.Load("/" + thickPendingExpiredPVC) + assert.True(t, found) + _, found = lvgCh.(*lvgCache).thickPVCs.Load("/" + thickBoundNotExpiredPVC) + assert.True(t, found) + + tpCh, _ := lvgCh.(*lvgCache).thinPools.Load(tpName) + _, found = tpCh.(*thinPoolCache).pvcs.Load("/" + thinBoundExpiredPVC) + assert.False(t, found) + _, found = tpCh.(*thinPoolCache).pvcs.Load("/" + thinPendingExpiredPVC) + assert.True(t, found) + _, found = tpCh.(*thinPoolCache).pvcs.Load("/" + thinBoundNotExpiredPVC) + assert.True(t, found) + }) +} + func BenchmarkCache_DeleteLVG(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) lvg := &snc.LVMVolumeGroup{ ObjectMeta: metav1.ObjectMeta{ Name: "first", @@ -33,7 +167,7 @@ func BenchmarkCache_DeleteLVG(b *testing.B) { } func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) lvg := &snc.LVMVolumeGroup{ ObjectMeta: metav1.ObjectMeta{ Name: "first", @@ -99,7 +233,7 @@ func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { } func BenchmarkCache_AddPVC(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) lvg1 := &snc.LVMVolumeGroup{ ObjectMeta: metav1.ObjectMeta{ @@ -167,7 +301,7 @@ func BenchmarkCache_AddPVC(b *testing.B) { } func BenchmarkCache_GetAllLVG(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) lvgs := map[string]*lvgCache{ "first": { lvg: &snc.LVMVolumeGroup{ @@ -201,7 +335,7 @@ func BenchmarkCache_GetAllLVG(b *testing.B) { } func BenchmarkCache_GetLVGNamesByNodeName(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) lvgs := []string{ "first", "second", @@ -222,7 +356,7 @@ func BenchmarkCache_GetLVGNamesByNodeName(b *testing.B) { } func BenchmarkCache_TryGetLVG(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) name := "test-name" lvg := &snc.LVMVolumeGroup{ @@ -243,7 +377,7 @@ func BenchmarkCache_TryGetLVG(b *testing.B) { } func BenchmarkCache_AddLVG(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) i := 0 b.RunParallel(func(pb *testing.PB) { @@ -299,7 +433,7 @@ func BenchmarkCache_AddLVG(b *testing.B) { } func TestCache_UpdateLVG(t *testing.T) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) name := "test-lvg" lvg := &snc.LVMVolumeGroup{ ObjectMeta: metav1.ObjectMeta{ @@ -330,7 +464,7 @@ func TestCache_UpdateLVG(t *testing.T) { } func BenchmarkCache_UpdateLVG(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) name := "test-name" i := 0 @@ -367,7 +501,7 @@ func BenchmarkCache_UpdateLVG(b *testing.B) { } func BenchmarkCache_UpdatePVC(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) i := 0 lvg := &snc.LVMVolumeGroup{ ObjectMeta: metav1.ObjectMeta{ @@ -415,7 +549,7 @@ func BenchmarkCache_UpdatePVC(b *testing.B) { } func BenchmarkCache_FullLoad(b *testing.B) { - cache := NewCache(logger.Logger{}) + cache := NewCache(logger.Logger{}, DefaultPVCExpiredDurationSec*time.Second) const ( nodeName = "test-node" diff --git a/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go index 7d7b31f5..7a331e31 100644 --- a/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/src/pkg/controller/lvg_watcher_cache.go @@ -3,18 +3,20 @@ package controller import ( "context" "fmt" + "reflect" + snc "github.com/deckhouse/sds-node-configurator/api/v1alpha1" v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" - "reflect" - "sds-local-volume-scheduler-extender/pkg/cache" - "sds-local-volume-scheduler-extender/pkg/logger" "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/manager" "sigs.k8s.io/controller-runtime/pkg/reconcile" "sigs.k8s.io/controller-runtime/pkg/source" + + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/logger" ) const (