Skip to content

Commit

Permalink
final refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
  • Loading branch information
ViktorKram committed Sep 26, 2024
1 parent 88e4f55 commit bcc10bb
Show file tree
Hide file tree
Showing 5 changed files with 214 additions and 129 deletions.
30 changes: 16 additions & 14 deletions images/sds-local-volume-scheduler-extender/src/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,14 @@ const (
)

type Config struct {
ListenAddr string `json:"listen"`
DefaultDivisor float64 `json:"default-divisor"`
LogLevel string `json:"log-level"`
CacheSize int `json:"cache-size"`
HealthProbeBindAddress string `json:"health-probe-bind-address"`
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
ListenAddr string `json:"listen"`
DefaultDivisor float64 `json:"default-divisor"`
LogLevel string `json:"log-level"`
CacheSize int `json:"cache-size"`
HealthProbeBindAddress string `json:"health-probe-bind-address"`
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
PVCExpiredDurationSec time.Duration `json:"pvc-expired-duration-sec"`
}

var cfgFilePath string
Expand All @@ -73,12 +74,13 @@ var resourcesSchemeFuncs = []func(*runtime.Scheme) error{
}

var config = &Config{
ListenAddr: defaultListenAddr,
DefaultDivisor: defaultDivisor,
LogLevel: "2",
CacheSize: defaultCacheSize,
CertFile: defaultcertFile,
KeyFile: defaultkeyFile,
ListenAddr: defaultListenAddr,
DefaultDivisor: defaultDivisor,
LogLevel: "2",
CacheSize: defaultCacheSize,
CertFile: defaultcertFile,
KeyFile: defaultkeyFile,
PVCExpiredDurationSec: cache.DefaultPVCExpiredDurationSec * time.Second,
}

var rootCmd = &cobra.Command{
Expand Down Expand Up @@ -167,7 +169,7 @@ func subMain(ctx context.Context) error {
return err
}

schedulerCache := cache.NewCache(*log)
schedulerCache := cache.NewCache(*log, config.PVCExpiredDurationSec)
log.Info("[subMain] scheduler cache was initialized")

h, err := scheduler.NewHandler(ctx, mgr.GetClient(), *log, schedulerCache, config.DefaultDivisor)
Expand Down
54 changes: 0 additions & 54 deletions images/sds-local-volume-scheduler-extender/src/config/config.go

This file was deleted.

95 changes: 48 additions & 47 deletions images/sds-local-volume-scheduler-extender/src/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
)

const (
DefaultPVCExpiredDurationSec = 30

pvcPerLVGCount = 150
lvgsPerPVCCount = 5
lvgsPerNodeCount = 5
Expand Down Expand Up @@ -45,23 +47,21 @@ type pvcCache struct {
}

// NewCache initialize new cache.
func NewCache(logger logger.Logger) *Cache {
func NewCache(logger logger.Logger, pvcExpDurSec time.Duration) *Cache {
ch := &Cache{
log: logger,
expiredDuration: 30 * time.Second,
expiredDuration: pvcExpDurSec,
}

go func() {
timer := time.NewTimer(ch.expiredDuration)

for {
select {
case <-timer.C:
ch.clearBoundExpiredPVC()
timer.Reset(ch.expiredDuration)
}
for range timer.C {
ch.clearBoundExpiredPVC()
timer.Reset(ch.expiredDuration)
}
}()

return ch
}

Expand All @@ -80,7 +80,7 @@ func (c *Cache) clearBoundExpiredPVC() {
continue
}

if time.Now().Sub(pvc.CreationTimestamp.Time) > c.expiredDuration {
if time.Since(pvc.CreationTimestamp.Time) > c.expiredDuration {
c.log.Warning(fmt.Sprintf("[clearBoundExpiredPVC] PVC %s is in a Bound state and expired, remove it from the cache", pvc.Name))
c.RemovePVCFromTheCache(pvc)
} else {
Expand Down Expand Up @@ -118,29 +118,30 @@ func (c *Cache) AddLVG(lvg *snc.LVMVolumeGroup) {

// UpdateLVG updated selected LVMVolumeGroup resource in the cache. If such LVMVolumeGroup is not stored, returns an error.
func (c *Cache) UpdateLVG(lvg *snc.LVMVolumeGroup) error {
if lvgCh, found := c.lvgs.Load(lvg.Name); found {
lvgCh.(*lvgCache).lvg = lvg

c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes))
for _, node := range lvg.Status.Nodes {
lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name)
if lvgsOnTheNode == nil {
lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount)
}
lvgCh, found := c.lvgs.Load(lvg.Name)
if !found {
return fmt.Errorf("the LVMVolumeGroup %s was not found in the lvgCh", lvg.Name)
}

if !slices2.Contains(lvgsOnTheNode.([]string), lvg.Name) {
lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name)
c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name))
c.nodeLVGs.Store(node.Name, lvgsOnTheNode)
} else {
c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been already added to the node %s", lvg.Name, node.Name))
}
lvgCh.(*lvgCache).lvg = lvg

c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes))
for _, node := range lvg.Status.Nodes {
lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name)
if lvgsOnTheNode == nil {
lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount)
}

return nil
if !slices2.Contains(lvgsOnTheNode.([]string), lvg.Name) {
lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name)
c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been added to the node %s", lvg.Name, node.Name))
c.nodeLVGs.Store(node.Name, lvgsOnTheNode)
} else {
c.log.Debug(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s has been already added to the node %s", lvg.Name, node.Name))
}
}

return fmt.Errorf("the LVMVolumeGroup %s was not found in the lvgCh", lvg.Name)
return nil
}

// TryGetLVG returns selected LVMVolumeGroup resource if it is stored in the cache, otherwise returns nil.
Expand Down Expand Up @@ -228,8 +229,9 @@ func (c *Cache) DeleteLVG(lvgName string) {
c.nodeLVGs.Range(func(_, lvgNames any) bool {
for i, lvg := range lvgNames.([]string) {
if lvg == lvgName {
//nolint:gocritic
//nolint:gocritic,ineffassign
lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...)
return false
}
}

Expand All @@ -239,8 +241,9 @@ func (c *Cache) DeleteLVG(lvgName string) {
c.pvcLVGs.Range(func(_, lvgNames any) bool {
for i, lvg := range lvgNames.([]string) {
if lvg == lvgName {
//nolint:gocritic
//nolint:gocritic,ineffassign
lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...)
return false
}
}

Expand Down Expand Up @@ -687,27 +690,24 @@ func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentV

// RemovePVCFromTheCache completely removes selected PVC in the cache.
func (c *Cache) RemovePVCFromTheCache(pvc *v1.PersistentVolumeClaim) {
targetPvcKey := configurePVCKey(pvc)

c.log.Debug(fmt.Sprintf("[RemovePVCFromTheCache] run full cache wipe for PVC %s", targetPvcKey))
c.pvcLVGs.Range(func(pvcKey, lvgArray any) bool {
if pvcKey == targetPvcKey {
for _, lvgName := range lvgArray.([]string) {
lvgCh, found := c.lvgs.Load(lvgName)
if found {
lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey.(string))
lvgCh.(*lvgCache).thinPools.Range(func(_, tpCh any) bool {
tpCh.(*thinPoolCache).pvcs.Delete(pvcKey)
return true
})
}
pvcKey := configurePVCKey(pvc)

c.log.Debug(fmt.Sprintf("[RemovePVCFromTheCache] run full cache wipe for PVC %s", pvcKey))
lvgSlice, ok := c.pvcLVGs.Load(pvcKey)
if ok {
for _, lvgName := range lvgSlice.([]string) {
lvgCh, found := c.lvgs.Load(lvgName)
if found {
lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey)
lvgCh.(*lvgCache).thinPools.Range(func(_, tpCh any) bool {
tpCh.(*thinPoolCache).pvcs.Delete(pvcKey)
return true
})
}
}
}

return true
})

c.pvcLVGs.Delete(targetPvcKey)
c.pvcLVGs.Delete(pvcKey)
}

// FindLVGForPVCBySelectedNode finds a suitable LVMVolumeGroup resource's name for selected PVC based on selected node. If no such LVMVolumeGroup found, returns empty string.
Expand All @@ -730,6 +730,7 @@ func (c *Cache) FindLVGForPVCBySelectedNode(pvc *v1.PersistentVolumeClaim, nodeN
for _, lvgName := range lvgsForPVC.([]string) {
if slices2.Contains(lvgsOnTheNode.([]string), lvgName) {
targetLVG = lvgName
break
}
}

Expand Down
Loading

0 comments on commit bcc10bb

Please sign in to comment.