From 77b627cf692de25fb7060592d754e9d1cfe9c85d Mon Sep 17 00:00:00 2001 From: Viktor Kramarenko Date: Thu, 27 Jun 2024 20:15:52 +0300 Subject: [PATCH] [controller] Add thin pool reserved space to the cache Signed-off-by: Viktor Kramarenko --- .../api/v1alpha1/lvm_volume_group.go | 52 ++- .../pkg/cache/cache.go | 419 +++++++++++++++--- .../pkg/cache/cache_test.go | 22 +- .../pkg/controller/lvg_watcher_cache.go | 6 +- .../pkg/controller/pvc_watcher_cache.go | 4 +- .../pkg/scheduler/filter.go | 156 ++++--- .../pkg/scheduler/prioritize.go | 6 +- .../pkg/scheduler/route.go | 6 +- 8 files changed, 510 insertions(+), 161 deletions(-) diff --git a/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go b/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go index d931267f..70900f72 100644 --- a/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go +++ b/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go @@ -1,9 +1,12 @@ /* Copyright 2024 Flant JSC + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,16 +36,24 @@ type LvmVolumeGroup struct { Status LvmVolumeGroupStatus `json:"status,omitempty"` } -type SpecThinPool struct { - Name string `json:"name"` - Size resource.Quantity `json:"size"` +type LvmVolumeGroupSpec struct { + ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` + BlockDeviceNames []string `json:"blockDeviceNames"` + ThinPools []LvmVolumeGroupThinPoolSpec `json:"thinPools"` + Type string `json:"type"` } -type LvmVolumeGroupSpec struct { - ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` - BlockDeviceNames []string `json:"blockDeviceNames"` - ThinPools []SpecThinPool `json:"thinPools"` - Type string `json:"type"` +type LvmVolumeGroupStatus struct { + AllocatedSize resource.Quantity `json:"allocatedSize"` + Nodes []LvmVolumeGroupNode `json:"nodes"` + ThinPools []LvmVolumeGroupThinPoolStatus `json:"thinPools"` + VGSize resource.Quantity `json:"vgSize"` + VGUuid string `json:"vgUUID"` + Phase string `json:"phase"` + Conditions []metav1.Condition `json:"conditions"` + ThinPoolReady string `json:"thinPoolReady"` + ConfigurationApplied string `json:"configurationApplied"` + VGFree resource.Quantity `json:"vgFree"` } type LvmVolumeGroupDevice struct { @@ -58,18 +69,19 @@ type LvmVolumeGroupNode struct { Name string `json:"name"` } -type StatusThinPool struct { - Name string `json:"name"` - ActualSize resource.Quantity `json:"actualSize"` - UsedSize resource.Quantity `json:"usedSize"` +type LvmVolumeGroupThinPoolStatus struct { + Name string `json:"name"` + ActualSize resource.Quantity `json:"actualSize"` + UsedSize resource.Quantity `json:"usedSize"` + AllocatedSize resource.Quantity `json:"allocatedSize"` + AvailableSpace resource.Quantity `json:"availableSpace"` + AllocationLimit string `json:"allocationLimit"` + Ready bool `json:"ready"` + Message string `json:"message"` } -type LvmVolumeGroupStatus struct { - AllocatedSize resource.Quantity `json:"allocatedSize"` - Health string `json:"health"` - Message string `json:"message"` - Nodes []LvmVolumeGroupNode `json:"nodes"` - ThinPools []StatusThinPool 
`json:"thinPools"` - VGSize resource.Quantity `json:"vgSize"` - VGUuid string `json:"vgUUID"` +type LvmVolumeGroupThinPoolSpec struct { + Name string `json:"name"` + Size resource.Quantity `json:"size"` + AllocationLimit string `json:"allocationLimit"` } diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go index 32ffbca6..fe68e405 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go @@ -1,6 +1,7 @@ package cache import ( + "errors" "fmt" v1 "k8s.io/api/core/v1" slices2 "k8s.io/utils/strings/slices" @@ -24,7 +25,12 @@ type Cache struct { } type lvgCache struct { - lvg *v1alpha1.LvmVolumeGroup + lvg *v1alpha1.LvmVolumeGroup + thickPVCs sync.Map //map[string]*pvcCache + thinPools sync.Map //map[string]*thinPoolCache +} + +type thinPoolCache struct { pvcs sync.Map //map[string]*pvcCache } @@ -43,8 +49,9 @@ func NewCache(logger logger.Logger) *Cache { // AddLVG adds selected LVMVolumeGroup resource to the cache. If it is already stored, does nothing. func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { _, loaded := c.lvgs.LoadOrStore(lvg.Name, &lvgCache{ - lvg: lvg, - pvcs: sync.Map{}, + lvg: lvg, + thickPVCs: sync.Map{}, + thinPools: sync.Map{}, }) if loaded { c.log.Debug(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s has been already added to the cache", lvg.Name)) @@ -129,16 +136,39 @@ func (c *Cache) GetAllLVG() map[string]*v1alpha1.LvmVolumeGroup { return lvgs } -// GetLVGReservedSpace returns a sum of reserved space by every PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. -func (c *Cache) GetLVGReservedSpace(lvgName string) (int64, error) { +// GetLVGThickReservedSpace returns a sum of reserved space by every thick PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. +func (c *Cache) GetLVGThickReservedSpace(lvgName string) (int64, error) { lvg, found := c.lvgs.Load(lvgName) if !found { - c.log.Debug(fmt.Sprintf("[GetLVGReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + c.log.Debug(fmt.Sprintf("[GetLVGThickReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + return 0, nil + } + + var space int64 + lvg.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { + space += pvcCh.(*pvcCache).pvc.Spec.Resources.Requests.Storage().Value() + return true + }) + + return space, nil +} + +// GetLVGThinReservedSpace returns a sum of reserved space by every thin PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. +func (c *Cache) GetLVGThinReservedSpace(lvgName string, thinPoolName string) (int64, error) { + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGThinReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + return 0, nil + } + + thinPool, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGThinReservedSpace] the Thin pool %s of the LVMVolumeGroup %s was not found in the cache. 
Returns 0", lvgName, thinPoolName)) return 0, nil } var space int64 - lvg.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + thinPool.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { space += pvcCh.(*pvcCache).pvc.Spec.Resources.Requests.Storage().Value() return true }) @@ -171,11 +201,11 @@ func (c *Cache) DeleteLVG(lvgName string) { }) } -// AddPVC adds selected PVC to selected LVMVolumeGroup resource. If the LVMVolumeGroup resource is not stored, returns an error. +// AddThickPVC adds selected PVC to selected LVMVolumeGroup resource. If the LVMVolumeGroup resource is not stored, returns an error. // If selected PVC is already stored in the cache, does nothing. -func (c *Cache) AddPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { +func (c *Cache) AddThickPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { if pvc.Status.Phase == v1.ClaimBound { - c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) + c.log.Warning(fmt.Sprintf("[AddThickPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) return nil } @@ -184,59 +214,170 @@ func (c *Cache) AddPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { lvgCh, found := c.lvgs.Load(lvgName) if !found { err := fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) - c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + c.log.Error(err, fmt.Sprintf("[AddThickPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return err + } + + // this case might be triggered if the extender recovers after fail and finds some pending thickPVCs with selected nodes + c.log.Trace(fmt.Sprintf("[AddThickPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + + shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, "") + if err != nil { return err } - // this case might be triggered if the extender recovers after fail and finds some pending pvcs with selected nodes - c.log.Trace(fmt.Sprintf("[AddPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + if !shouldAdd { + c.log.Debug(fmt.Sprintf("[AddThickPVC] PVC %s should not be added", pvcKey)) + return nil + } + //if pvc.Annotations[SelectedNodeAnnotation] != "" { + // c.log.Debug(fmt.Sprintf("[AddThickPVC] PVC %s/%s has selected node anotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + // + // lvgsOnTheNode, found := c.nodeLVGs.Load(pvc.Annotations[SelectedNodeAnnotation]) + // if !found { + // err := fmt.Errorf("no LVMVolumeGroups found for the node %s", pvc.Annotations[SelectedNodeAnnotation]) + // c.log.Error(err, fmt.Sprintf("[AddThickPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + // return err + // } + // + // if !slices2.Contains(lvgsOnTheNode.([]string), lvgName) { + // c.log.Debug(fmt.Sprintf("[AddThickPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. 
It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + // return nil + // } + // + // c.log.Debug(fmt.Sprintf("[AddThickPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + // + // _, found = lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) + // if found { + // c.log.Warning(fmt.Sprintf("[AddThickPVC] PVC %s cache has been already added to the LVMVolumeGroup %s", pvcKey, lvgName)) + // return nil + // } + //} + + c.log.Debug(fmt.Sprintf("[AddThickPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) + c.addNewThickPVC(lvgCh.(*lvgCache), pvc) + + return nil +} + +func (c *Cache) shouldAddPVC(pvc *v1.PersistentVolumeClaim, lvgCh *lvgCache, pvcKey, lvgName, thinPoolName string) (bool, error) { if pvc.Annotations[SelectedNodeAnnotation] != "" { - c.log.Debug(fmt.Sprintf("[AddPVC] PVC %s/%s has selected node anotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s/%s has selected node anotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) lvgsOnTheNode, found := c.nodeLVGs.Load(pvc.Annotations[SelectedNodeAnnotation]) if !found { err := fmt.Errorf("no LVMVolumeGroups found for the node %s", pvc.Annotations[SelectedNodeAnnotation]) - c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) - return err + c.log.Error(err, fmt.Sprintf("[shouldAddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return false, err } if !slices2.Contains(lvgsOnTheNode.([]string), lvgName) { - c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) - return nil + c.log.Debug(fmt.Sprintf("[shouldAddPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + return false, nil } - c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + c.log.Debug(fmt.Sprintf("[shouldAddPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) - _, found = lvgCh.(*lvgCache).pvcs.Load(pvcKey) + _, found = lvgCh.thickPVCs.Load(pvcKey) if found { - c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s cache has been already added to the LVMVolumeGroup %s", pvcKey, lvgName)) - return nil + c.log.Warning(fmt.Sprintf("[shouldAddPVC] PVC %s cache has been already added as thick to the LVMVolumeGroup %s", pvcKey, lvgName)) + return false, nil } + + if thinPoolName != "" { + thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[shouldAddPVC] Thin pool %s was not found in the cache, PVC %s should be added", thinPoolName, pvcKey)) + return true, nil + } + + if _, found = thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey); found { + c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache. 
No need to add", pvcKey, thinPoolName)) + return false, nil + } + } + } + + return true, nil +} + +func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolumeClaim) error { + if pvc.Status.Phase == v1.ClaimBound { + c.log.Warning(fmt.Sprintf("[AddThinPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) + return nil + } + + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + c.log.Error(err, fmt.Sprintf("[AddThinPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return err + } + + // this case might be triggered if the extender recovers after fail and finds some pending thickPVCs with selected nodes + c.log.Trace(fmt.Sprintf("[AddThinPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, thinPoolName) + if err != nil { + return err + } + + if !shouldAdd { + c.log.Debug(fmt.Sprintf("[AddThinPVC] PVC %s should not be added", pvcKey)) + return nil } - c.log.Debug(fmt.Sprintf("[AddPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) - c.addNewPVC(lvgCh.(*lvgCache), pvc) + c.log.Debug(fmt.Sprintf("[AddThinPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) + c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName) return nil } -func (c *Cache) addNewPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) { +func (c *Cache) addNewThickPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) { pvcKey := configurePVCKey(pvc) - lvgCh.pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + lvgCh.thinPools.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + + c.addLVGToPVC(lvgCh.lvg.Name, pvcKey) +} + +func (c *Cache) addNewThinPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, thinPoolName string) error { + pvcKey := configurePVCKey(pvc) + + if thinPoolName == "" { + err := errors.New("no thin pool specified") + c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin PVC %s to the cache", pvcKey)) + return err + } + + thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName) + if !found { + err := fmt.Errorf("thin pool %s not found", thinPoolName) + c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin PVC %s to the cache", pvcKey)) + return err + } + + thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, pvc) + c.log.Debug(fmt.Sprintf("[addNewThinPVC] THIN PVC %s was added to the cache to Thin Pool %s", pvcKey, thinPoolName)) + + c.addLVGToPVC(lvgCh.lvg.Name, pvcKey) + return nil +} +func (c *Cache) addLVGToPVC(lvgName, pvcKey string) { lvgsForPVC, found := c.pvcLVGs.Load(pvcKey) if !found || lvgsForPVC == nil { lvgsForPVC = make([]string, 0, lvgsPerPVCCount) } - c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s before append: %v", pvcKey, lvgsForPVC)) - lvgsForPVC = append(lvgsForPVC.([]string), lvgCh.lvg.Name) - c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s after append: %v", pvcKey, lvgsForPVC)) + c.log.Trace(fmt.Sprintf("[addLVGToPVC] LVMVolumeGroups from the cache for PVC %s before append: %v", pvcKey, lvgsForPVC)) + lvgsForPVC = append(lvgsForPVC.([]string), lvgName) + c.log.Trace(fmt.Sprintf("[addLVGToPVC] LVMVolumeGroups from the cache for PVC %s after append: %v", pvcKey, lvgsForPVC)) 
c.pvcLVGs.Store(pvcKey, lvgsForPVC) } -// UpdatePVC updates selected PVC in selected LVMVolumeGroup resource. If no such PVC is stored in the cache, adds it. -func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { +// UpdateThickPVC updates selected PVC in selected LVMVolumeGroup resource. If no such PVC is stored in the cache, adds it. +func (c *Cache) UpdateThickPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { pvcKey := configurePVCKey(pvc) lvgCh, found := c.lvgs.Load(lvgName) @@ -244,12 +385,12 @@ func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) if !found { - c.log.Warning(fmt.Sprintf("[UpdatePVC] PVC %s was not found in the cache for the LVMVolumeGroup %s. It will be added", pvcKey, lvgName)) - err := c.AddPVC(lvgName, pvc) + c.log.Warning(fmt.Sprintf("[UpdateThickPVC] PVC %s was not found in the cache for the LVMVolumeGroup %s. It will be added", pvcKey, lvgName)) + err := c.AddThickPVC(lvgName, pvc) if err != nil { - c.log.Error(err, fmt.Sprintf("[UpdatePVC] an error occurred while trying to update the PVC %s", pvcKey)) + c.log.Error(err, fmt.Sprintf("[UpdateThickPVC] an error occurred while trying to update the PVC %s", pvcKey)) return err } return nil @@ -257,22 +398,102 @@ func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { pvcCh.(*pvcCache).pvc = pvc pvcCh.(*pvcCache).selectedNode = pvc.Annotations[SelectedNodeAnnotation] - c.log.Debug(fmt.Sprintf("[UpdatePVC] successfully updated PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + c.log.Debug(fmt.Sprintf("[UpdateThickPVC] successfully updated PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) return nil } -// GetAllPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. -func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) { +func (c *Cache) UpdateThinPVC(lvgName string, pvc *v1.PersistentVolumeClaim, thinPoolName string) error { + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + } + + thinPoolCh, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[UpdateThinPVC] Thin Pool %s was not found in the LVMVolumeGroup %s, add it.", thinPoolName, lvgName)) + err := c.addNewThinPool(lvgCh.(*lvgCache), pvc, thinPoolName) + if err != nil { + return err + } + thinPoolCh, _ = lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + } + + pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey) + if !found { + c.log.Warning(fmt.Sprintf("[UpdateThinPVC] Thin PVC %s was not found in Thin pool %s in the cache for the LVMVolumeGroup %s. 
It will be added", pvcKey, thinPoolName, lvgName)) + err := c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName) + if err != nil { + c.log.Error(err, fmt.Sprintf("[UpdateThinPVC] an error occurred while trying to update the PVC %s", pvcKey)) + return err + } + return nil + } + + pvcCh.(*pvcCache).pvc = pvc + pvcCh.(*pvcCache).selectedNode = pvc.Annotations[SelectedNodeAnnotation] + c.log.Debug(fmt.Sprintf("[UpdateThinPVC] successfully updated THIN PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + + return nil +} + +func (c *Cache) addNewThinPool(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, thinPoolName string) error { + pvcKey := configurePVCKey(pvc) + + if len(thinPoolName) == 0 { + err := errors.New("no thin pool name specified") + c.log.Error(err, fmt.Sprintf("[addNewThinPool] unable to add thin pool for PVC %s in the LVMVolumeGroup %s", pvc.Name, lvgCh.lvg.Name)) + return err + } + + _, found := lvgCh.thinPools.Load(thinPoolName) + if found { + err := fmt.Errorf("thin pool %s is already created", thinPoolName) + c.log.Error(err, fmt.Sprintf("[addNewThinPool] unable to add new Thin pool %s to the LVMVolumeGroup %s for PVC %s", thinPoolName, lvgCh.lvg.Name, pvcKey)) + return err + } + + lvgCh.thinPools.Store(thinPoolName, &thinPoolCache{}) + return nil +} + +// GetAllThickPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. +func (c *Cache) GetAllThickPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) { lvgCh, found := c.lvgs.Load(lvgName) if !found { err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName) - c.log.Error(err, fmt.Sprintf("[GetAllPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) + c.log.Error(err, fmt.Sprintf("[GetAllThickPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) return nil, err } result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) - lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { + result = append(result, pvcCh.(*pvcCache).pvc) + return true + }) + + return result, nil +} + +// GetAllThinPVCForLVGThinPool returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. +func (c *Cache) GetAllThinPVCForLVGThinPool(lvgName, thinPoolName string) ([]*v1.PersistentVolumeClaim, error) { + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName) + c.log.Error(err, fmt.Sprintf("[GetAllThinPVCForLVGThinPool] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) + return nil, err + } + + thinPoolCh, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found || thinPoolCh == nil { + c.log.Debug(fmt.Sprintf("[GetAllThinPVCForLVGThinPool] no Thin pool %s in the LVMVolumeGroup %s was found. 
Returns nil slice", thinPoolName, lvgName)) + return nil, nil + } + + result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) + thinPoolCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { result = append(result, pvcCh.(*pvcCache).pvc) return true }) @@ -292,9 +513,9 @@ func (c *Cache) GetLVGNamesForPVC(pvc *v1.PersistentVolumeClaim) []string { return lvgNames.([]string) } -// RemoveBoundedPVCSpaceReservation removes selected bounded PVC space reservation from a target LVMVolumeGroup resource. If no such LVMVolumeGroup found or PVC +// RemoveBoundedThickPVCSpaceReservation removes selected bounded PVC space reservation from a target LVMVolumeGroup resource. If no such LVMVolumeGroup found or PVC // is not in a Status Bound, returns an error. -func (c *Cache) RemoveBoundedPVCSpaceReservation(lvgName string, pvc *v1.PersistentVolumeClaim) error { +func (c *Cache) RemoveBoundedThickPVCSpaceReservation(lvgName string, pvc *v1.PersistentVolumeClaim) error { if pvc.Status.Phase != v1.ClaimBound { return fmt.Errorf("PVC %s/%s not in a Status.Phase Bound", pvc.Namespace, pvc.Name) } @@ -303,18 +524,52 @@ func (c *Cache) RemoveBoundedPVCSpaceReservation(lvgName string, pvc *v1.Persist lvgCh, found := c.lvgs.Load(lvgName) if !found { err := fmt.Errorf("LVMVolumeGroup %s was not found in the cache", lvgName) - c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedThickPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) return err } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) if !found || pvcCh == nil { err := fmt.Errorf("cache for PVC %s was not found", pvcKey) - c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedThickPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) return err } - lvgCh.(*lvgCache).pvcs.Delete(pvcKey) + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey) + c.pvcLVGs.Delete(pvcKey) + + return nil +} + +// RemoveBoundedThinPVCSpaceReservation removes selected bounded PVC space reservation from a target LVMVolumeGroup resource. If no such LVMVolumeGroup found or PVC +// is not in a Status Bound, returns an error. 
+func (c *Cache) RemoveBoundedThinPVCSpaceReservation(lvgName, thinPoolName string, pvc *v1.PersistentVolumeClaim) error { + if pvc.Status.Phase != v1.ClaimBound { + return fmt.Errorf("PVC %s/%s not in a Status.Phase Bound", pvc.Namespace, pvc.Name) + } + + pvcKey := configurePVCKey(pvc) + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("LVMVolumeGroup %s was not found in the cache", lvgName) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedThinPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + return err + } + + thinPoolCh, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found { + c.log.Warning(fmt.Sprintf("[RemoveBoundedThinPVCSpaceReservation] no Thin Pool %s was found in the cache for the LVMVolumeGroup %s", thinPoolName, lvgName)) + return nil + } + + pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey) + if !found || pvcCh == nil { + err := fmt.Errorf("cache for PVC %s was not found", pvcKey) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedThinPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + return err + } + + thinPoolCh.(*thinPoolCache).pvcs.Delete(pvcKey) c.pvcLVGs.Delete(pvcKey) return nil @@ -330,14 +585,14 @@ func (c *Cache) CheckIsPVCStored(pvc *v1.PersistentVolumeClaim) bool { return false } -// RemoveSpaceReservationForPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node. -func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim) error { +// RemoveSpaceReservationForThickPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node. 
+func (c *Cache) RemoveSpaceReservationForThickPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, thin bool) error { pvcKey := configurePVCKey(pvc) selectedLVGName := "" lvgNamesForPVC, found := c.pvcLVGs.Load(pvcKey) if !found { - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey)) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey)) return nil } @@ -345,38 +600,60 @@ func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentV lvgCh, found := c.lvgs.Load(lvgName) if !found || lvgCh == nil { err := fmt.Errorf("no cache found for the LVMVolumeGroup %s", lvgName) - c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey)) + c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey)) return err } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) - if !found { - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) - continue - } + switch thin { + case true: + lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool { + pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) + return true + } - selectedNode := pvcCh.(*pvcCache).selectedNode - if selectedNode == "" { - lvgCh.(*lvgCache).pvcs.Delete(pvcKey) - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation])) - } else { - selectedLVGName = lvgName - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + selectedNode := pvcCh.(*pvcCache).selectedNode + if selectedNode == "" { + thinPoolCh.(*thinPoolCache).pvcs.Delete(pvcKey) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation])) + } else { + selectedLVGName = lvgName + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. 
It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + } + + return true + }) + case false: + pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) + continue + } + + selectedNode := pvcCh.(*pvcCache).selectedNode + if selectedNode == "" { + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation])) + } else { + selectedLVGName = lvgName + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + } } } - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey)) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey)) - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey)) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey)) cleared := make([]string, 0, len(lvgNamesForPVC.([]string))) for _, lvgName := range lvgNamesForPVC.([]string) { if lvgName == selectedLVGName { - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey)) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey)) cleared = append(cleared, lvgName) } else { - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey)) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey)) } } - c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared)) + c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared)) c.pvcLVGs.Store(pvcKey, cleared) return nil @@ -392,7 +669,11 @@ func (c *Cache) RemovePVCFromTheCache(pvc *v1.PersistentVolumeClaim) { for _, lvgName := range lvgArray.([]string) { lvgCh, found := c.lvgs.Load(lvgName) if found { - lvgCh.(*lvgCache).pvcs.Delete(pvcKey.(string)) + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey.(string)) + lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool { + tpCh.(*thinPoolCache).pvcs.Delete(pvcKey) + return true + }) } } } @@ -440,7 +721,7 @@ func (c *Cache) PrintTheCacheLog() { c.lvgs.Range(func(lvgName, lvgCh any) bool { c.log.Cache(fmt.Sprintf("[%s]", lvgName)) - lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { c.log.Cache(fmt.Sprintf(" PVC %s, selected 
node: %s", pvcName, pvcCh.(*pvcCache).selectedNode)) return true }) diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go index bc367531..0c819571 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go @@ -80,7 +80,7 @@ func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { } for _, pvc := range pvcs { - err := cache.AddPVC(lvg.Name, &pvc) + err := cache.AddThickPVC(lvg.Name, &pvc) if err != nil { b.Error(err) } @@ -88,7 +88,7 @@ func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, err := cache.GetLVGReservedSpace(lvg.Name) + _, err := cache.GetLVGThickReservedSpace(lvg.Name) if err != nil { b.Error(err) } @@ -145,15 +145,15 @@ func BenchmarkCache_AddPVC(b *testing.B) { }, } - err := cache.AddPVC(lvg1.Name, pvc) + err := cache.AddThickPVC(lvg1.Name, pvc) if err != nil { b.Error(err) } - err = cache.AddPVC(lvg2.Name, pvc) + err = cache.AddThickPVC(lvg2.Name, pvc) if err != nil { b.Error(err) } - err = cache.AddPVC(lvg3.Name, pvc) + err = cache.AddThickPVC(lvg3.Name, pvc) if err != nil { b.Error(err) } @@ -400,11 +400,11 @@ func BenchmarkCache_UpdatePVC(b *testing.B) { }, }, } - err := cache.UpdatePVC(lvg.Name, pvc) + err := cache.UpdateThickPVC(lvg.Name, pvc) if err != nil { b.Error(err) } - err = cache.UpdatePVC(lvg.Name, updatedPVC) + err = cache.UpdateThickPVC(lvg.Name, updatedPVC) if err != nil { b.Error(err) } @@ -490,7 +490,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { } for _, pvc := range pvcs { - err := cache.AddPVC(lvg.Name, pvc) + err := cache.AddThickPVC(lvg.Name, pvc) if err != nil { b.Error(err) } @@ -564,7 +564,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { for _, pvc := range pvcs { for err != nil { - err = cache.UpdatePVC(lvg.Name, pvc) + err = cache.UpdateThickPVC(lvg.Name, pvc) } cache.GetLVGNamesForPVC(pvc) @@ -573,11 +573,11 @@ func BenchmarkCache_FullLoad(b *testing.B) { lvgMp := cache.GetAllLVG() for lvgName := range lvgMp { - _, err := cache.GetAllPVCForLVG(lvgName) + _, err := cache.GetAllThickPVCForLVG(lvgName) if err != nil { b.Error(err) } - _, err = cache.GetLVGReservedSpace(lvgName) + _, err = cache.GetLVGThickReservedSpace(lvgName) if err != nil { b.Error(err) } diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go index 1d7b1202..033401ee 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go @@ -71,7 +71,7 @@ func RunLVGWatcherCacheController( } log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to clear the cache for the LVMVolumeGroup %s", lvg.Name)) - pvcs, err := cache.GetAllPVCForLVG(lvg.Name) + pvcs, err := cache.GetAllThickPVCForLVG(lvg.Name) if err != nil { log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", lvg.Name)) } @@ -123,7 +123,7 @@ func RunLVGWatcherCacheController( } log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func", newLvg.Name)) - cachedPVCs, err := cache.GetAllPVCForLVG(newLvg.Name) + cachedPVCs, err := cache.GetAllThickPVCForLVG(newLvg.Name) if err != nil { log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] 
unable to get all PVC for the LVMVolumeGroup %s", newLvg.Name)) } @@ -132,7 +132,7 @@ func RunLVGWatcherCacheController( log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) if pvc.Status.Phase == v1.ClaimBound { log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache has Status.Phase Bound. It will be removed from the reserved space in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) - err = cache.RemoveBoundedPVCSpaceReservation(newLvg.Name, pvc) + err = cache.RemoveBoundedThickPVCSpaceReservation(newLvg.Name, pvc) if err != nil { log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) continue diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go index c96ee95d..160834b8 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go @@ -168,7 +168,7 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s log.Debug(fmt.Sprintf("[reconcilePVC] successfully found common LVMVolumeGroup %s for the selected node %s and PVC %s/%s", commonLVGName, selectedNodeName, pvc.Namespace, pvc.Name)) log.Debug(fmt.Sprintf("[reconcilePVC] starts to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) log.Trace(fmt.Sprintf("[reconcilePVC] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) - err := schedulerCache.UpdatePVC(commonLVGName, pvc) + err := schedulerCache.UpdateThickPVC(commonLVGName, pvc) if err != nil { log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) return @@ -178,7 +178,7 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s log.Cache(fmt.Sprintf("[reconcilePVC] cache state BEFORE the removal space reservation for PVC %s/%s", pvc.Namespace, pvc.Name)) schedulerCache.PrintTheCacheLog() log.Debug(fmt.Sprintf("[reconcilePVC] starts to remove space reservation for PVC %s/%s with selected node from the cache", pvc.Namespace, pvc.Name)) - err = schedulerCache.RemoveSpaceReservationForPVCWithSelectedNode(pvc) + err = schedulerCache.RemoveSpaceReservationForThickPVCWithSelectedNode(pvc, false) if err != nil { log.Error(err, fmt.Sprintf("[reconcilePVC] unable to remove PVC %s/%s space reservation in the cache", pvc.Namespace, pvc.Name)) return diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go index 3291bad4..ca2a52e4 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go @@ -167,7 +167,8 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche pvc := pvcs[volume.PersistentVolumeClaim.ClaimName] sc := scs[*pvc.Spec.StorageClassName] - if sc.Parameters[lvmTypeParamKey] == thick { + switch sc.Parameters[lvmTypeParamKey] { + case thick: log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name)) lvgsForPVC, err := ExtractLVGsFromSC(sc) if err != nil { @@ -178,14 +179,29 @@ func populateCache(log 
logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche for _, lvg := range lvgsForPVC { if slices.Contains(lvgNamesForTheNode, lvg.Name) { log.Trace(fmt.Sprintf("[populateCache] PVC %s/%s will reserve space in LVMVolumeGroup %s cache", pvc.Namespace, pvc.Name, lvg.Name)) - err = schedulerCache.AddPVC(lvg.Name, pvc) + err = schedulerCache.AddThickPVC(lvg.Name, pvc) + if err != nil { + return err + } + } + } + case thin: + log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache will be populated by PVC space requests", sc.Name)) + lvgsForPVC, err := ExtractLVGsFromSC(sc) + if err != nil { + return err + } + + log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from Storage Class %s for PVC %s/%s: %+v", sc.Name, pvc.Namespace, pvc.Name, lvgsForPVC)) + for _, lvg := range lvgsForPVC { + if slices.Contains(lvgNamesForTheNode, lvg.Name) { + log.Trace(fmt.Sprintf("[populateCache] PVC %s/%s will reserve space in LVMVolumeGroup %s Thin Pool %s cache", pvc.Namespace, pvc.Name, lvg.Name, lvg.Thin.PoolName)) + err = schedulerCache.AddThinPVC(lvg.Name, lvg.Thin.PoolName, pvc) if err != nil { return err } } } - } else { - log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache should NOT be populated by PVC space requests", sc.Name)) } } } @@ -292,22 +308,34 @@ func filterNodes( log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s is actually used. VG size: %s, allocatedSize: %s", lvg.Name, lvg.Status.VGSize.String(), lvg.Status.AllocatedSize.String())) } - lvgsThickFree := getLVGThickFreeSpaces(log, usedLVGs) + lvgsThickFree := getLVGThickFreeSpaces(usedLVGs) log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace on the node: %+v", pod.Namespace, pod.Name, lvgsThickFree)) - for lvgName, freeSpace := range lvgsThickFree { log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thick free space %s", lvgName, resource.NewQuantity(freeSpace, resource.BinarySI))) - reservedSize, err := schedulerCache.GetLVGReservedSpace(lvgName) + reservedSpace, err := schedulerCache.GetLVGThickReservedSpace(lvgName) if err != nil { - log.Error(err, fmt.Sprintf("[filterNodes] unable to cound cache reserved size for the LVMVolumeGroup %s", lvgName)) + log.Error(err, fmt.Sprintf("[filterNodes] unable to count cache reserved space for the LVMVolumeGroup %s", lvgName)) continue } - log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s reserved PVC space %s", lvgName, resource.NewQuantity(reservedSize, resource.BinarySI))) - lvgsThickFree[lvgName] -= reservedSize + log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s reserved PVC space %s", lvgName, resource.NewQuantity(reservedSpace, resource.BinarySI))) + lvgsThickFree[lvgName] -= reservedSpace } log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace with reserved PVC: %+v", pod.Namespace, pod.Name, lvgsThickFree)) - lvgsThickFreeMutex := &sync.RWMutex{} + lvgsThinFree := getLVGThinFreeSpaces(usedLVGs) + log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thin FreeSpace on the node: %+v", pod.Namespace, pod.Name, lvgsThinFree)) + for lvgName, thinPools := range lvgsThinFree { + for tpName, freeSpace := range thinPools { + log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thin Pool %s free space %s", lvgName, tpName, resource.NewQuantity(freeSpace, resource.BinarySI))) + reservedSpace, err := schedulerCache.GetLVGThinReservedSpace(lvgName, tpName) + if err != 
nil { + log.Error(err, fmt.Sprintf("[filterNodes] unable to count cache reserved space for the Thin pool %s of the LVMVolumeGroup %s", tpName, lvgName)) + continue + } + log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thin pool %s reserved PVC space %s", lvgName, tpName, resource.NewQuantity(reservedSpace, resource.BinarySI))) + lvgsThinFree[lvgName][tpName] -= reservedSpace + } + } nodeLVGs := SortLVGsByNodeName(usedLVGs) for n, ls := range nodeLVGs { @@ -325,6 +353,9 @@ func filterNodes( Nodes: &corev1.NodeList{}, FailedNodes: FailedNodesMap{}, } + + thickFreeMtx := &sync.RWMutex{} + thinFreeMtx := &sync.RWMutex{} failedNodesMutex := &sync.Mutex{} wg := &sync.WaitGroup{} @@ -333,9 +364,9 @@ func filterNodes( for i, node := range nodes.Items { go func(i int, node corev1.Node) { - log.Debug(fmt.Sprintf("[filterNodes] gourutine %d starts the work", i)) + log.Trace(fmt.Sprintf("[filterNodes] gourutine %d starts the work", i)) defer func() { - log.Debug(fmt.Sprintf("[filterNodes] gourutine %d ends the work", i)) + log.Trace(fmt.Sprintf("[filterNodes] gourutine %d ends the work", i)) wg.Done() }() @@ -347,12 +378,18 @@ func filterNodes( return } + // we get all LVMVolumeGroups from the node-applicant (which is common for all the PVCs) lvgsFromNode := commonNodes[node.Name] hasEnoughSpace := true + // now we iterate all over the PVCs to get if we can place all of them on the node (does the node have enough space) for _, pvc := range pvcs { pvcReq := pvcRequests[pvc.Name] + + // we get LVGs which might be used by the PVC lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName] + + // we get the specific LVG which the PVC can use on the node commonLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC) if commonLVG == nil { err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name)) @@ -361,12 +398,13 @@ func filterNodes( } log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, node.Name)) + // see what kind of space does the PVC need switch pvcReq.DeviceType { case thick: lvg := lvgs[commonLVG.Name] - lvgsThickFreeMutex.RLock() + thickFreeMtx.RLock() freeSpace := lvgsThickFree[lvg.Name] - lvgsThickFreeMutex.RUnlock() + thickFreeMtx.RUnlock() log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thick free space: %s, PVC requested space: %s", lvg.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI))) if freeSpace < pvcReq.RequestedSize { @@ -374,32 +412,38 @@ func filterNodes( break } - lvgsThickFreeMutex.Lock() + thickFreeMtx.Lock() lvgsThickFree[lvg.Name] -= pvcReq.RequestedSize - lvgsThickFreeMutex.Unlock() + thickFreeMtx.Unlock() case thin: lvg := lvgs[commonLVG.Name] + + // we try to find specific ThinPool which the PVC can use in the LVMVolumeGroup targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName) if targetThinPool == nil { err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg thin pools: %+v; thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName) errs <- err return } - // TODO: add after overCommit implementation - // freeSpace, err := getThinPoolFreeSpace(targetThinPool) - // if err != nil { - // errs <- err - // return - // } - - // log.Trace(fmt.Sprintf("[filterNodes] 
ThinPool free space: %d, PVC requested space: %d", freeSpace.Value(), pvcReq.RequestedSize)) - - // if freeSpace.Value() < pvcReq.RequestedSize { - // hasEnoughSpace = false - // } + + thinFreeMtx.RLock() + freeSpace := lvgsThinFree[lvg.Name][targetThinPool.Name] + thinFreeMtx.RUnlock() + + log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thin Pool %s free space: %s, PVC requested space: %s", lvg.Name, targetThinPool.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI))) + + if freeSpace < pvcReq.RequestedSize { + hasEnoughSpace = false + break + } + + thinFreeMtx.Lock() + lvgsThinFree[lvg.Name][targetThinPool.Name] -= pvcReq.RequestedSize + thinFreeMtx.Unlock() } if !hasEnoughSpace { + // we break as if only one PVC can't get enough space, the node does not fit break } } @@ -437,21 +481,33 @@ func filterNodes( return result, nil } -func getLVGThickFreeSpaces(log logger.Logger, lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]int64 { - result := make(map[string]int64, len(lvgs)) +func getLVGThinFreeSpaces(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]map[string]int64 { + result := make(map[string]map[string]int64, len(lvgs)) for _, lvg := range lvgs { - log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] tries to count free VG space for LVMVolumeGroup %s", lvg.Name)) - free := getVGFreeSpace(lvg) - log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] successfully counted free VG space for LVMVolumeGroup %s", lvg.Name)) + if result[lvg.Name] == nil { + result[lvg.Name] = make(map[string]int64, len(lvg.Status.ThinPools)) + } - result[lvg.Name] = free.Value() + for _, tp := range lvg.Status.ThinPools { + result[lvg.Name][tp.Name] = tp.AvailableSpace.Value() + } } return result } -func findMatchedThinPool(thinPools []v1alpha1.StatusThinPool, name string) *v1alpha1.StatusThinPool { +func getLVGThickFreeSpaces(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]int64 { + result := make(map[string]int64, len(lvgs)) + + for _, lvg := range lvgs { + result[lvg.Name] = lvg.Status.VGFree.Value() + } + + return result +} + +func findMatchedThinPool(thinPools []v1alpha1.LvmVolumeGroupThinPoolStatus, name string) *v1alpha1.LvmVolumeGroupThinPoolStatus { for _, tp := range thinPools { if tp.Name == name { return &tp @@ -578,20 +634,20 @@ func SortLVGsByNodeName(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string][]* return sorted } -func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) resource.Quantity { - // notice that .Sub method uses pointer but not a copy of the quantity - free := lvg.Status.VGSize - free.Sub(lvg.Status.AllocatedSize) - return free -} - -func getThinPoolFreeSpace(tp *v1alpha1.StatusThinPool) resource.Quantity { - // notice that .Sub method uses pointer but not a copy of the quantity - free := tp.ActualSize - free.Sub(tp.UsedSize) - - return free -} +//func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) resource.Quantity { +// // notice that .Sub method uses pointer but not a copy of the quantity +// free := lvg.Status.VGSize +// free.Sub(lvg.Status.AllocatedSize) +// return free +//} + +//func getThinPoolFreeSpace(tp *v1alpha1.LvmVolumeGroupThinPoolStatus) resource.Quantity { +// // notice that .Sub method uses pointer but not a copy of the quantity +// free := tp.ActualSize +// free.Sub(tp.UsedSize) +// +// return free +//} func getPersistentVolumes(ctx context.Context, cl client.Client) (map[string]corev1.PersistentVolume, error) { pvs := &corev1.PersistentVolumeList{} diff --git 
a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go index 672ec980..9e7cb459 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go @@ -157,9 +157,9 @@ func scoreNodes( lvg := lvgs[commonLVG.Name] switch pvcReq.DeviceType { case thick: - freeSpace = getVGFreeSpace(lvg) + freeSpace = lvg.Status.VGFree log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space before PVC reservation: %s", lvg.Name, freeSpace.String())) - reserved, err := schedulerCache.GetLVGReservedSpace(lvg.Name) + reserved, err := schedulerCache.GetLVGThickReservedSpace(lvg.Name) if err != nil { log.Error(err, fmt.Sprintf("[scoreNodes] unable to count reserved space for the LVMVolumeGroup %s", lvg.Name)) continue @@ -177,7 +177,7 @@ func scoreNodes( return } - freeSpace = getThinPoolFreeSpace(thinPool) + freeSpace = thinPool.AvailableSpace } log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s total size: %s", lvg.Name, lvg.Status.VGSize.String())) diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go index 85488ce3..5d0213d8 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go @@ -95,7 +95,7 @@ func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) { lvgs := s.cache.GetAllLVG() for _, lvg := range lvgs { - pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name) + pvcs, err := s.cache.GetAllThickPVCForLVG(lvg.Name) if err != nil { w.WriteHeader(http.StatusInternalServerError) s.log.Error(err, "something bad") @@ -119,7 +119,7 @@ func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) { } for lvgName, pvcs := range result { - reserved, err := s.cache.GetLVGReservedSpace(lvgName) + reserved, err := s.cache.GetLVGThickReservedSpace(lvgName) if err != nil { w.WriteHeader(http.StatusInternalServerError) _, err = w.Write([]byte("unable to write the cache")) @@ -168,7 +168,7 @@ func (s *scheduler) getCacheStat(w http.ResponseWriter, r *http.Request) { pvcTotalCount := 0 lvgs := s.cache.GetAllLVG() for _, lvg := range lvgs { - pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name) + pvcs, err := s.cache.GetAllThickPVCForLVG(lvg.Name) if err != nil { s.log.Error(err, "something bad") }
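
Below is a minimal usage sketch of the thin-aware cache API this patch introduces. It is illustrative only: the helper name, the LVMVolumeGroup name "lvg-1", and the thin pool name "thin-pool-1" are placeholders, and the snippet is assumed to live in the cache package (so the Cache type and its existing imports are in scope) with the LVMVolumeGroup and its thin pool already loaded into the cache.

package cache

import (
	v1 "k8s.io/api/core/v1"
)

// exampleThinReservation shows the intended flow: a pending (not yet Bound)
// thin PVC is reserved against its thin pool, and the reserved total for that
// pool can then be read back. All names here are placeholders.
func exampleThinReservation(c *Cache, pvc *v1.PersistentVolumeClaim) (int64, error) {
	// Reserve the PVC's requested storage in the thin pool cache of "lvg-1".
	if err := c.AddThinPVC("lvg-1", "thin-pool-1", pvc); err != nil {
		return 0, err
	}

	// Sum of the requested storage of every thin PVC currently reserved in that pool.
	return c.GetLVGThinReservedSpace("lvg-1", "thin-pool-1")
}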
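
The filter now budgets thin space as the pool's AvailableSpace from the LVMVolumeGroup status minus what is already reserved in the cache, and a node only passes if that difference covers the PVC request. A hedged sketch of that arithmetic follows; thinPoolFits is an invented name, and it relies on the same v1alpha1 import the cache package already uses (the module path is not shown in the patch, so it is omitted here).

// thinPoolFits mirrors the check filterNodes performs for a thin PVC:
// usable space = the thin pool's AvailableSpace reported in the
// LVMVolumeGroup status minus the space already reserved in the scheduler cache.
func thinPoolFits(c *Cache, lvg *v1alpha1.LvmVolumeGroup, thinPoolName string, requested int64) (bool, error) {
	reserved, err := c.GetLVGThinReservedSpace(lvg.Name, thinPoolName)
	if err != nil {
		return false, err
	}

	for _, tp := range lvg.Status.ThinPools {
		if tp.Name == thinPoolName {
			return tp.AvailableSpace.Value()-reserved >= requested, nil
		}
	}

	// The thin pool is not reported in the status, so nothing can be placed in it.
	return false, nil
}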