diff --git a/images/sds-local-volume-csi/driver/controller.go b/images/sds-local-volume-csi/driver/controller.go
index 6b058689..04c08ca6 100644
--- a/images/sds-local-volume-csi/driver/controller.go
+++ b/images/sds-local-volume-csi/driver/controller.go
@@ -31,6 +31,7 @@ import (
 	"k8s.io/apimachinery/pkg/api/resource"
 )
 
+// TODO: add support for free space calculation with the Immediate volume binding mode
 func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
 	d.log.Info("method CreateVolume")
 
diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go
index fe68e405..eb4c5a30 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go
@@ -6,6 +6,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	slices2 "k8s.io/utils/strings/slices"
 	"sds-local-volume-scheduler-extender/api/v1alpha1"
+	"sds-local-volume-scheduler-extender/pkg/consts"
 	"sds-local-volume-scheduler-extender/pkg/logger"
 	"sync"
 )
@@ -280,7 +281,7 @@ func (c *Cache) shouldAddPVC(pvc *v1.PersistentVolumeClaim, lvgCh *lvgCache, pvc
 
 	_, found = lvgCh.thickPVCs.Load(pvcKey)
 	if found {
-		c.log.Warning(fmt.Sprintf("[shouldAddPVC] PVC %s cache has been already added as thick to the LVMVolumeGroup %s", pvcKey, lvgName))
+		c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the cache of the LVMVolumeGroup %s", pvcKey, lvgName))
 		return false, nil
 	}
 
@@ -292,7 +293,7 @@
 		}
 
 		if _, found = thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey); found {
-			c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache. No need to add", pvcKey, thinPoolName))
+			c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache of the LVMVolumeGroup %s. No need to add", pvcKey, thinPoolName, lvgName))
 			return false, nil
 		}
 	}
@@ -316,6 +317,13 @@ func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolum
 		return err
 	}
 
+	thinPoolBelongs := c.checkIfThinPoolBelongsToLVG(lvgCh.(*lvgCache), thinPoolName)
+	if !thinPoolBelongs {
+		err := fmt.Errorf("thin pool %s was not found in the LVMVolumeGroup %s", thinPoolName, lvgName)
+		c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add Thin pool %s of the LVMVolumeGroup %s for the PVC %s", thinPoolName, lvgName, pvcKey))
+		return err
+	}
+
 	// this case might be triggered if the extender recovers after fail and finds some pending thickPVCs with selected nodes
 	c.log.Trace(fmt.Sprintf("[AddThinPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations))
 	shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, thinPoolName)
@@ -329,14 +337,28 @@ func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolum
 	}
 
 	c.log.Debug(fmt.Sprintf("[AddThinPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName))
-	c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName)
+	err = c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName)
+	if err != nil {
+		c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add PVC %s to Thin Pool %s of the LVMVolumeGroup %s", pvcKey, thinPoolName, lvgName))
+		return err
+	}
 
 	return nil
 }
 
+func (c *Cache) checkIfThinPoolBelongsToLVG(lvgCh *lvgCache, thinPoolName string) bool {
+	for _, tp := range lvgCh.lvg.Status.ThinPools {
+		if tp.Name == thinPoolName {
+			return true
+		}
+	}
+
+	return false
+}
+
 func (c *Cache) addNewThickPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) {
 	pvcKey := configurePVCKey(pvc)
-	lvgCh.thinPools.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})
+	lvgCh.thickPVCs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})
 
 	c.addLVGToPVC(lvgCh.lvg.Name, pvcKey)
 }
@@ -350,14 +372,20 @@ func (c *Cache) addNewThinPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, th
 		return err
 	}
 
+	err := c.addNewThinPool(lvgCh, pvc, thinPoolName)
+	if err != nil {
+		c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin pool %s in the LVMVolumeGroup %s cache for PVC %s", thinPoolName, lvgCh.lvg.Name, pvc.Name))
+		return err
+	}
+
 	thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName)
 	if !found {
-		err := fmt.Errorf("thin pool %s not found", thinPoolName)
+		err = fmt.Errorf("thin pool %s not found", thinPoolName)
 		c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin PVC %s to the cache", pvcKey))
 		return err
 	}
 
-	thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, pvc)
+	thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})
 	c.log.Debug(fmt.Sprintf("[addNewThinPVC] THIN PVC %s was added to the cache to Thin Pool %s", pvcKey, thinPoolName))
 
 	c.addLVGToPVC(lvgCh.lvg.Name, pvcKey)
@@ -459,21 +487,42 @@ func (c *Cache) addNewThinPool(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, t
 	return nil
 }
 
-// GetAllThickPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error.
-func (c *Cache) GetAllThickPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) {
+// GetAllPVCForLVG returns a slice of PVCs belonging to the selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error.
+func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) {
 	lvgCh, found := c.lvgs.Load(lvgName)
 	if !found {
 		err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName)
-		c.log.Error(err, fmt.Sprintf("[GetAllThickPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName))
+		c.log.Error(err, fmt.Sprintf("[GetAllPVCForLVG] an error occurred while trying to get all PVC for the LVMVolumeGroup %s", lvgName))
 		return nil, err
 	}
 
-	result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount)
+	size := 0
+	lvgCh.(*lvgCache).thickPVCs.Range(func(key, value any) bool {
+		size++
+		return true
+	})
+	lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool {
+		tpCh.(*thinPoolCache).pvcs.Range(func(key, value any) bool {
+			size++
+			return true
+		})
+		return true
+	})
+
+	result := make([]*v1.PersistentVolumeClaim, 0, size)
 	lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool {
 		result = append(result, pvcCh.(*pvcCache).pvc)
 		return true
 	})
+	lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool {
+		tpCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool {
+			result = append(result, pvcCh.(*pvcCache).pvc)
+			return true
+		})
+		return true
+	})
+
 	return result, nil
 }
@@ -585,14 +634,14 @@ func (c *Cache) CheckIsPVCStored(pvc *v1.PersistentVolumeClaim) bool {
 	return false
 }
 
-// RemoveSpaceReservationForThickPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node.
-func (c *Cache) RemoveSpaceReservationForThickPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, thin bool) error {
+// RemoveSpaceReservationForPVCWithSelectedNode removes the space reservation for the selected PVC from every LVMVolumeGroup resource that is not bound to the PVC's selected node.
+func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, deviceType string) error {
 	pvcKey := configurePVCKey(pvc)
 	selectedLVGName := ""
 
 	lvgNamesForPVC, found := c.pvcLVGs.Load(pvcKey)
 	if !found {
-		c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey))
+		c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s has already been removed", pvcKey))
 		return nil
 	}
 
@@ -600,60 +649,60 @@ func (c *Cache) RemoveSpaceReservationForThickPVCWithSelectedNode(pvc *v1.Persis
 		lvgCh, found := c.lvgs.Load(lvgName)
 		if !found || lvgCh == nil {
 			err := fmt.Errorf("no cache found for the LVMVolumeGroup %s", lvgName)
-			c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey))
+			c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] an error occurred while trying to remove space reservation for PVC %s", pvcKey))
 			return err
 		}
 
-		switch thin {
-		case true:
+		switch deviceType {
+		case consts.Thin:
 			lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool {
 				pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey)
 				if !found {
-					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
+					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has already been removed", pvcKey, lvgName))
 					return true
 				}
 
 				selectedNode := pvcCh.(*pvcCache).selectedNode
 				if selectedNode == "" {
 					thinPoolCh.(*thinPoolCache).pvcs.Delete(pvcKey)
-					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation]))
+					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s because the PVC got selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation]))
 				} else {
 					selectedLVGName = lvgName
-					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
+					c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be removed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
 				}
 
 				return true
 			})
-		case false:
+		case consts.Thick:
 			pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey)
 			if !found {
-				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
+				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has already been removed", pvcKey, lvgName))
 				continue
 			}
 
 			selectedNode := pvcCh.(*pvcCache).selectedNode
 			if selectedNode == "" {
 				lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey)
-				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation]))
+				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s because the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation]))
 			} else {
 				selectedLVGName = lvgName
-				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
+				c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be removed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
 			}
 		}
 	}
 
-	c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey))
+	c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey))
 
-	c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey))
+	c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey))
 	cleared := make([]string, 0, len(lvgNamesForPVC.([]string)))
 	for _, lvgName := range lvgNamesForPVC.([]string) {
 		if lvgName == selectedLVGName {
-			c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey))
+			c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey))
 			cleared = append(cleared, lvgName)
 		} else {
-			c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey))
+			c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey))
 		}
 	}
 
-	c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared))
+	c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared))
 	c.pvcLVGs.Store(pvcKey, cleared)
 
 	return nil
@@ -722,7 +771,16 @@ func (c *Cache) PrintTheCacheLog() {
 		c.log.Cache(fmt.Sprintf("[%s]", lvgName))
 
 		lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool {
-			c.log.Cache(fmt.Sprintf(" PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode))
+			c.log.Cache(fmt.Sprintf(" THICK PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode))
+			return true
+		})
+
+		lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool {
+			thinPoolCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool {
+				c.log.Cache(fmt.Sprintf(" THIN POOL %s PVC %s, selected node: %s", thinPoolName, pvcName, pvcCh.(*pvcCache).selectedNode))
+				return true
+			})
+			return true
 		})
 
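Note on the cache.go changes above: GetAllPVCForLVG now walks two levels of sync.Map — the LVMVolumeGroup's thick-PVC map plus every thin pool's own PVC map — and first counts the entries to pre-size the result slice. A minimal, self-contained sketch of that traversal pattern follows; the types here are simplified stand-ins, not the module's actual cache structs, and the pre-count is left out for brevity.

package main

import (
	"fmt"
	"sync"
)

// Simplified stand-ins: one map of thick PVCs plus a map of thin pools,
// each thin pool holding its own PVC map.
type thinPool struct{ pvcs sync.Map } // pvcName -> struct{}

type lvgCache struct {
	thickPVCs sync.Map // pvcName -> struct{}
	thinPools sync.Map // poolName -> *thinPool
}

// allPVCs collects PVC names from both levels, mirroring the
// thick-then-thin Range calls in GetAllPVCForLVG.
func allPVCs(lvg *lvgCache) []string {
	result := make([]string, 0)
	lvg.thickPVCs.Range(func(key, _ any) bool {
		result = append(result, key.(string))
		return true
	})
	lvg.thinPools.Range(func(_, pool any) bool {
		pool.(*thinPool).pvcs.Range(func(key, _ any) bool {
			result = append(result, key.(string))
			return true
		})
		return true
	})
	return result
}

func main() {
	lvg := &lvgCache{}
	lvg.thickPVCs.Store("default/thick-pvc-1", struct{}{})
	pool := &thinPool{}
	pool.pvcs.Store("default/thin-pvc-1", struct{}{})
	lvg.thinPools.Store("thin-pool-1", pool)
	fmt.Println(allPVCs(lvg)) // order is not guaranteed by sync.Map
}
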
diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go
index 0c819571..31816c1e 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go
@@ -573,7 +573,7 @@ func BenchmarkCache_FullLoad(b *testing.B) {
 
 			lvgMp := cache.GetAllLVG()
 			for lvgName := range lvgMp {
-				_, err := cache.GetAllThickPVCForLVG(lvgName)
+				_, err := cache.GetAllPVCForLVG(lvgName)
 				if err != nil {
 					b.Error(err)
 				}
diff --git a/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go b/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go
new file mode 100644
index 00000000..947c77ab
--- /dev/null
+++ b/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go
@@ -0,0 +1,11 @@
+package consts
+
+const (
+	SdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io"
+
+	LvmTypeParamKey         = "local.csi.storage.deckhouse.io/lvm-type"
+	LvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups"
+
+	Thick = "Thick"
+	Thin  = "Thin"
+)
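The new consts package centralizes the provisioner name and the StorageClass parameter keys that were previously duplicated in the scheduler package. For illustration only, a rough sketch of how a managed StorageClass's parameters are read with these keys; the parameter values shown and the YAML layout of lvm-volume-groups are assumptions inferred from the LVMVolumeGroups struct in filter.go, and the constants are re-declared locally so the sketch stays self-contained.

package main

import "fmt"

// Assumed parameter keys, matching the values in pkg/consts.
const (
	lvmTypeParamKey         = "local.csi.storage.deckhouse.io/lvm-type"
	lvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups"
)

func main() {
	// A hypothetical StorageClass parameters map as the extender would see it.
	params := map[string]string{
		lvmTypeParamKey: "Thin",
		lvmVolumeGroupsParamKey: `- name: lvg-on-node-1
  thin:
    poolName: thin-pool-1`,
	}

	// The extender switches on the device type; the second parameter is what
	// ExtractLVGsFromSC unmarshals into the LVMVolumeGroups slice.
	switch params[lvmTypeParamKey] {
	case "Thick":
		fmt.Println("reserve space directly in the LVMVolumeGroup")
	case "Thin":
		fmt.Println("reserve space in the matching thin pool")
	}
}
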
diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go
index 033401ee..f9ce77bb 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go
@@ -6,6 +6,7 @@ import (
 	"fmt"
 	v1 "k8s.io/api/core/v1"
 	"k8s.io/client-go/util/workqueue"
+	"reflect"
 	"sds-local-volume-scheduler-extender/api/v1alpha1"
 	"sds-local-volume-scheduler-extender/pkg/cache"
 	"sds-local-volume-scheduler-extender/pkg/logger"
@@ -71,7 +72,7 @@ func RunLVGWatcherCacheController(
 			}
 
 			log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to clear the cache for the LVMVolumeGroup %s", lvg.Name))
-			pvcs, err := cache.GetAllThickPVCForLVG(lvg.Name)
+			pvcs, err := cache.GetAllPVCForLVG(lvg.Name)
 			if err != nil {
 				log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", lvg.Name))
 			}
@@ -116,27 +117,31 @@
 			log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] old state LVMVolumeGroup %s has size %s", oldLvg.Name, oldLvg.Status.AllocatedSize.String()))
 			log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] new state LVMVolumeGroup %s has size %s", newLvg.Name, newLvg.Status.AllocatedSize.String()))
 
-			if newLvg.DeletionTimestamp != nil ||
-				oldLvg.Status.AllocatedSize.Value() == newLvg.Status.AllocatedSize.Value() {
+			if !shouldReconcileLVG(oldLvg, newLvg) {
 				log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", newLvg.Name))
 				return
 			}
 
 			log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func", newLvg.Name))
-			cachedPVCs, err := cache.GetAllThickPVCForLVG(newLvg.Name)
+			cachedPVCs, err := cache.GetAllPVCForLVG(newLvg.Name)
 			if err != nil {
 				log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", newLvg.Name))
 			}
+
 			for _, pvc := range cachedPVCs {
 				log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache belongs to LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
 				log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase))
 				if pvc.Status.Phase == v1.ClaimBound {
 					log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache has Status.Phase Bound. It will be removed from the reserved space in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
-					err = cache.RemoveBoundedThickPVCSpaceReservation(newLvg.Name, pvc)
-					if err != nil {
-						log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
-						continue
-					}
+					//err = cache.RemoveBoundedThickPVCSpaceReservation(newLvg.Name, pvc)
+					//if err != nil {
+					//	log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
+					//	continue
+					//}
+
+					cache.RemovePVCFromTheCache(pvc)
+
+					//err = cache.RemoveBoundedThinPVCSpaceReservation()
 
 					log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s was removed from the LVMVolumeGroup %s in the cache", pvc.Namespace, pvc.Name, newLvg.Name))
 				}
@@ -163,3 +168,16 @@ func RunLVGWatcherCacheController(
 
 	return c, nil
 }
+
+func shouldReconcileLVG(oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool {
+	if newLVG.DeletionTimestamp != nil {
+		return false
+	}
+
+	if oldLVG.Status.AllocatedSize.Value() == newLVG.Status.AllocatedSize.Value() &&
+		reflect.DeepEqual(oldLVG.Status.ThinPools, newLVG.Status.ThinPools) {
+		return false
+	}
+
+	return true
+}
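The new shouldReconcileLVG predicate also compares the thin-pool status, so an update that only changes thin-pool usage still triggers a cache refresh. A sketch of that behaviour with simplified stand-in status types (the real v1alpha1.LvmVolumeGroup status carries more fields than shown here):

package main

import (
	"fmt"
	"reflect"

	"k8s.io/apimachinery/pkg/api/resource"
)

// Simplified stand-ins for the status fields the predicate inspects.
type thinPoolStatus struct {
	Name     string
	UsedSize resource.Quantity
}

type lvgStatus struct {
	AllocatedSize resource.Quantity
	ThinPools     []thinPoolStatus
}

// shouldReconcile mirrors shouldReconcileLVG: skip deleted resources and
// updates that change neither the allocated size nor the thin-pool status.
func shouldReconcile(deleted bool, oldSt, newSt lvgStatus) bool {
	if deleted {
		return false
	}
	if oldSt.AllocatedSize.Value() == newSt.AllocatedSize.Value() &&
		reflect.DeepEqual(oldSt.ThinPools, newSt.ThinPools) {
		return false
	}
	return true
}

func main() {
	oldSt := lvgStatus{AllocatedSize: resource.MustParse("10Gi")}
	grown := lvgStatus{AllocatedSize: resource.MustParse("12Gi")}
	thinChanged := lvgStatus{
		AllocatedSize: resource.MustParse("10Gi"),
		ThinPools:     []thinPoolStatus{{Name: "thin-pool-1", UsedSize: resource.MustParse("1Gi")}},
	}

	fmt.Println(shouldReconcile(false, oldSt, oldSt))       // false: nothing relevant changed
	fmt.Println(shouldReconcile(false, oldSt, grown))       // true: allocated size changed
	fmt.Println(shouldReconcile(false, oldSt, thinChanged)) // true: thin-pool status changed
	fmt.Println(shouldReconcile(true, oldSt, grown))        // false: the resource is being deleted
}
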
log.Debug(fmt.Sprintf("[reconcilePVC] Storage Class %s for PVC %s/%s is not managed by sds-local-volume-provisioner. Ends the reconciliation", sc.Name, pvc.Namespace, pvc.Name)) + return + } + + lvgsFromSc, err := scheduler.ExtractLVGsFromSC(sc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to extract LVMVolumeGroups from the Storage Class %s", sc.Name)) + } + lvgsForPVC := schedulerCache.GetLVGNamesForPVC(pvc) if lvgsForPVC == nil || len(lvgsForPVC) == 0 { log.Debug(fmt.Sprintf("[reconcilePVC] no LVMVolumeGroups were found in the cache for PVC %s/%s. Use Storage Class %s instead", pvc.Namespace, pvc.Name, *pvc.Spec.StorageClassName)) - sc := &v12.StorageClass{} - err := mgr.GetClient().Get(ctx, client.ObjectKey{ - Name: *pvc.Spec.StorageClassName, - }, sc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to get Storage Class %s for PVC %s/%s", *pvc.Spec.StorageClassName, pvc.Namespace, pvc.Name)) - return - } - - if sc.Provisioner != sdsLocalVolumeProvisioner { - log.Debug(fmt.Sprintf("[reconcilePVC] Storage Class %s for PVC %s/%s is not managed by sds-local-volume-provisioner. Ends the reconciliation", sc.Name, pvc.Namespace, pvc.Name)) - return - } - - lvgsFromSc, err := scheduler.ExtractLVGsFromSC(sc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to extract LVMVolumeGroups from the Storage Class %s", sc.Name)) - } for _, lvg := range lvgsFromSc { lvgsForPVC = append(lvgsForPVC, lvg.Name) @@ -158,6 +159,7 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s for _, pvcLvg := range lvgsForPVC { if slices.Contains(lvgsOnTheNode, pvcLvg) { commonLVGName = pvcLvg + break } } if commonLVGName == "" { @@ -167,18 +169,33 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s log.Debug(fmt.Sprintf("[reconcilePVC] successfully found common LVMVolumeGroup %s for the selected node %s and PVC %s/%s", commonLVGName, selectedNodeName, pvc.Namespace, pvc.Name)) log.Debug(fmt.Sprintf("[reconcilePVC] starts to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) - log.Trace(fmt.Sprintf("[reconcilePVC] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) - err := schedulerCache.UpdateThickPVC(commonLVGName, pvc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) - return + + log.Trace(fmt.Sprintf("[reconcilePVC] %s PVC %s/%s has status phase: %s", sc.Parameters[consts.LvmTypeParamKey], pvc.Namespace, pvc.Name, pvc.Status.Phase)) + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: + err = schedulerCache.UpdateThickPVC(commonLVGName, pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update Thick PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + return + } + case consts.Thin: + for _, lvg := range lvgsFromSc { + if lvg.Name == commonLVGName { + err = schedulerCache.UpdateThinPVC(commonLVGName, pvc, lvg.Thin.PoolName) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update Thin PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + return + } + break + } + } } - log.Debug(fmt.Sprintf("[reconcilePVC] successfully updated PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + log.Debug(fmt.Sprintf("[reconcilePVC] successfully updated %s PVC %s/%s in the cache", sc.Parameters[consts.LvmTypeParamKey], pvc.Namespace, pvc.Name)) log.Cache(fmt.Sprintf("[reconcilePVC] cache state BEFORE the removal space reservation 
for PVC %s/%s", pvc.Namespace, pvc.Name)) schedulerCache.PrintTheCacheLog() log.Debug(fmt.Sprintf("[reconcilePVC] starts to remove space reservation for PVC %s/%s with selected node from the cache", pvc.Namespace, pvc.Name)) - err = schedulerCache.RemoveSpaceReservationForThickPVCWithSelectedNode(pvc, false) + err = schedulerCache.RemoveSpaceReservationForPVCWithSelectedNode(pvc, sc.Parameters[consts.LvmTypeParamKey]) if err != nil { log.Error(err, fmt.Sprintf("[reconcilePVC] unable to remove PVC %s/%s space reservation in the cache", pvc.Namespace, pvc.Name)) return diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go index ca2a52e4..39d2f7af 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go @@ -28,22 +28,13 @@ import ( "net/http" "sds-local-volume-scheduler-extender/api/v1alpha1" "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/consts" "sds-local-volume-scheduler-extender/pkg/logger" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" "sync" ) -const ( - sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" - - lvmTypeParamKey = "local.csi.storage.deckhouse.io/lvm-type" - lvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups" - - thick = "Thick" - thin = "Thin" -) - func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Debug("[filter] starts the serving") var input ExtenderArgs @@ -146,7 +137,7 @@ func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVo filteredPVCs := make(map[string]*corev1.PersistentVolumeClaim, len(pvcs)) for _, pvc := range pvcs { sc := scs[*pvc.Spec.StorageClassName] - if sc.Provisioner != sdsLocalVolumeProvisioner { + if sc.Provisioner != consts.SdsLocalVolumeProvisioner { log.Debug(fmt.Sprintf("[filterNotManagedPVC] filter out PVC %s/%s due to used Storage class %s is not managed by sds-local-volume-provisioner", pvc.Name, pvc.Namespace, sc.Name)) continue } @@ -167,8 +158,8 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche pvc := pvcs[volume.PersistentVolumeClaim.ClaimName] sc := scs[*pvc.Spec.StorageClassName] - switch sc.Parameters[lvmTypeParamKey] { - case thick: + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name)) lvgsForPVC, err := ExtractLVGsFromSC(sc) if err != nil { @@ -185,7 +176,7 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche } } } - case thin: + case consts.Thin: log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache will be populated by PVC space requests", sc.Name)) lvgsForPVC, err := ExtractLVGsFromSC(sc) if err != nil { @@ -233,30 +224,30 @@ func extractRequestedSize( log.Debug(fmt.Sprintf("[extractRequestedSize] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) switch pvc.Status.Phase { case corev1.ClaimPending: - switch sc.Parameters[lvmTypeParamKey] { - case thick: + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: pvcRequests[pvc.Name] = PVCRequest{ - DeviceType: thick, + DeviceType: consts.Thick, RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(), } - case thin: + case consts.Thin: pvcRequests[pvc.Name] 
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go
index ca2a52e4..39d2f7af 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go
@@ -28,22 +28,13 @@ import (
 	"net/http"
 	"sds-local-volume-scheduler-extender/api/v1alpha1"
 	"sds-local-volume-scheduler-extender/pkg/cache"
+	"sds-local-volume-scheduler-extender/pkg/consts"
 	"sds-local-volume-scheduler-extender/pkg/logger"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/yaml"
 	"sync"
 )
 
-const (
-	sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io"
-
-	lvmTypeParamKey         = "local.csi.storage.deckhouse.io/lvm-type"
-	lvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups"
-
-	thick = "Thick"
-	thin  = "Thin"
-)
-
 func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
 	s.log.Debug("[filter] starts the serving")
 	var input ExtenderArgs
@@ -146,7 +137,7 @@ func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVo
 	filteredPVCs := make(map[string]*corev1.PersistentVolumeClaim, len(pvcs))
 	for _, pvc := range pvcs {
 		sc := scs[*pvc.Spec.StorageClassName]
-		if sc.Provisioner != sdsLocalVolumeProvisioner {
+		if sc.Provisioner != consts.SdsLocalVolumeProvisioner {
 			log.Debug(fmt.Sprintf("[filterNotManagedPVC] filter out PVC %s/%s due to used Storage class %s is not managed by sds-local-volume-provisioner", pvc.Name, pvc.Namespace, sc.Name))
 			continue
 		}
@@ -167,8 +158,8 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche
 				pvc := pvcs[volume.PersistentVolumeClaim.ClaimName]
 				sc := scs[*pvc.Spec.StorageClassName]
 
-				switch sc.Parameters[lvmTypeParamKey] {
-				case thick:
+				switch sc.Parameters[consts.LvmTypeParamKey] {
+				case consts.Thick:
 					log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name))
 					lvgsForPVC, err := ExtractLVGsFromSC(sc)
 					if err != nil {
@@ -185,7 +176,7 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche
 						}
 					}
 				}
-			case thin:
+			case consts.Thin:
 				log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache will be populated by PVC space requests", sc.Name))
 				lvgsForPVC, err := ExtractLVGsFromSC(sc)
 				if err != nil {
@@ -233,30 +224,30 @@ func extractRequestedSize(
 		log.Debug(fmt.Sprintf("[extractRequestedSize] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase))
 		switch pvc.Status.Phase {
 		case corev1.ClaimPending:
-			switch sc.Parameters[lvmTypeParamKey] {
-			case thick:
+			switch sc.Parameters[consts.LvmTypeParamKey] {
+			case consts.Thick:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thick,
+					DeviceType:    consts.Thick,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
 				}
-			case thin:
+			case consts.Thin:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thin,
+					DeviceType:    consts.Thin,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
 				}
 			}
 
 		case corev1.ClaimBound:
 			pv := pvs[pvc.Spec.VolumeName]
-			switch sc.Parameters[lvmTypeParamKey] {
-			case thick:
+			switch sc.Parameters[consts.LvmTypeParamKey] {
+			case consts.Thick:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thick,
+					DeviceType:    consts.Thick,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
 				}
-			case thin:
+			case consts.Thin:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thin,
+					DeviceType:    consts.Thin,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
 				}
 			}
@@ -279,7 +270,7 @@ func filterNodes(
 	scs map[string]*v1.StorageClass,
 	pvcRequests map[string]PVCRequest,
 ) (*ExtenderFilterResult, error) {
-	// Param "pvcRequests" is a total amount of the pvcRequests space (both thick and thin) for Pod (i.e. from every PVC)
+	// Param "pvcRequests" is a total amount of the pvcRequests space (both Thick and Thin) for Pod (i.e. from every PVC)
 	if len(pvcRequests) == 0 {
 		return &ExtenderFilterResult{
 			Nodes: nodes,
@@ -400,7 +391,7 @@ func filterNodes(
 
 			// see what kind of space does the PVC need
 			switch pvcReq.DeviceType {
-			case thick:
+			case consts.Thick:
 				lvg := lvgs[commonLVG.Name]
 				thickFreeMtx.RLock()
 				freeSpace := lvgsThickFree[lvg.Name]
@@ -415,13 +406,13 @@ func filterNodes(
 				thickFreeMtx.Lock()
 				lvgsThickFree[lvg.Name] -= pvcReq.RequestedSize
 				thickFreeMtx.Unlock()
-			case thin:
+			case consts.Thin:
 				lvg := lvgs[commonLVG.Name]
 
 				// we try to find specific ThinPool which the PVC can use in the LVMVolumeGroup
 				targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 				if targetThinPool == nil {
-					err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg thin pools: %+v; thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
+					err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg Thin pools: %+v; Thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 					errs <- err
 					return
 				}
@@ -610,13 +601,13 @@ type LVMVolumeGroup struct {
 	Name string `yaml:"name"`
 	Thin struct {
 		PoolName string `yaml:"poolName"`
-	} `yaml:"thin"`
+	} `yaml:"Thin"`
 }
 type LVMVolumeGroups []LVMVolumeGroup
 
 func ExtractLVGsFromSC(sc *v1.StorageClass) (LVMVolumeGroups, error) {
 	var lvmVolumeGroups LVMVolumeGroups
-	err := yaml.Unmarshal([]byte(sc.Parameters[lvmVolumeGroupsParamKey]), &lvmVolumeGroups)
+	err := yaml.Unmarshal([]byte(sc.Parameters[consts.LvmVolumeGroupsParamKey]), &lvmVolumeGroups)
 	if err != nil {
 		return nil, err
 	}
@@ -634,21 +625,6 @@ func SortLVGsByNodeName(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string][]*
 
 	return sorted
 }
 
-//func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) resource.Quantity {
-//	// notice that .Sub method uses pointer but not a copy of the quantity
-//	free := lvg.Status.VGSize
-//	free.Sub(lvg.Status.AllocatedSize)
-//	return free
-//}
-
-//func getThinPoolFreeSpace(tp *v1alpha1.LvmVolumeGroupThinPoolStatus) resource.Quantity {
-//	// notice that .Sub method uses pointer but not a copy of the quantity
-//	free := tp.ActualSize
-//	free.Sub(tp.UsedSize)
-//
-//	return free
-//}
-
 func getPersistentVolumes(ctx context.Context, cl client.Client) (map[string]corev1.PersistentVolume, error) {
 	pvs := &corev1.PersistentVolumeList{}
 	err := cl.List(ctx, pvs)
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
index 29b4d5c8..42e1c58f 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
@@ -20,13 +20,13 @@ func TestFilter(t *testing.T) {
 			ObjectMeta: metav1.ObjectMeta{
 				Name: sc1,
 			},
-			Provisioner: sdsLocalVolumeProvisioner,
+			Provisioner: SdsLocalVolumeProvisioner,
 		},
 		sc2: {
 			ObjectMeta: metav1.ObjectMeta{
 				Name: sc2,
 			},
-			Provisioner: sdsLocalVolumeProvisioner,
+			Provisioner: SdsLocalVolumeProvisioner,
 		},
 		sc3: {
 			ObjectMeta: metav1.ObjectMeta{
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
index 9e7cb459..c4e3f4b1 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
@@ -23,6 +23,7 @@ import (
 	"math"
 	"net/http"
 	"sds-local-volume-scheduler-extender/pkg/cache"
+	"sds-local-volume-scheduler-extender/pkg/consts"
 	"sds-local-volume-scheduler-extender/pkg/logger"
 	"sync"
 
@@ -156,9 +157,9 @@ func scoreNodes(
 			var freeSpace resource.Quantity
 			lvg := lvgs[commonLVG.Name]
 			switch pvcReq.DeviceType {
-			case thick:
+			case consts.Thick:
 				freeSpace = lvg.Status.VGFree
-				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space before PVC reservation: %s", lvg.Name, freeSpace.String()))
+				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free Thick space before PVC reservation: %s", lvg.Name, freeSpace.String()))
 				reserved, err := schedulerCache.GetLVGThickReservedSpace(lvg.Name)
 				if err != nil {
 					log.Error(err, fmt.Sprintf("[scoreNodes] unable to count reserved space for the LVMVolumeGroup %s", lvg.Name))
@@ -167,8 +168,8 @@ func scoreNodes(
 				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s PVC Space reservation: %s", lvg.Name, resource.NewQuantity(reserved, resource.BinarySI)))
 				spaceWithReserved := freeSpace.Value() - reserved
 				freeSpace = *resource.NewQuantity(spaceWithReserved, resource.BinarySI)
-				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space after PVC reservation: %s", lvg.Name, freeSpace.String()))
-			case thin:
+				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free Thick space after PVC reservation: %s", lvg.Name, freeSpace.String()))
+			case consts.Thin:
 				thinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 				if thinPool == nil {
 					err = errors.New(fmt.Sprintf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
index 5d0213d8..8d237701 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
@@ -95,7 +95,7 @@ func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) {
 
 	lvgs := s.cache.GetAllLVG()
 	for _, lvg := range lvgs {
-		pvcs, err := s.cache.GetAllThickPVCForLVG(lvg.Name)
+		pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name)
 		if err != nil {
 			w.WriteHeader(http.StatusInternalServerError)
 			s.log.Error(err, "something bad")
@@ -168,7 +168,7 @@ func (s *scheduler) getCacheStat(w http.ResponseWriter, r *http.Request) {
 	pvcTotalCount := 0
 	lvgs := s.cache.GetAllLVG()
 	for _, lvg := range lvgs {
-		pvcs, err := s.cache.GetAllThickPVCForLVG(lvg.Name)
+		pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name)
 		if err != nil {
 			s.log.Error(err, "something bad")
 		}