From a3eb340d43ee6d333a320dbf22b7bbd0e0d73e8b Mon Sep 17 00:00:00 2001 From: Viktor Kramarenko Date: Fri, 26 Apr 2024 11:50:15 +0300 Subject: [PATCH] [controller] Add some logic refactoring and minor fixes Signed-off-by: Viktor Kramarenko --- .../pkg/cache/cache.go | 19 ++++++++++++++++++ .../pkg/cache/cache_test.go | 20 +++++++++---------- .../pkg/controller/lvg_watcher_cache.go | 16 +++++++-------- .../pkg/scheduler/filter.go | 2 +- 4 files changed, 38 insertions(+), 19 deletions(-) diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go index ecabb60c..32ffbca6 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go @@ -51,6 +51,7 @@ func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { return } + c.log.Trace(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes)) for _, node := range lvg.Status.Nodes { lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name) if lvgsOnTheNode == nil { @@ -58,6 +59,7 @@ func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { } lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name) + c.log.Debug(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name)) c.nodeLVGs.Store(node.Name, lvgsOnTheNode) } } @@ -66,6 +68,23 @@ func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { func (c *Cache) UpdateLVG(lvg *v1alpha1.LvmVolumeGroup) error { if cache, found := c.lvgs.Load(lvg.Name); found { cache.(*lvgCache).lvg = lvg + + c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes)) + for _, node := range lvg.Status.Nodes { + lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name) + if lvgsOnTheNode == nil { + lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount) + } + + if !slices2.Contains(lvgsOnTheNode.([]string), lvg.Name) { + lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name) + c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name)) + c.nodeLVGs.Store(node.Name, lvgsOnTheNode) + } else { + c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been already added to the node %s", lvg.Name, node.Name)) + } + } + return nil } diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go index dc28272b..bc367531 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go @@ -304,7 +304,7 @@ func TestCache_UpdateLVG(t *testing.T) { Name: name, }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: "1Gi", + AllocatedSize: resource.MustParse("1Gi"), }, } cache.AddLVG(lvg) @@ -314,7 +314,7 @@ func TestCache_UpdateLVG(t *testing.T) { Name: name, }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: "2Gi", + AllocatedSize: resource.MustParse("2Gi"), }, } @@ -352,10 +352,10 @@ func BenchmarkCache_UpdateLVG(b *testing.B) { Name: name, }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: fmt.Sprintf("2%dGi", i), + AllocatedSize: resource.MustParse(fmt.Sprintf("2%dGi", i)), }, } - b.Logf("updates the LVG with allocated size: %s", updated.Status.AllocatedSize) + b.Logf("updates the LVG with allocated size: %s", updated.Status.AllocatedSize.String()) err := cache.UpdateLVG(updated) if err != nil { b.Error(err) @@ -435,7 +435,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: nodeName, }, }, - AllocatedSize: fmt.Sprintf("1%dGi", i), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i)), }, }, { @@ -448,7 +448,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: nodeName, }, }, - AllocatedSize: fmt.Sprintf("1%dGi", i), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i)), }, }, { @@ -461,7 +461,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: nodeName, }, }, - AllocatedSize: fmt.Sprintf("1%dGi", i), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i)), }, }, } @@ -505,7 +505,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: fmt.Sprintf("test-lvg-%d", i), }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: fmt.Sprintf("1%dGi", i+1), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i+1)), }, }, { @@ -513,7 +513,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: fmt.Sprintf("test-lvg-%d", i+1), }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: fmt.Sprintf("1%dGi", i+1), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i+1)), }, }, { @@ -521,7 +521,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { Name: fmt.Sprintf("test-lvg-%d", i+2), }, Status: v1alpha1.LvmVolumeGroupStatus{ - AllocatedSize: fmt.Sprintf("1%dGi", i+1), + AllocatedSize: resource.MustParse(fmt.Sprintf("1%dGi", i+1)), }, }, } diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go index 56e69f25..1d7b1202 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go @@ -105,6 +105,13 @@ func RunLVGWatcherCacheController( return } + err = cache.UpdateLVG(newLvg) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to update the LVMVolumeGroup %s cache", newLvg.Name)) + return + } + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] successfully updated the LVMVolumeGroup %s in the cache", newLvg.Name)) + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to calculate the size difference for LVMVolumeGroup %s", newLvg.Name)) log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] old state LVMVolumeGroup %s has size %s", oldLvg.Name, oldLvg.Status.AllocatedSize.String())) log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] new state LVMVolumeGroup %s has size %s", newLvg.Name, newLvg.Status.AllocatedSize.String())) @@ -114,14 +121,7 @@ func RunLVGWatcherCacheController( log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", newLvg.Name)) return } - - log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func. It will be updated in the cache", newLvg.Name)) - err = cache.UpdateLVG(newLvg) - if err != nil { - log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to update the LVMVolumeGroup %s cache", newLvg.Name)) - return - } - log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] successfully updated the LVMVolumeGroup %s in the cache", newLvg.Name)) + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func", newLvg.Name)) cachedPVCs, err := cache.GetAllPVCForLVG(newLvg.Name) if err != nil { diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go index 5508dd96..3291bad4 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go @@ -289,7 +289,7 @@ func filterNodes( usedLVGs := RemoveUnusedLVGs(lvgs, scLVGs) for _, lvg := range usedLVGs { - log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s is actually used. VG size: %s, allocatedSize: %s", lvg.Name, lvg.Status.VGSize, lvg.Status.AllocatedSize)) + log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s is actually used. VG size: %s, allocatedSize: %s", lvg.Name, lvg.Status.VGSize.String(), lvg.Status.AllocatedSize.String())) } lvgsThickFree := getLVGThickFreeSpaces(log, usedLVGs)