diff --git a/images/sds-local-volume-controller/api/v1alpha1/lvm_volume_group.go b/images/sds-local-volume-controller/api/v1alpha1/lvm_volume_group.go index d931267f..70900f72 100644 --- a/images/sds-local-volume-controller/api/v1alpha1/lvm_volume_group.go +++ b/images/sds-local-volume-controller/api/v1alpha1/lvm_volume_group.go @@ -1,9 +1,12 @@ /* Copyright 2024 Flant JSC + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -33,16 +36,24 @@ type LvmVolumeGroup struct { Status LvmVolumeGroupStatus `json:"status,omitempty"` } -type SpecThinPool struct { - Name string `json:"name"` - Size resource.Quantity `json:"size"` +type LvmVolumeGroupSpec struct { + ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` + BlockDeviceNames []string `json:"blockDeviceNames"` + ThinPools []LvmVolumeGroupThinPoolSpec `json:"thinPools"` + Type string `json:"type"` } -type LvmVolumeGroupSpec struct { - ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` - BlockDeviceNames []string `json:"blockDeviceNames"` - ThinPools []SpecThinPool `json:"thinPools"` - Type string `json:"type"` +type LvmVolumeGroupStatus struct { + AllocatedSize resource.Quantity `json:"allocatedSize"` + Nodes []LvmVolumeGroupNode `json:"nodes"` + ThinPools []LvmVolumeGroupThinPoolStatus `json:"thinPools"` + VGSize resource.Quantity `json:"vgSize"` + VGUuid string `json:"vgUUID"` + Phase string `json:"phase"` + Conditions []metav1.Condition `json:"conditions"` + ThinPoolReady string `json:"thinPoolReady"` + ConfigurationApplied string `json:"configurationApplied"` + VGFree resource.Quantity `json:"vgFree"` } type LvmVolumeGroupDevice struct { @@ -58,18 +69,19 @@ type LvmVolumeGroupNode struct { Name string `json:"name"` } -type StatusThinPool struct { - Name string `json:"name"` - ActualSize resource.Quantity `json:"actualSize"` - UsedSize resource.Quantity `json:"usedSize"` +type LvmVolumeGroupThinPoolStatus struct { + Name string `json:"name"` + ActualSize resource.Quantity `json:"actualSize"` + UsedSize resource.Quantity `json:"usedSize"` + AllocatedSize resource.Quantity `json:"allocatedSize"` + AvailableSpace resource.Quantity `json:"availableSpace"` + AllocationLimit string `json:"allocationLimit"` + Ready bool `json:"ready"` + Message string `json:"message"` } -type LvmVolumeGroupStatus struct { - AllocatedSize resource.Quantity `json:"allocatedSize"` - Health string `json:"health"` - Message string `json:"message"` - Nodes []LvmVolumeGroupNode `json:"nodes"` - ThinPools []StatusThinPool `json:"thinPools"` - VGSize resource.Quantity `json:"vgSize"` - VGUuid string `json:"vgUUID"` +type LvmVolumeGroupThinPoolSpec struct { + Name string `json:"name"` + Size resource.Quantity `json:"size"` + AllocationLimit string `json:"allocationLimit"` } diff --git a/images/sds-local-volume-csi/Dockerfile b/images/sds-local-volume-csi/Dockerfile index 5b781991..04b605c2 100644 --- a/images/sds-local-volume-csi/Dockerfile +++ b/images/sds-local-volume-csi/Dockerfile @@ -1,7 +1,7 @@ ARG BASE_ALPINE=registry.deckhouse.io/base_images/alpine:3.16.3@sha256:5548e9172c24a1b0ca9afdd2bf534e265c94b12b36b3e0c0302f5853eaf00abb -ARG 
BASE_GOLANG_21_ALPINE_BUILDER=registry.deckhouse.io/base_images/golang:1.21.4-alpine3.18@sha256:cf84f3d6882c49ea04b6478ac514a2582c8922d7e5848b43d2918fff8329f6e6 +ARG BASE_GOLANG_ALPINE_BUILDER=registry.deckhouse.io/base_images/golang:1.21.4-alpine3.18@sha256:cf84f3d6882c49ea04b6478ac514a2582c8922d7e5848b43d2918fff8329f6e6 -FROM $BASE_GOLANG_21_ALPINE_BUILDER as builder +FROM $BASE_GOLANG_ALPINE_BUILDER as builder WORKDIR /go/src diff --git a/images/sds-local-volume-csi/api/v1alpha1/lvm_volume_group.go b/images/sds-local-volume-csi/api/v1alpha1/lvm_volume_group.go index 4e346fc2..70900f72 100644 --- a/images/sds-local-volume-csi/api/v1alpha1/lvm_volume_group.go +++ b/images/sds-local-volume-csi/api/v1alpha1/lvm_volume_group.go @@ -36,16 +36,24 @@ type LvmVolumeGroup struct { Status LvmVolumeGroupStatus `json:"status,omitempty"` } -type SpecThinPool struct { - Name string `json:"name"` - Size resource.Quantity `json:"size"` +type LvmVolumeGroupSpec struct { + ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` + BlockDeviceNames []string `json:"blockDeviceNames"` + ThinPools []LvmVolumeGroupThinPoolSpec `json:"thinPools"` + Type string `json:"type"` } -type LvmVolumeGroupSpec struct { - ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` - BlockDeviceNames []string `json:"blockDeviceNames"` - ThinPools []SpecThinPool `json:"thinPools"` - Type string `json:"type"` +type LvmVolumeGroupStatus struct { + AllocatedSize resource.Quantity `json:"allocatedSize"` + Nodes []LvmVolumeGroupNode `json:"nodes"` + ThinPools []LvmVolumeGroupThinPoolStatus `json:"thinPools"` + VGSize resource.Quantity `json:"vgSize"` + VGUuid string `json:"vgUUID"` + Phase string `json:"phase"` + Conditions []metav1.Condition `json:"conditions"` + ThinPoolReady string `json:"thinPoolReady"` + ConfigurationApplied string `json:"configurationApplied"` + VGFree resource.Quantity `json:"vgFree"` } type LvmVolumeGroupDevice struct { @@ -61,18 +69,19 @@ type LvmVolumeGroupNode struct { Name string `json:"name"` } -type StatusThinPool struct { - Name string `json:"name"` - ActualSize resource.Quantity `json:"actualSize"` - UsedSize resource.Quantity `json:"usedSize"` +type LvmVolumeGroupThinPoolStatus struct { + Name string `json:"name"` + ActualSize resource.Quantity `json:"actualSize"` + UsedSize resource.Quantity `json:"usedSize"` + AllocatedSize resource.Quantity `json:"allocatedSize"` + AvailableSpace resource.Quantity `json:"availableSpace"` + AllocationLimit string `json:"allocationLimit"` + Ready bool `json:"ready"` + Message string `json:"message"` } -type LvmVolumeGroupStatus struct { - AllocatedSize resource.Quantity `json:"allocatedSize"` - Health string `json:"health"` - Message string `json:"message"` - Nodes []LvmVolumeGroupNode `json:"nodes"` - ThinPools []StatusThinPool `json:"thinPools"` - VGSize resource.Quantity `json:"vgSize"` - VGUuid string `json:"vgUUID"` +type LvmVolumeGroupThinPoolSpec struct { + Name string `json:"name"` + Size resource.Quantity `json:"size"` + AllocationLimit string `json:"allocationLimit"` } diff --git a/images/sds-local-volume-csi/cmd/main.go b/images/sds-local-volume-csi/cmd/main.go index 6a999c0d..06b6edaa 100644 --- a/images/sds-local-volume-csi/cmd/main.go +++ b/images/sds-local-volume-csi/cmd/main.go @@ -56,7 +56,7 @@ func main() { cfgParams, err := config.NewConfig() if err != nil { - klog.Fatal("unable to create NewConfig") + klog.Fatalf("unable to create NewConfig, err: %s", err.Error()) } var ( diff --git a/images/sds-local-volume-csi/pkg/utils/func.go 
b/images/sds-local-volume-csi/pkg/utils/func.go index 6f084873..7408ed9e 100644 --- a/images/sds-local-volume-csi/pkg/utils/func.go +++ b/images/sds-local-volume-csi/pkg/utils/func.go @@ -178,7 +178,7 @@ func GetNodeWithMaxFreeSpace(lvgs []v1alpha1.LvmVolumeGroup, storageClassLVGPara switch lvmType { case internal.LVMTypeThick: - freeSpace = GetLVMVolumeGroupFreeSpace(lvg) + freeSpace = lvg.Status.VGFree case internal.LVMTypeThin: thinPoolName, ok := storageClassLVGParametersMap[lvg.Name] if !ok { @@ -199,6 +199,7 @@ func GetNodeWithMaxFreeSpace(lvgs []v1alpha1.LvmVolumeGroup, storageClassLVGPara return nodeName, *resource.NewQuantity(maxFreeSpace, resource.BinarySI), nil } +// TODO: delete the method below? func GetLVMVolumeGroupParams(ctx context.Context, kc client.Client, log logger.Logger, lvmVG map[string]string, nodeName, LvmType string) (lvgName, vgName string, err error) { listLvgs := &v1alpha1.LvmVolumeGroupList{ TypeMeta: metav1.TypeMeta{ @@ -273,7 +274,7 @@ func GetLVMVolumeGroupFreeSpace(lvg v1alpha1.LvmVolumeGroup) (vgFreeSpace resour } func GetLVMThinPoolFreeSpace(lvg v1alpha1.LvmVolumeGroup, thinPoolName string) (thinPoolFreeSpace resource.Quantity, err error) { - var storagePoolThinPool *v1alpha1.StatusThinPool + var storagePoolThinPool *v1alpha1.LvmVolumeGroupThinPoolStatus for _, thinPool := range lvg.Status.ThinPools { if thinPool.Name == thinPoolName { storagePoolThinPool = &thinPool @@ -285,11 +286,7 @@ func GetLVMThinPoolFreeSpace(lvg v1alpha1.LvmVolumeGroup, thinPoolName string) ( return thinPoolFreeSpace, fmt.Errorf("[GetLVMThinPoolFreeSpace] thin pool %s not found in lvg %+v", thinPoolName, lvg) } - thinPoolActualSize := storagePoolThinPool.ActualSize - - thinPoolFreeSpace = thinPoolActualSize.DeepCopy() - thinPoolFreeSpace.Sub(storagePoolThinPool.UsedSize) - return thinPoolFreeSpace, nil + return storagePoolThinPool.AvailableSpace, nil } func UpdateLVMLogicalVolume(ctx context.Context, kc client.Client, llv *v1alpha1.LVMLogicalVolume) error { diff --git a/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go b/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go index d931267f..70900f72 100644 --- a/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go +++ b/images/sds-local-volume-scheduler-extender/api/v1alpha1/lvm_volume_group.go @@ -1,9 +1,12 @@ /* Copyright 2024 Flant JSC + Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 + Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
@@ -33,16 +36,24 @@ type LvmVolumeGroup struct { Status LvmVolumeGroupStatus `json:"status,omitempty"` } -type SpecThinPool struct { - Name string `json:"name"` - Size resource.Quantity `json:"size"` +type LvmVolumeGroupSpec struct { + ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` + BlockDeviceNames []string `json:"blockDeviceNames"` + ThinPools []LvmVolumeGroupThinPoolSpec `json:"thinPools"` + Type string `json:"type"` } -type LvmVolumeGroupSpec struct { - ActualVGNameOnTheNode string `json:"actualVGNameOnTheNode"` - BlockDeviceNames []string `json:"blockDeviceNames"` - ThinPools []SpecThinPool `json:"thinPools"` - Type string `json:"type"` +type LvmVolumeGroupStatus struct { + AllocatedSize resource.Quantity `json:"allocatedSize"` + Nodes []LvmVolumeGroupNode `json:"nodes"` + ThinPools []LvmVolumeGroupThinPoolStatus `json:"thinPools"` + VGSize resource.Quantity `json:"vgSize"` + VGUuid string `json:"vgUUID"` + Phase string `json:"phase"` + Conditions []metav1.Condition `json:"conditions"` + ThinPoolReady string `json:"thinPoolReady"` + ConfigurationApplied string `json:"configurationApplied"` + VGFree resource.Quantity `json:"vgFree"` } type LvmVolumeGroupDevice struct { @@ -58,18 +69,19 @@ type LvmVolumeGroupNode struct { Name string `json:"name"` } -type StatusThinPool struct { - Name string `json:"name"` - ActualSize resource.Quantity `json:"actualSize"` - UsedSize resource.Quantity `json:"usedSize"` +type LvmVolumeGroupThinPoolStatus struct { + Name string `json:"name"` + ActualSize resource.Quantity `json:"actualSize"` + UsedSize resource.Quantity `json:"usedSize"` + AllocatedSize resource.Quantity `json:"allocatedSize"` + AvailableSpace resource.Quantity `json:"availableSpace"` + AllocationLimit string `json:"allocationLimit"` + Ready bool `json:"ready"` + Message string `json:"message"` } -type LvmVolumeGroupStatus struct { - AllocatedSize resource.Quantity `json:"allocatedSize"` - Health string `json:"health"` - Message string `json:"message"` - Nodes []LvmVolumeGroupNode `json:"nodes"` - ThinPools []StatusThinPool `json:"thinPools"` - VGSize resource.Quantity `json:"vgSize"` - VGUuid string `json:"vgUUID"` +type LvmVolumeGroupThinPoolSpec struct { + Name string `json:"name"` + Size resource.Quantity `json:"size"` + AllocationLimit string `json:"allocationLimit"` } diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go index 32ffbca6..e10a7c45 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go @@ -1,10 +1,12 @@ package cache import ( + "errors" "fmt" v1 "k8s.io/api/core/v1" slices2 "k8s.io/utils/strings/slices" "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/consts" "sds-local-volume-scheduler-extender/pkg/logger" "sync" ) @@ -24,7 +26,12 @@ type Cache struct { } type lvgCache struct { - lvg *v1alpha1.LvmVolumeGroup + lvg *v1alpha1.LvmVolumeGroup + thickPVCs sync.Map //map[string]*pvcCache + thinPools sync.Map //map[string]*thinPoolCache +} + +type thinPoolCache struct { pvcs sync.Map //map[string]*pvcCache } @@ -43,8 +50,9 @@ func NewCache(logger logger.Logger) *Cache { // AddLVG adds selected LVMVolumeGroup resource to the cache. If it is already stored, does nothing. 
func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { _, loaded := c.lvgs.LoadOrStore(lvg.Name, &lvgCache{ - lvg: lvg, - pvcs: sync.Map{}, + lvg: lvg, + thickPVCs: sync.Map{}, + thinPools: sync.Map{}, }) if loaded { c.log.Debug(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s has been already added to the cache", lvg.Name)) @@ -66,8 +74,8 @@ func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { // UpdateLVG updated selected LVMVolumeGroup resource in the cache. If such LVMVolumeGroup is not stored, returns an error. func (c *Cache) UpdateLVG(lvg *v1alpha1.LvmVolumeGroup) error { - if cache, found := c.lvgs.Load(lvg.Name); found { - cache.(*lvgCache).lvg = lvg + if lvgCh, found := c.lvgs.Load(lvg.Name); found { + lvgCh.(*lvgCache).lvg = lvg c.log.Trace(fmt.Sprintf("[UpdateLVG] the LVMVolumeGroup %s nodes: %v", lvg.Name, lvg.Status.Nodes)) for _, node := range lvg.Status.Nodes { @@ -88,7 +96,7 @@ func (c *Cache) UpdateLVG(lvg *v1alpha1.LvmVolumeGroup) error { return nil } - return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvg.Name) + return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvg.Name) } // TryGetLVG returns selected LVMVolumeGroup resource if it is stored in the cache, otherwise returns nil. @@ -129,16 +137,39 @@ func (c *Cache) GetAllLVG() map[string]*v1alpha1.LvmVolumeGroup { return lvgs } -// GetLVGReservedSpace returns a sum of reserved space by every PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. -func (c *Cache) GetLVGReservedSpace(lvgName string) (int64, error) { +// GetLVGThickReservedSpace returns a sum of reserved space by every thick PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. +func (c *Cache) GetLVGThickReservedSpace(lvgName string) (int64, error) { lvg, found := c.lvgs.Load(lvgName) if !found { - c.log.Debug(fmt.Sprintf("[GetLVGReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + c.log.Debug(fmt.Sprintf("[GetLVGThickReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + return 0, nil + } + + var space int64 + lvg.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { + space += pvcCh.(*pvcCache).pvc.Spec.Resources.Requests.Storage().Value() + return true + }) + + return space, nil +} + +// GetLVGThinReservedSpace returns a sum of reserved space by every thin PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. +func (c *Cache) GetLVGThinReservedSpace(lvgName string, thinPoolName string) (int64, error) { + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGThinReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + return 0, nil + } + + thinPool, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGThinReservedSpace] the Thin pool %s of the LVMVolumeGroup %s was not found in the cache. Returns 0", thinPoolName, lvgName)) return 0, nil } var space int64 - lvg.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + thinPool.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { space += pvcCh.(*pvcCache).pvc.Spec.Resources.Requests.Storage().Value() return true }) @@ -171,11 +202,11 @@ func (c *Cache) DeleteLVG(lvgName string) { }) } -// AddPVC adds selected PVC to selected LVMVolumeGroup resource.
If the LVMVolumeGroup resource is not stored, returns an error. +// AddThickPVC adds selected PVC to selected LVMVolumeGroup resource. If the LVMVolumeGroup resource is not stored, returns an error. // If selected PVC is already stored in the cache, does nothing. -func (c *Cache) AddPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { +func (c *Cache) AddThickPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { if pvc.Status.Phase == v1.ClaimBound { - c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) + c.log.Warning(fmt.Sprintf("[AddThickPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) return nil } @@ -184,59 +215,170 @@ func (c *Cache) AddPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { lvgCh, found := c.lvgs.Load(lvgName) if !found { err := fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) - c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + c.log.Error(err, fmt.Sprintf("[AddThickPVC] an error occurred while trying to add PVC %s to the cache", pvcKey)) + return err + } + + // this case might be triggered if the extender recovers after a failure and finds some pending thick PVCs with selected nodes + c.log.Trace(fmt.Sprintf("[AddThickPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + + shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, "") + if err != nil { return err } - // this case might be triggered if the extender recovers after fail and finds some pending pvcs with selected nodes - c.log.Trace(fmt.Sprintf("[AddPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + if !shouldAdd { + c.log.Debug(fmt.Sprintf("[AddThickPVC] PVC %s should not be added", pvcKey)) + return nil + } + + c.log.Debug(fmt.Sprintf("[AddThickPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) + c.addNewThickPVC(lvgCh.(*lvgCache), pvc) + + return nil +} + +func (c *Cache) shouldAddPVC(pvc *v1.PersistentVolumeClaim, lvgCh *lvgCache, pvcKey, lvgName, thinPoolName string) (bool, error) { if pvc.Annotations[SelectedNodeAnnotation] != "" { - c.log.Debug(fmt.Sprintf("[AddPVC] PVC %s/%s has selected node anotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s/%s has selected node annotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) lvgsOnTheNode, found := c.nodeLVGs.Load(pvc.Annotations[SelectedNodeAnnotation]) if !found { err := fmt.Errorf("no LVMVolumeGroups found for the node %s", pvc.Annotations[SelectedNodeAnnotation]) - c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) - return err + c.log.Error(err, fmt.Sprintf("[shouldAddPVC] an error occurred while trying to add PVC %s to the cache", pvcKey)) + return false, err } if !slices2.Contains(lvgsOnTheNode.([]string), lvgName) { - c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) - return nil + c.log.Debug(fmt.Sprintf("[shouldAddPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. 
It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + return false, nil } - c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + c.log.Debug(fmt.Sprintf("[shouldAddPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) - _, found = lvgCh.(*lvgCache).pvcs.Load(pvcKey) + // if pvc is thick + _, found = lvgCh.thickPVCs.Load(pvcKey) if found { - c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s cache has been already added to the LVMVolumeGroup %s", pvcKey, lvgName)) - return nil + c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the cache of the LVMVolumeGroup %s", pvcKey, lvgName)) + return false, nil + } + + // if pvc is thin + if thinPoolName != "" { + thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[shouldAddPVC] Thin pool %s was not found in the cache, PVC %s should be added", thinPoolName, pvcKey)) + return true, nil + } + + if _, found = thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey); found { + c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache of the LVMVolumeGroup %s. No need to add", pvcKey, thinPoolName, lvgName)) + return false, nil + } } } - c.log.Debug(fmt.Sprintf("[AddPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) - c.addNewPVC(lvgCh.(*lvgCache), pvc) + return true, nil +} + +func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolumeClaim) error { + if pvc.Status.Phase == v1.ClaimBound { + c.log.Warning(fmt.Sprintf("[AddThinPVC] PVC %s/%s has status phase BOUND. 
It will not be added to the cache", pvc.Namespace, pvc.Name)) + return nil + } + + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + c.log.Error(err, fmt.Sprintf("[AddThinPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return err + } + + thinPoolBelongs := c.checkIfThinPoolBelongsToLVG(lvgCh.(*lvgCache), thinPoolName) + if !thinPoolBelongs { + err := fmt.Errorf("thin pool %s was not found in the LVMVolumeGroup %s", thinPoolName, lvgName) + c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add Thin pool %s of the LVMVolumeGroup %s for the PVC %s", thinPoolName, lvgName, pvcKey)) + return err + } + + // this case might be triggered if the extender recovers after fail and finds some pending thin PVCs with selected nodes + c.log.Trace(fmt.Sprintf("[AddThinPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, thinPoolName) + if err != nil { + return err + } + + if !shouldAdd { + c.log.Debug(fmt.Sprintf("[AddThinPVC] PVC %s should not be added", pvcKey)) + return nil + } + + c.log.Debug(fmt.Sprintf("[AddThinPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) + err = c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName) + if err != nil { + c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add PVC %s to Thin Pool %s of the LVMVolumeGroup %s", pvcKey, thinPoolName, lvgName)) + return err + } return nil } -func (c *Cache) addNewPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) { +func (c *Cache) checkIfThinPoolBelongsToLVG(lvgCh *lvgCache, thinPoolName string) bool { + for _, tp := range lvgCh.lvg.Status.ThinPools { + if tp.Name == thinPoolName { + return true + } + } + + return false +} + +func (c *Cache) addNewThickPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) { pvcKey := configurePVCKey(pvc) - lvgCh.pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + lvgCh.thickPVCs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + c.addLVGToPVC(lvgCh.lvg.Name, pvcKey) +} + +func (c *Cache) addNewThinPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, thinPoolName string) error { + pvcKey := configurePVCKey(pvc) + + err := c.addThinPoolIfNotExists(lvgCh, thinPoolName) + if err != nil { + c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin pool %s in the LVMVolumeGroup %s cache for PVC %s", thinPoolName, lvgCh.lvg.Name, pvc.Name)) + return err + } + + thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName) + if !found { + err = fmt.Errorf("thin pool %s not found", thinPoolName) + c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin PVC %s to the cache", pvcKey)) + return err + } + + thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + c.log.Debug(fmt.Sprintf("[addNewThinPVC] THIN PVC %s was added to the cache to Thin Pool %s", pvcKey, thinPoolName)) + + c.addLVGToPVC(lvgCh.lvg.Name, pvcKey) + return nil +} + +func (c *Cache) addLVGToPVC(lvgName, pvcKey string) { lvgsForPVC, found := c.pvcLVGs.Load(pvcKey) if !found || lvgsForPVC == nil { lvgsForPVC = make([]string, 0, lvgsPerPVCCount) } - c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s before append: %v", pvcKey, lvgsForPVC)) - lvgsForPVC = append(lvgsForPVC.([]string), 
lvgCh.lvg.Name) - c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s after append: %v", pvcKey, lvgsForPVC)) + c.log.Trace(fmt.Sprintf("[addLVGToPVC] LVMVolumeGroups from the cache for PVC %s before append: %v", pvcKey, lvgsForPVC)) + lvgsForPVC = append(lvgsForPVC.([]string), lvgName) + c.log.Trace(fmt.Sprintf("[addLVGToPVC] LVMVolumeGroups from the cache for PVC %s after append: %v", pvcKey, lvgsForPVC)) c.pvcLVGs.Store(pvcKey, lvgsForPVC) } -// UpdatePVC updates selected PVC in selected LVMVolumeGroup resource. If no such PVC is stored in the cache, adds it. -func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { +// UpdateThickPVC updates selected PVC in selected LVMVolumeGroup resource. If no such PVC is stored in the cache, adds it. +func (c *Cache) UpdateThickPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { pvcKey := configurePVCKey(pvc) lvgCh, found := c.lvgs.Load(lvgName) @@ -244,12 +386,12 @@ func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) if !found { - c.log.Warning(fmt.Sprintf("[UpdatePVC] PVC %s was not found in the cache for the LVMVolumeGroup %s. It will be added", pvcKey, lvgName)) - err := c.AddPVC(lvgName, pvc) + c.log.Warning(fmt.Sprintf("[UpdateThickPVC] PVC %s was not found in the cache for the LVMVolumeGroup %s. It will be added", pvcKey, lvgName)) + err := c.AddThickPVC(lvgName, pvc) if err != nil { - c.log.Error(err, fmt.Sprintf("[UpdatePVC] an error occurred while trying to update the PVC %s", pvcKey)) + c.log.Error(err, fmt.Sprintf("[UpdateThickPVC] an error occurred while trying to update the PVC %s", pvcKey)) return err } return nil @@ -257,11 +399,64 @@ func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { pvcCh.(*pvcCache).pvc = pvc pvcCh.(*pvcCache).selectedNode = pvc.Annotations[SelectedNodeAnnotation] - c.log.Debug(fmt.Sprintf("[UpdatePVC] successfully updated PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + c.log.Debug(fmt.Sprintf("[UpdateThickPVC] successfully updated PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) return nil } +func (c *Cache) UpdateThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolumeClaim) error { + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + } + + thinPoolCh, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found { + c.log.Debug(fmt.Sprintf("[UpdateThinPVC] Thin Pool %s was not found in the LVMVolumeGroup %s, add it.", thinPoolName, lvgName)) + err := c.addThinPoolIfNotExists(lvgCh.(*lvgCache), thinPoolName) + if err != nil { + return err + } + thinPoolCh, _ = lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + } + + pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey) + if !found { + c.log.Warning(fmt.Sprintf("[UpdateThinPVC] Thin PVC %s was not found in Thin pool %s in the cache for the LVMVolumeGroup %s. 
It will be added", pvcKey, thinPoolName, lvgName)) + err := c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName) + if err != nil { + c.log.Error(err, fmt.Sprintf("[UpdateThinPVC] an error occurred while trying to update the PVC %s", pvcKey)) + return err + } + return nil + } + + pvcCh.(*pvcCache).pvc = pvc + pvcCh.(*pvcCache).selectedNode = pvc.Annotations[SelectedNodeAnnotation] + c.log.Debug(fmt.Sprintf("[UpdateThinPVC] successfully updated THIN PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + + return nil +} + +func (c *Cache) addThinPoolIfNotExists(lvgCh *lvgCache, thinPoolName string) error { + if len(thinPoolName) == 0 { + err := errors.New("no thin pool name specified") + c.log.Error(err, fmt.Sprintf("[addThinPoolIfNotExists] unable to add thin pool in the LVMVolumeGroup %s", lvgCh.lvg.Name)) + return err + } + + _, found := lvgCh.thinPools.Load(thinPoolName) + if found { + c.log.Debug(fmt.Sprintf("[addThinPoolIfNotExists] Thin pool %s is already created in the LVMVolumeGroup %s. No need to add a new one", thinPoolName, lvgCh.lvg.Name)) + return nil + } + + lvgCh.thinPools.Store(thinPoolName, &thinPoolCache{}) + return nil +} + // GetAllPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) { lvgCh, found := c.lvgs.Load(lvgName) @@ -271,53 +466,92 @@ func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, er return nil, err } - result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) - lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + // TODO: fix this to struct size field after refactoring + size := 0 + lvgCh.(*lvgCache).thickPVCs.Range(func(key, value any) bool { + size++ + return true + }) + lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool { + tpCh.(*thinPoolCache).pvcs.Range(func(key, value any) bool { + size++ + return true + }) + return true + }) + + result := make([]*v1.PersistentVolumeClaim, 0, size) + // collect Thick PVC for the LVG + lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { result = append(result, pvcCh.(*pvcCache).pvc) return true }) + // collect Thin PVC for the LVG + lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool { + tpCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { + result = append(result, pvcCh.(*pvcCache).pvc) + return true + }) + return true + }) + return result, nil } -// GetLVGNamesForPVC returns a slice of LVMVolumeGroup resources names, where selected PVC has been stored in. If no such LVMVolumeGroup found, returns empty slice. -func (c *Cache) GetLVGNamesForPVC(pvc *v1.PersistentVolumeClaim) []string { - pvcKey := configurePVCKey(pvc) - lvgNames, found := c.pvcLVGs.Load(pvcKey) +// GetAllThickPVCLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. 
+func (c *Cache) GetAllThickPVCLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) { + lvgCh, found := c.lvgs.Load(lvgName) if !found { - c.log.Warning(fmt.Sprintf("[GetLVGNamesForPVC] no cached LVMVolumeGroups were found for PVC %s", pvcKey)) - return nil + err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName) + c.log.Error(err, fmt.Sprintf("[GetAllThickPVCLVG] an error occurred while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) + return nil, err } - return lvgNames.([]string) -} + result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) + // collect Thick PVC for the LVG + lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { + result = append(result, pvcCh.(*pvcCache).pvc) + return true + }) -// RemoveBoundedPVCSpaceReservation removes selected bounded PVC space reservation from a target LVMVolumeGroup resource. If no such LVMVolumeGroup found or PVC -// is not in a Status Bound, returns an error. -func (c *Cache) RemoveBoundedPVCSpaceReservation(lvgName string, pvc *v1.PersistentVolumeClaim) error { - if pvc.Status.Phase != v1.ClaimBound { - return fmt.Errorf("PVC %s/%s not in a Status.Phase Bound", pvc.Namespace, pvc.Name) - } + return result, nil +} - pvcKey := configurePVCKey(pvc) + +// GetAllPVCFromLVGThinPool returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. +func (c *Cache) GetAllPVCFromLVGThinPool(lvgName, thinPoolName string) ([]*v1.PersistentVolumeClaim, error) { lvgCh, found := c.lvgs.Load(lvgName) if !found { - err := fmt.Errorf("LVMVolumeGroup %s was not found in the cache", lvgName) - c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) - return err + err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName) + c.log.Error(err, fmt.Sprintf("[GetAllPVCFromLVGThinPool] an error occurred while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) + return nil, err } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) - if !found || pvcCh == nil { - err := fmt.Errorf("cache for PVC %s was not found", pvcKey) - c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) - return err + thinPoolCh, found := lvgCh.(*lvgCache).thinPools.Load(thinPoolName) + if !found || thinPoolCh == nil { + c.log.Debug(fmt.Sprintf("[GetAllPVCFromLVGThinPool] no Thin pool %s in the LVMVolumeGroup %s was found. Returns nil slice", thinPoolName, lvgName)) + return nil, nil } - lvgCh.(*lvgCache).pvcs.Delete(pvcKey) - c.pvcLVGs.Delete(pvcKey) + result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) + thinPoolCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { + result = append(result, pvcCh.(*pvcCache).pvc) + return true + }) - return nil + return result, nil +} + +// GetLVGNamesForPVC returns a slice of LVMVolumeGroup resources names, where selected PVC has been stored in. If no such LVMVolumeGroup found, returns empty slice. 
+func (c *Cache) GetLVGNamesForPVC(pvc *v1.PersistentVolumeClaim) []string { + pvcKey := configurePVCKey(pvc) + lvgNames, found := c.pvcLVGs.Load(pvcKey) + if !found { + c.log.Warning(fmt.Sprintf("[GetLVGNamesForPVC] no cached LVMVolumeGroups were found for PVC %s", pvcKey)) + return nil + } + + return lvgNames.([]string) } // CheckIsPVCStored checks if selected PVC has been already stored in the cache. @@ -331,8 +565,9 @@ func (c *Cache) CheckIsPVCStored(pvc *v1.PersistentVolumeClaim) bool { } // RemoveSpaceReservationForPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node. -func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim) error { +func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, deviceType string) error { pvcKey := configurePVCKey(pvc) + // the LVG which is used to store PVC selectedLVGName := "" lvgNamesForPVC, found := c.pvcLVGs.Load(pvcKey) @@ -349,19 +584,41 @@ func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentV return err } - pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) - if !found { - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) - continue - } + switch deviceType { + case consts.Thin: + lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool { + pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) + return true + } - selectedNode := pvcCh.(*pvcCache).selectedNode - if selectedNode == "" { - lvgCh.(*lvgCache).pvcs.Delete(pvcKey) - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation])) - } else { - selectedLVGName = lvgName - c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + selectedNode := pvcCh.(*pvcCache).selectedNode + if selectedNode == "" { + thinPoolCh.(*thinPoolCache).pvcs.Delete(pvcKey) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s due to the PVC being selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation])) + } else { + selectedLVGName = lvgName + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. 
It should not be removed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + } + + return true + }) + case consts.Thick: + pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) + continue + } + + selectedNode := pvcCh.(*pvcCache).selectedNode + if selectedNode == "" { + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due to the PVC being selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation])) + } else { + selectedLVGName = lvgName + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be removed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + } } } c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey)) @@ -392,7 +649,11 @@ func (c *Cache) RemovePVCFromTheCache(pvc *v1.PersistentVolumeClaim) { for _, lvgName := range lvgArray.([]string) { lvgCh, found := c.lvgs.Load(lvgName) if found { - lvgCh.(*lvgCache).pvcs.Delete(pvcKey.(string)) + lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey.(string)) + lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool { + tpCh.(*thinPoolCache).pvcs.Delete(pvcKey) + return true + }) } } } @@ -440,8 +701,17 @@ func (c *Cache) PrintTheCacheLog() { c.lvgs.Range(func(lvgName, lvgCh any) bool { c.log.Cache(fmt.Sprintf("[%s]", lvgName)) - lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { - c.log.Cache(fmt.Sprintf(" PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode)) + lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool { + c.log.Cache(fmt.Sprintf(" THICK PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode)) + return true + }) + + lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool { + thinPoolCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool { + c.log.Cache(fmt.Sprintf(" THIN POOL %s PVC %s, selected node: %s", thinPoolName, pvcName, pvcCh.(*pvcCache).selectedNode)) + return true + }) + return true }) diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go index bc367531..9d4d742f 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go @@ -80,7 +80,7 @@ func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { } for _, pvc := range pvcs { - err := cache.AddPVC(lvg.Name, &pvc) + err := cache.AddThickPVC(lvg.Name, &pvc) if err != nil { b.Error(err) } @@ -88,7 +88,7 @@ func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { b.RunParallel(func(pb *testing.PB) { for pb.Next() { - _, err := cache.GetLVGReservedSpace(lvg.Name) + _, err := cache.GetLVGThickReservedSpace(lvg.Name) if err != nil { b.Error(err) } @@ -145,15 +145,15 @@ func BenchmarkCache_AddPVC(b *testing.B) { }, } - err := cache.AddPVC(lvg1.Name, pvc) + err := cache.AddThickPVC(lvg1.Name, pvc) if err != nil { b.Error(err) } - err = cache.AddPVC(lvg2.Name, pvc) + err = cache.AddThickPVC(lvg2.Name, pvc) if err != nil { b.Error(err) } - err = 
cache.AddPVC(lvg3.Name, pvc) + err = cache.AddThickPVC(lvg3.Name, pvc) if err != nil { b.Error(err) } @@ -400,11 +400,11 @@ func BenchmarkCache_UpdatePVC(b *testing.B) { }, }, } - err := cache.UpdatePVC(lvg.Name, pvc) + err := cache.UpdateThickPVC(lvg.Name, pvc) if err != nil { b.Error(err) } - err = cache.UpdatePVC(lvg.Name, updatedPVC) + err = cache.UpdateThickPVC(lvg.Name, updatedPVC) if err != nil { b.Error(err) } @@ -490,7 +490,7 @@ func BenchmarkCache_FullLoad(b *testing.B) { } for _, pvc := range pvcs { - err := cache.AddPVC(lvg.Name, pvc) + err := cache.AddThickPVC(lvg.Name, pvc) if err != nil { b.Error(err) } @@ -562,9 +562,17 @@ func BenchmarkCache_FullLoad(b *testing.B) { }, } - for _, pvc := range pvcs { + for d, pvc := range pvcs { + for err != nil { + err = cache.UpdateThickPVC(lvg.Name, pvc) + } + + for err != nil { + err = cache.AddThinPVC(lvg.Name, fmt.Sprintf("test-thin-%d", d), pvc) + } + for err != nil { - err = cache.UpdatePVC(lvg.Name, pvc) + err = cache.UpdateThinPVC(lvg.Name, fmt.Sprintf("test-thin-%d", d), pvc) } cache.GetLVGNamesForPVC(pvc) @@ -577,7 +585,11 @@ func BenchmarkCache_FullLoad(b *testing.B) { if err != nil { b.Error(err) } - _, err = cache.GetLVGReservedSpace(lvgName) + _, err = cache.GetLVGThickReservedSpace(lvgName) + if err != nil { + b.Error(err) + } + _, err = cache.GetLVGThinReservedSpace(lvgName, "test-thin") if err != nil { b.Error(err) } diff --git a/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go b/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go new file mode 100644 index 00000000..947c77ab --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/consts/consts.go @@ -0,0 +1,11 @@ +package consts + +const ( + SdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" + + LvmTypeParamKey = "local.csi.storage.deckhouse.io/lvm-type" + LvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups" + + Thick = "Thick" + Thin = "Thin" +) diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go index 1d7b1202..ffc8d69c 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go @@ -6,6 +6,7 @@ import ( "fmt" v1 "k8s.io/api/core/v1" "k8s.io/client-go/util/workqueue" + "reflect" "sds-local-volume-scheduler-extender/api/v1alpha1" "sds-local-volume-scheduler-extender/pkg/cache" "sds-local-volume-scheduler-extender/pkg/logger" @@ -116,8 +117,7 @@ func RunLVGWatcherCacheController( log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] old state LVMVolumeGroup %s has size %s", oldLvg.Name, oldLvg.Status.AllocatedSize.String())) log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] new state LVMVolumeGroup %s has size %s", newLvg.Name, newLvg.Status.AllocatedSize.String())) - if newLvg.DeletionTimestamp != nil || - oldLvg.Status.AllocatedSize.Value() == newLvg.Status.AllocatedSize.Value() { + if !shouldReconcileLVG(oldLvg, newLvg) { log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", newLvg.Name)) return } @@ -127,17 +127,13 @@ func RunLVGWatcherCacheController( if err != nil { log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", newLvg.Name)) } + for _, pvc := range cachedPVCs { log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache belongs 
to LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) if pvc.Status.Phase == v1.ClaimBound { log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache has Status.Phase Bound. It will be removed from the reserved space in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) - err = cache.RemoveBoundedPVCSpaceReservation(newLvg.Name, pvc) - if err != nil { - log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) - continue - } - + cache.RemovePVCFromTheCache(pvc) log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s was removed from the LVMVolumeGroup %s in the cache", pvc.Namespace, pvc.Name, newLvg.Name)) } } @@ -163,3 +159,16 @@ func RunLVGWatcherCacheController( return c, nil } + +func shouldReconcileLVG(oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool { + if newLVG.DeletionTimestamp != nil { + return false + } + + if oldLVG.Status.AllocatedSize.Value() == newLVG.Status.AllocatedSize.Value() && + reflect.DeepEqual(oldLVG.Status.ThinPools, newLVG.Status.ThinPools) { + return false + } + + return true +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go index c96ee95d..af1561c1 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go @@ -9,6 +9,7 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/utils/strings/slices" "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/consts" "sds-local-volume-scheduler-extender/pkg/logger" "sds-local-volume-scheduler-extender/pkg/scheduler" "sigs.k8s.io/controller-runtime/pkg/client" @@ -21,8 +22,7 @@ import ( ) const ( - PVCWatcherCacheCtrlName = "pvc-watcher-cache-controller" - sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" + PVCWatcherCacheCtrlName = "pvc-watcher-cache-controller" ) func RunPVCWatcherCacheController( @@ -118,33 +118,30 @@ func RunPVCWatcherCacheController( } func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, schedulerCache *cache.Cache, pvc *v1.PersistentVolumeClaim, selectedNodeName string) { - log.Debug(fmt.Sprintf("[reconcilePVC] starts to find common LVMVolumeGroup for the selected node %s and PVC %s/%s", selectedNodeName, pvc.Namespace, pvc.Name)) - lvgsOnTheNode := schedulerCache.GetLVGNamesByNodeName(selectedNodeName) - for _, lvgName := range lvgsOnTheNode { - log.Trace(fmt.Sprintf("[reconcilePVC] LVMVolumeGroup %s belongs to the node %s", lvgName, selectedNodeName)) + sc := &v12.StorageClass{} + err := mgr.GetClient().Get(ctx, client.ObjectKey{ + Name: *pvc.Spec.StorageClassName, + }, sc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to get Storage Class %s for PVC %s/%s", *pvc.Spec.StorageClassName, pvc.Namespace, pvc.Name)) + return + } + + if sc.Provisioner != consts.SdsLocalVolumeProvisioner { + log.Debug(fmt.Sprintf("[reconcilePVC] Storage Class %s for PVC %s/%s is not managed by sds-local-volume-provisioner. 
Ends the reconciliation", sc.Name, pvc.Namespace, pvc.Name)) + return } + log.Debug(fmt.Sprintf("[reconcilePVC] tries to extract LVGs from the Storage Class %s for PVC %s/%s", sc.Name, pvc.Namespace, pvc.Name)) + lvgsFromSc, err := scheduler.ExtractLVGsFromSC(sc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to extract LVMVolumeGroups from the Storage Class %s", sc.Name)) + } + log.Debug(fmt.Sprintf("[reconcilePVC] successfully extracted LVGs from the Storage Class %s for PVC %s/%s", sc.Name, pvc.Namespace, pvc.Name)) + lvgsForPVC := schedulerCache.GetLVGNamesForPVC(pvc) if lvgsForPVC == nil || len(lvgsForPVC) == 0 { log.Debug(fmt.Sprintf("[reconcilePVC] no LVMVolumeGroups were found in the cache for PVC %s/%s. Use Storage Class %s instead", pvc.Namespace, pvc.Name, *pvc.Spec.StorageClassName)) - sc := &v12.StorageClass{} - err := mgr.GetClient().Get(ctx, client.ObjectKey{ - Name: *pvc.Spec.StorageClassName, - }, sc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to get Storage Class %s for PVC %s/%s", *pvc.Spec.StorageClassName, pvc.Namespace, pvc.Name)) - return - } - - if sc.Provisioner != sdsLocalVolumeProvisioner { - log.Debug(fmt.Sprintf("[reconcilePVC] Storage Class %s for PVC %s/%s is not managed by sds-local-volume-provisioner. Ends the reconciliation", sc.Name, pvc.Namespace, pvc.Name)) - return - } - - lvgsFromSc, err := scheduler.ExtractLVGsFromSC(sc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to extract LVMVolumeGroups from the Storage Class %s", sc.Name)) - } for _, lvg := range lvgsFromSc { lvgsForPVC = append(lvgsForPVC, lvg.Name) @@ -154,10 +151,17 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s log.Trace(fmt.Sprintf("[reconcilePVC] LVMVolumeGroup %s belongs to PVC %s/%s", lvgName, pvc.Namespace, pvc.Name)) } + log.Debug(fmt.Sprintf("[reconcilePVC] starts to find common LVMVolumeGroup for the selected node %s and PVC %s/%s", selectedNodeName, pvc.Namespace, pvc.Name)) + lvgsOnTheNode := schedulerCache.GetLVGNamesByNodeName(selectedNodeName) + for _, lvgName := range lvgsOnTheNode { + log.Trace(fmt.Sprintf("[reconcilePVC] LVMVolumeGroup %s belongs to the node %s", lvgName, selectedNodeName)) + } + var commonLVGName string for _, pvcLvg := range lvgsForPVC { if slices.Contains(lvgsOnTheNode, pvcLvg) { commonLVGName = pvcLvg + break } } if commonLVGName == "" { @@ -167,18 +171,33 @@ func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, s log.Debug(fmt.Sprintf("[reconcilePVC] successfully found common LVMVolumeGroup %s for the selected node %s and PVC %s/%s", commonLVGName, selectedNodeName, pvc.Namespace, pvc.Name)) log.Debug(fmt.Sprintf("[reconcilePVC] starts to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) - log.Trace(fmt.Sprintf("[reconcilePVC] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) - err := schedulerCache.UpdatePVC(commonLVGName, pvc) - if err != nil { - log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) - return + + log.Trace(fmt.Sprintf("[reconcilePVC] %s PVC %s/%s has status phase: %s", sc.Parameters[consts.LvmTypeParamKey], pvc.Namespace, pvc.Name, pvc.Status.Phase)) + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: + err = schedulerCache.UpdateThickPVC(commonLVGName, pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update Thick PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) 
+ return + } + case consts.Thin: + for _, lvg := range lvgsFromSc { + if lvg.Name == commonLVGName { + err = schedulerCache.UpdateThinPVC(commonLVGName, lvg.Thin.PoolName, pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update Thin PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + return + } + break + } + } } - log.Debug(fmt.Sprintf("[reconcilePVC] successfully updated PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + log.Debug(fmt.Sprintf("[reconcilePVC] successfully updated %s PVC %s/%s in the cache", sc.Parameters[consts.LvmTypeParamKey], pvc.Namespace, pvc.Name)) log.Cache(fmt.Sprintf("[reconcilePVC] cache state BEFORE the removal space reservation for PVC %s/%s", pvc.Namespace, pvc.Name)) schedulerCache.PrintTheCacheLog() log.Debug(fmt.Sprintf("[reconcilePVC] starts to remove space reservation for PVC %s/%s with selected node from the cache", pvc.Namespace, pvc.Name)) - err = schedulerCache.RemoveSpaceReservationForPVCWithSelectedNode(pvc) + err = schedulerCache.RemoveSpaceReservationForPVCWithSelectedNode(pvc, sc.Parameters[consts.LvmTypeParamKey]) if err != nil { log.Error(err, fmt.Sprintf("[reconcilePVC] unable to remove PVC %s/%s space reservation in the cache", pvc.Namespace, pvc.Name)) return diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go index 3291bad4..8e1e916b 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go @@ -28,22 +28,13 @@ import ( "net/http" "sds-local-volume-scheduler-extender/api/v1alpha1" "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/consts" "sds-local-volume-scheduler-extender/pkg/logger" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" "sync" ) -const ( - sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" - - lvmTypeParamKey = "local.csi.storage.deckhouse.io/lvm-type" - lvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups" - - thick = "Thick" - thin = "Thin" -) - func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Debug("[filter] starts the serving") var input ExtenderArgs @@ -101,6 +92,11 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Trace(fmt.Sprintf("[filter] filtered managed PVC %s/%s", pvc.Namespace, pvc.Name)) } + if len(managedPVCs) == 0 { + s.log.Warning(fmt.Sprintf("[filter] Pod %s/%s uses PVCs which are not managed by the module. 
Unable to filter and score the nodes", input.Pod.Namespace, input.Pod.Name)) + return + } + s.log.Debug(fmt.Sprintf("[filter] starts to extract PVC requested sizes for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, managedPVCs, scs) if err != nil { @@ -146,7 +142,7 @@ func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVo filteredPVCs := make(map[string]*corev1.PersistentVolumeClaim, len(pvcs)) for _, pvc := range pvcs { sc := scs[*pvc.Spec.StorageClassName] - if sc.Provisioner != sdsLocalVolumeProvisioner { + if sc.Provisioner != consts.SdsLocalVolumeProvisioner { log.Debug(fmt.Sprintf("[filterNotManagedPVC] filter out PVC %s/%s due to used Storage class %s is not managed by sds-local-volume-provisioner", pvc.Name, pvc.Namespace, sc.Name)) continue } @@ -167,25 +163,36 @@ func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, sche pvc := pvcs[volume.PersistentVolumeClaim.ClaimName] sc := scs[*pvc.Spec.StorageClassName] - if sc.Parameters[lvmTypeParamKey] == thick { - log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name)) - lvgsForPVC, err := ExtractLVGsFromSC(sc) - if err != nil { - return err - } + lvgsForPVC, err := ExtractLVGsFromSC(sc) + if err != nil { + return err + } + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: + log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name)) log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from Storage Class %s for PVC %s/%s: %+v", sc.Name, pvc.Namespace, pvc.Name, lvgsForPVC)) for _, lvg := range lvgsForPVC { if slices.Contains(lvgNamesForTheNode, lvg.Name) { log.Trace(fmt.Sprintf("[populateCache] PVC %s/%s will reserve space in LVMVolumeGroup %s cache", pvc.Namespace, pvc.Name, lvg.Name)) - err = schedulerCache.AddPVC(lvg.Name, pvc) + err = schedulerCache.AddThickPVC(lvg.Name, pvc) + if err != nil { + return err + } + } + } + case consts.Thin: + log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache will be populated by PVC space requests", sc.Name)) + log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from Storage Class %s for PVC %s/%s: %+v", sc.Name, pvc.Namespace, pvc.Name, lvgsForPVC)) + for _, lvg := range lvgsForPVC { + if slices.Contains(lvgNamesForTheNode, lvg.Name) { + log.Trace(fmt.Sprintf("[populateCache] PVC %s/%s will reserve space in LVMVolumeGroup %s Thin Pool %s cache", pvc.Namespace, pvc.Name, lvg.Name, lvg.Thin.PoolName)) + err = schedulerCache.AddThinPVC(lvg.Name, lvg.Thin.PoolName, pvc) if err != nil { return err } } } - } else { - log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache should NOT be populated by PVC space requests", sc.Name)) } } } @@ -217,30 +224,30 @@ func extractRequestedSize( log.Debug(fmt.Sprintf("[extractRequestedSize] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) switch pvc.Status.Phase { case corev1.ClaimPending: - switch sc.Parameters[lvmTypeParamKey] { - case thick: + switch sc.Parameters[consts.LvmTypeParamKey] { + case consts.Thick: pvcRequests[pvc.Name] = PVCRequest{ - DeviceType: thick, + DeviceType: consts.Thick, RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(), } - case thin: + case consts.Thin: pvcRequests[pvc.Name] = PVCRequest{ - DeviceType: thin, + DeviceType: 
@@ -217,30 +224,30 @@ func extractRequestedSize(
 		log.Debug(fmt.Sprintf("[extractRequestedSize] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase))
 		switch pvc.Status.Phase {
 		case corev1.ClaimPending:
-			switch sc.Parameters[lvmTypeParamKey] {
-			case thick:
+			switch sc.Parameters[consts.LvmTypeParamKey] {
+			case consts.Thick:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thick,
+					DeviceType:    consts.Thick,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
 				}
-			case thin:
+			case consts.Thin:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thin,
+					DeviceType:    consts.Thin,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
 				}
 			}
 
 		case corev1.ClaimBound:
 			pv := pvs[pvc.Spec.VolumeName]
-			switch sc.Parameters[lvmTypeParamKey] {
-			case thick:
+			switch sc.Parameters[consts.LvmTypeParamKey] {
+			case consts.Thick:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thick,
+					DeviceType:    consts.Thick,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
 				}
-			case thin:
+			case consts.Thin:
 				pvcRequests[pvc.Name] = PVCRequest{
-					DeviceType:    thin,
+					DeviceType:    consts.Thin,
 					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
 				}
 			}
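// --- Editor's note (illustrative sketch, not part of the patch) ---------------------------
// extractRequestedSize above reserves the full request for a Pending PVC, but for a Bound PVC
// only the growth over the capacity that is already provisioned (i.e. a volume expansion).
// The helper below repeats that arithmetic on plain int64 byte values; names and numbers are
// made up for illustration.
package main

import "fmt"

func requestedDelta(requestedBytes, provisionedBytes int64, bound bool) int64 {
	if !bound {
		// Pending claim: the whole request still has to be placed somewhere.
		return requestedBytes
	}
	// Bound claim: only the expansion delta needs additional space.
	return requestedBytes - provisionedBytes
}

func main() {
	fmt.Println(requestedDelta(12<<30, 0, false))     // Pending 12Gi request      -> 12884901888
	fmt.Println(requestedDelta(12<<30, 10<<30, true)) // Bound, 10Gi grown to 12Gi -> 2147483648
}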
@@ -263,7 +270,7 @@ func filterNodes(
 	scs map[string]*v1.StorageClass,
 	pvcRequests map[string]PVCRequest,
 ) (*ExtenderFilterResult, error) {
-	// Param "pvcRequests" is a total amount of the pvcRequests space (both thick and thin) for Pod (i.e. from every PVC)
+	// Param "pvcRequests" is the total amount of requested space (both Thick and Thin) for the Pod (i.e. from every PVC)
 	if len(pvcRequests) == 0 {
 		return &ExtenderFilterResult{
 			Nodes: nodes,
@@ -292,22 +299,34 @@ func filterNodes(
 		log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s is actually used. VG size: %s, allocatedSize: %s", lvg.Name, lvg.Status.VGSize.String(), lvg.Status.AllocatedSize.String()))
 	}
 
-	lvgsThickFree := getLVGThickFreeSpaces(log, usedLVGs)
+	lvgsThickFree := getLVGThickFreeSpaces(usedLVGs)
 	log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace on the node: %+v", pod.Namespace, pod.Name, lvgsThickFree))
-
 	for lvgName, freeSpace := range lvgsThickFree {
 		log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thick free space %s", lvgName, resource.NewQuantity(freeSpace, resource.BinarySI)))
-		reservedSize, err := schedulerCache.GetLVGReservedSpace(lvgName)
+		reservedSpace, err := schedulerCache.GetLVGThickReservedSpace(lvgName)
 		if err != nil {
-			log.Error(err, fmt.Sprintf("[filterNodes] unable to cound cache reserved size for the LVMVolumeGroup %s", lvgName))
+			log.Error(err, fmt.Sprintf("[filterNodes] unable to count cache reserved space for the LVMVolumeGroup %s", lvgName))
 			continue
 		}
-		log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s reserved PVC space %s", lvgName, resource.NewQuantity(reservedSize, resource.BinarySI)))
-		lvgsThickFree[lvgName] -= reservedSize
+		log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s reserved PVC space %s", lvgName, resource.NewQuantity(reservedSpace, resource.BinarySI)))
+		lvgsThickFree[lvgName] -= reservedSpace
 	}
 	log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace with reserved PVC: %+v", pod.Namespace, pod.Name, lvgsThickFree))
 
-	lvgsThickFreeMutex := &sync.RWMutex{}
+	lvgsThinFree := getLVGThinFreeSpaces(usedLVGs)
+	log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thin FreeSpace on the node: %+v", pod.Namespace, pod.Name, lvgsThinFree))
+	for lvgName, thinPools := range lvgsThinFree {
+		for tpName, freeSpace := range thinPools {
+			log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thin Pool %s free space %s", lvgName, tpName, resource.NewQuantity(freeSpace, resource.BinarySI)))
+			reservedSpace, err := schedulerCache.GetLVGThinReservedSpace(lvgName, tpName)
+			if err != nil {
+				log.Error(err, fmt.Sprintf("[filterNodes] unable to count cache reserved space for the Thin pool %s of the LVMVolumeGroup %s", tpName, lvgName))
+				continue
+			}
+			log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thin pool %s reserved PVC space %s", lvgName, tpName, resource.NewQuantity(reservedSpace, resource.BinarySI)))
+			lvgsThinFree[lvgName][tpName] -= reservedSpace
+		}
+	}
 
 	nodeLVGs := SortLVGsByNodeName(usedLVGs)
 	for n, ls := range nodeLVGs {
@@ -316,7 +335,12 @@
 		}
 	}
 
+	// these are the nodes which might store every PVC from the Pod
 	commonNodes, err := getCommonNodesByStorageClasses(scs, nodeLVGs)
+	if err != nil {
+		log.Error(err, fmt.Sprintf("[filterNodes] unable to get common nodes for PVCs from the Pod %s/%s", pod.Namespace, pod.Name))
+		return nil, err
+	}
 	for nodeName := range commonNodes {
 		log.Trace(fmt.Sprintf("[filterNodes] Node %s is a common for every storage class", nodeName))
 	}
@@ -325,7 +349,10 @@
 		Nodes:       &corev1.NodeList{},
 		FailedNodes: FailedNodesMap{},
 	}
-	failedNodesMutex := &sync.Mutex{}
+
+	thickMapMtx := &sync.RWMutex{}
+	thinMapMtx := &sync.RWMutex{}
+	failedNodesMapMtx := &sync.Mutex{}
 
 	wg := &sync.WaitGroup{}
 	wg.Add(len(nodes.Items))
@@ -333,26 +360,32 @@
 	for i, node := range nodes.Items {
 		go func(i int, node corev1.Node) {
-			log.Debug(fmt.Sprintf("[filterNodes] gourutine %d starts the work", i))
+			log.Trace(fmt.Sprintf("[filterNodes] goroutine %d starts the work", i))
 			defer func() {
-				log.Debug(fmt.Sprintf("[filterNodes] gourutine %d ends the work", i))
+				log.Trace(fmt.Sprintf("[filterNodes] goroutine %d ends the work", i))
 				wg.Done()
 			}()
 
 			if _, common := commonNodes[node.Name]; !common {
 				log.Debug(fmt.Sprintf("[filterNodes] node %s is not common for used Storage Classes", node.Name))
-				failedNodesMutex.Lock()
+				failedNodesMapMtx.Lock()
 				result.FailedNodes[node.Name] = "node is not common for used Storage Classes"
-				failedNodesMutex.Unlock()
+				failedNodesMapMtx.Unlock()
 				return
 			}
 
+			// we get all LVMVolumeGroups from the candidate node (which is common for all the PVCs)
 			lvgsFromNode := commonNodes[node.Name]
 			hasEnoughSpace := true
+			// now we iterate over all the PVCs to see if we can place all of them on the node (does the node have enough space)
 			for _, pvc := range pvcs {
 				pvcReq := pvcRequests[pvc.Name]
+
+				// we get LVGs which might be used by the PVC
 				lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName]
+
+				// we get the specific LVG which the PVC can use on the node, as we support only one specified LVG per node in the Storage Class
 				commonLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
 				if commonLVG == nil {
 					err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
@@ -361,53 +394,59 @@
 				}
 				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, node.Name))
 
+				// see what kind of space the PVC needs
 				switch pvcReq.DeviceType {
-				case thick:
-					lvg := lvgs[commonLVG.Name]
-					lvgsThickFreeMutex.RLock()
-					freeSpace := lvgsThickFree[lvg.Name]
-					lvgsThickFreeMutex.RUnlock()
+				case consts.Thick:
+					thickMapMtx.RLock()
+					freeSpace := lvgsThickFree[commonLVG.Name]
+					thickMapMtx.RUnlock()
 
-					log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thick free space: %s, PVC requested space: %s", lvg.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI)))
+					log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thick free space: %s, PVC requested space: %s", commonLVG.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI)))
 					if freeSpace < pvcReq.RequestedSize {
 						hasEnoughSpace = false
 						break
 					}
 
-					lvgsThickFreeMutex.Lock()
-					lvgsThickFree[lvg.Name] -= pvcReq.RequestedSize
-					lvgsThickFreeMutex.Unlock()
-				case thin:
+					thickMapMtx.Lock()
+					lvgsThickFree[commonLVG.Name] -= pvcReq.RequestedSize
+					thickMapMtx.Unlock()
+				case consts.Thin:
 					lvg := lvgs[commonLVG.Name]
+
+					// we try to find the specific ThinPool in the LVMVolumeGroup which the PVC can use
 					targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 					if targetThinPool == nil {
-						err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg thin pools: %+v; thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
+						err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg Thin pools: %+v; Thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 						errs <- err
 						return
 					}
-					// TODO: add after overCommit implementation
-					// freeSpace, err := getThinPoolFreeSpace(targetThinPool)
-					// if err != nil {
-					// 	errs <- err
-					// 	return
-					// }
-
-					// log.Trace(fmt.Sprintf("[filterNodes] ThinPool free space: %d, PVC requested space: %d", freeSpace.Value(), pvcReq.RequestedSize))
-
-					// if freeSpace.Value() < pvcReq.RequestedSize {
-					// 	hasEnoughSpace = false
-					// }
+
+					thinMapMtx.RLock()
+					freeSpace := lvgsThinFree[lvg.Name][targetThinPool.Name]
+					thinMapMtx.RUnlock()
+
+					log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thin Pool %s free space: %s, PVC requested space: %s", lvg.Name, targetThinPool.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI)))
+
+					if freeSpace < pvcReq.RequestedSize {
+						hasEnoughSpace = false
+						break
+					}
+
+					thinMapMtx.Lock()
+					lvgsThinFree[lvg.Name][targetThinPool.Name] -= pvcReq.RequestedSize
+					thinMapMtx.Unlock()
 				}
 
 				if !hasEnoughSpace {
+					// we break: if even one PVC can't get enough space, the node does not fit
 					break
 				}
 			}
 
 			if !hasEnoughSpace {
-				failedNodesMutex.Lock()
+				failedNodesMapMtx.Lock()
 				result.FailedNodes[node.Name] = "not enough space"
-				failedNodesMutex.Unlock()
+				failedNodesMapMtx.Unlock()
 				return
 			}
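// --- Editor's note (illustrative sketch, not part of the patch) ---------------------------
// The goroutines started in filterNodes above share the lvgsThickFree / lvgsThinFree maps:
// reads happen under thickMapMtx.RLock()/thinMapMtx.RLock(), and the decrement after a
// successful fit happens under Lock(). The self-contained example below shows the same
// RWMutex-plus-WaitGroup pattern in isolation, with made-up names and sizes.
package main

import (
	"fmt"
	"sync"
)

func main() {
	free := map[string]int64{"vg-on-node-1": 10 << 30} // hypothetical LVG with 10Gi free
	request := int64(3 << 30)                          // each "PVC" asks for 3Gi

	mtx := &sync.RWMutex{}
	wg := &sync.WaitGroup{}
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()

			// read the current free space under a read lock, as filterNodes does
			mtx.RLock()
			enough := free["vg-on-node-1"] >= request
			mtx.RUnlock()

			if enough {
				// reserve the space under a write lock, as filterNodes does
				mtx.Lock()
				free["vg-on-node-1"] -= request
				mtx.Unlock()
			}
		}()
	}
	wg.Wait()
	fmt.Println(free["vg-on-node-1"]) // 1073741824, i.e. 1Gi left after three 3Gi reservations
}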
@@ -423,6 +462,7 @@
 	}
 	close(errs)
 	if err != nil {
+		log.Error(err, fmt.Sprintf("[filterNodes] unable to filter nodes for the Pod %s/%s, last error: %s", pod.Namespace, pod.Name, err.Error()))
 		return nil, err
 	}
 
@@ -437,21 +477,33 @@
 	return result, nil
 }
 
-func getLVGThickFreeSpaces(log logger.Logger, lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]int64 {
-	result := make(map[string]int64, len(lvgs))
+func getLVGThinFreeSpaces(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]map[string]int64 {
+	result := make(map[string]map[string]int64, len(lvgs))
 	for _, lvg := range lvgs {
-		log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] tries to count free VG space for LVMVolumeGroup %s", lvg.Name))
-		free := getVGFreeSpace(lvg)
-		log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] successfully counted free VG space for LVMVolumeGroup %s", lvg.Name))
+		if result[lvg.Name] == nil {
+			result[lvg.Name] = make(map[string]int64, len(lvg.Status.ThinPools))
+		}
 
-		result[lvg.Name] = free.Value()
+		for _, tp := range lvg.Status.ThinPools {
+			result[lvg.Name][tp.Name] = tp.AvailableSpace.Value()
+		}
+	}
+
+	return result
+}
+
+func getLVGThickFreeSpaces(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string]int64 {
+	result := make(map[string]int64, len(lvgs))
+
+	for _, lvg := range lvgs {
+		result[lvg.Name] = lvg.Status.VGFree.Value()
 	}
 
 	return result
 }
 
-func findMatchedThinPool(thinPools []v1alpha1.StatusThinPool, name string) *v1alpha1.StatusThinPool {
+func findMatchedThinPool(thinPools []v1alpha1.LvmVolumeGroupThinPoolStatus, name string) *v1alpha1.LvmVolumeGroupThinPoolStatus {
 	for _, tp := range thinPools {
 		if tp.Name == name {
 			return &tp
@@ -554,13 +606,13 @@ type LVMVolumeGroup struct {
 	Name string `yaml:"name"`
 	Thin struct {
 		PoolName string `yaml:"poolName"`
 	} `yaml:"thin"`
 }
 
 type LVMVolumeGroups []LVMVolumeGroup
 
 func ExtractLVGsFromSC(sc *v1.StorageClass) (LVMVolumeGroups, error) {
 	var lvmVolumeGroups LVMVolumeGroups
-	err := yaml.Unmarshal([]byte(sc.Parameters[lvmVolumeGroupsParamKey]), &lvmVolumeGroups)
+	err := yaml.Unmarshal([]byte(sc.Parameters[consts.LvmVolumeGroupsParamKey]), &lvmVolumeGroups)
 	if err != nil {
 		return nil, err
 	}
@@ -578,21 +630,6 @@ func SortLVGsByNodeName(lvgs map[string]*v1alpha1.LvmVolumeGroup) map[string][]*
 	return sorted
 }
 
-func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) resource.Quantity {
-	// notice that .Sub method uses pointer but not a copy of the quantity
-	free := lvg.Status.VGSize
-	free.Sub(lvg.Status.AllocatedSize)
-	return free
-}
-
-func getThinPoolFreeSpace(tp *v1alpha1.StatusThinPool) resource.Quantity {
-	// notice that .Sub method uses pointer but not a copy of the quantity
-	free := tp.ActualSize
-	free.Sub(tp.UsedSize)
-
-	return free
-}
-
 func getPersistentVolumes(ctx context.Context, cl client.Client) (map[string]corev1.PersistentVolume, error) {
 	pvs := &corev1.PersistentVolumeList{}
 	err := cl.List(ctx, pvs)
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
index 29b4d5c8..f937d032 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go
@@ -5,6 +5,7 @@ import (
 	v1 "k8s.io/api/core/v1"
 	v12 "k8s.io/api/storage/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sds-local-volume-scheduler-extender/pkg/consts"
 	"sds-local-volume-scheduler-extender/pkg/logger"
 	"testing"
 )
@@ -20,13 +21,13 @@ func TestFilter(t *testing.T) {
 			ObjectMeta: metav1.ObjectMeta{
 				Name: sc1,
 			},
-			Provisioner: sdsLocalVolumeProvisioner,
+			Provisioner: consts.SdsLocalVolumeProvisioner,
 		},
 		sc2: {
 			ObjectMeta: metav1.ObjectMeta{
 				Name: sc2,
 			},
-			Provisioner: sdsLocalVolumeProvisioner,
+			Provisioner: consts.SdsLocalVolumeProvisioner,
 		},
 		sc3: {
 			ObjectMeta: metav1.ObjectMeta{
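// --- Editor's note (illustrative sketch, not part of the patch) ---------------------------
// ExtractLVGsFromSC above unmarshals the lvm-volume-groups StorageClass parameter into the
// LVMVolumeGroups slice. The parameter value and the concrete yaml library used below
// (gopkg.in/yaml.v3) are assumptions for illustration; the struct mirrors the one in filter.go,
// including the lowercase `yaml:"thin"` key.
package main

import (
	"fmt"

	"gopkg.in/yaml.v3"
)

type LVMVolumeGroup struct {
	Name string `yaml:"name"`
	Thin struct {
		PoolName string `yaml:"poolName"`
	} `yaml:"thin"`
}

func main() {
	// Hypothetical StorageClass parameter: one entry with a Thin pool, one without.
	param := `
- name: vg-1-on-worker-0
  thin:
    poolName: thin-pool-0
- name: vg-1-on-worker-1
`
	var lvgs []LVMVolumeGroup
	if err := yaml.Unmarshal([]byte(param), &lvgs); err != nil {
		panic(err)
	}
	fmt.Println(lvgs[0].Name, lvgs[0].Thin.PoolName)       // vg-1-on-worker-0 thin-pool-0
	fmt.Println(lvgs[1].Name, lvgs[1].Thin.PoolName == "") // vg-1-on-worker-1 true
}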
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
index 672ec980..c4e3f4b1 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go
@@ -23,6 +23,7 @@ import (
 	"math"
 	"net/http"
 	"sds-local-volume-scheduler-extender/pkg/cache"
+	"sds-local-volume-scheduler-extender/pkg/consts"
 	"sds-local-volume-scheduler-extender/pkg/logger"
 	"sync"
@@ -156,10 +157,10 @@ func scoreNodes(
 			var freeSpace resource.Quantity
 			lvg := lvgs[commonLVG.Name]
 			switch pvcReq.DeviceType {
-			case thick:
-				freeSpace = getVGFreeSpace(lvg)
-				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space before PVC reservation: %s", lvg.Name, freeSpace.String()))
-				reserved, err := schedulerCache.GetLVGReservedSpace(lvg.Name)
+			case consts.Thick:
+				freeSpace = lvg.Status.VGFree
+				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free Thick space before PVC reservation: %s", lvg.Name, freeSpace.String()))
+				reserved, err := schedulerCache.GetLVGThickReservedSpace(lvg.Name)
 				if err != nil {
 					log.Error(err, fmt.Sprintf("[scoreNodes] unable to count reserved space for the LVMVolumeGroup %s", lvg.Name))
 					continue
@@ -167,8 +168,8 @@ func scoreNodes(
 				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s PVC Space reservation: %s", lvg.Name, resource.NewQuantity(reserved, resource.BinarySI)))
 				spaceWithReserved := freeSpace.Value() - reserved
 				freeSpace = *resource.NewQuantity(spaceWithReserved, resource.BinarySI)
-				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space after PVC reservation: %s", lvg.Name, freeSpace.String()))
-			case thin:
+				log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free Thick space after PVC reservation: %s", lvg.Name, freeSpace.String()))
+			case consts.Thin:
 				thinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
 				if thinPool == nil {
 					err = errors.New(fmt.Sprintf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
@@ -177,7 +178,7 @@ func scoreNodes(
 					return
 				}
 
-				freeSpace = getThinPoolFreeSpace(thinPool)
+				freeSpace = thinPool.AvailableSpace
 			}
 
 			log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s total size: %s", lvg.Name, lvg.Status.VGSize.String()))
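// --- Editor's note (illustrative sketch, not part of the patch) ---------------------------
// The getVGFreeSpace/getThinPoolFreeSpace helpers removed from filter.go subtracted
// AllocatedSize/UsedSize from a copy because resource.Quantity.Sub mutates its receiver; the
// patch reads the ready-made VGFree / AvailableSpace status fields instead, and scoreNodes
// above switches to raw bytes via Value() before subtracting the cached reservation. The
// snippet below shows that Quantity arithmetic on made-up numbers.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	vgSize := resource.MustParse("100Gi")
	allocated := resource.MustParse("40Gi")

	free := vgSize.DeepCopy() // copy first: Sub would otherwise change vgSize in place
	free.Sub(allocated)
	fmt.Println(free.String()) // 60Gi

	reserved := int64(10 << 30) // e.g. 10Gi already reserved in the scheduler cache
	withReserved := resource.NewQuantity(free.Value()-reserved, resource.BinarySI)
	fmt.Println(withReserved.String()) // 50Gi
}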
diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
index 85488ce3..788ebc7c 100644
--- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
+++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go
@@ -73,7 +73,7 @@ func NewHandler(ctx context.Context, cl client.Client, log logger.Logger, lvgCac
 	}, nil
 }
 
-func status(w http.ResponseWriter, r *http.Request) {
+func status(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	_, err := w.Write([]byte("ok"))
 	if err != nil {
@@ -81,45 +81,14 @@ func status(w http.ResponseWriter, r *http.Request) {
 	}
 }
 
-func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) {
+func (s *scheduler) getCache(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	s.cache.PrintTheCacheLog()
 
-	result := make(map[string][]struct {
-		pvcName      string
-		selectedNode string
-		status       string
-		size         string
-	})
-
 	lvgs := s.cache.GetAllLVG()
 	for _, lvg := range lvgs {
-		pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name)
-		if err != nil {
-			w.WriteHeader(http.StatusInternalServerError)
-			s.log.Error(err, "something bad")
-		}
-
-		result[lvg.Name] = make([]struct {
-			pvcName      string
-			selectedNode string
-			status       string
-			size         string
-		}, 0)
-
-		for _, pvc := range pvcs {
-			result[lvg.Name] = append(result[lvg.Name], struct {
-				pvcName      string
-				selectedNode string
-				status       string
-				size         string
-			}{pvcName: pvc.Name, selectedNode: pvc.Annotations[cache.SelectedNodeAnnotation], status: string(pvc.Status.Phase), size: pvc.Spec.Resources.Requests.Storage().String()})
-		}
-	}
-
-	for lvgName, pvcs := range result {
-		reserved, err := s.cache.GetLVGReservedSpace(lvgName)
+		reserved, err := s.cache.GetLVGThickReservedSpace(lvg.Name)
 		if err != nil {
 			w.WriteHeader(http.StatusInternalServerError)
 			_, err = w.Write([]byte("unable to write the cache"))
@@ -128,7 +97,7 @@ func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) {
 			}
 		}
 
-		_, err = w.Write([]byte(fmt.Sprintf("LVMVolumeGroup: %s Reserved: %s\n", lvgName, resource.NewQuantity(reserved, resource.BinarySI))))
+		_, err = w.Write([]byte(fmt.Sprintf("LVMVolumeGroup: %s Thick Reserved: %s\n", lvg.Name, resource.NewQuantity(reserved, resource.BinarySI).String())))
 		if err != nil {
 			w.WriteHeader(http.StatusInternalServerError)
 			_, err = w.Write([]byte("unable to write the cache"))
@@ -137,32 +106,45 @@ func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) {
 			}
 		}
 
-		for _, pvc := range pvcs {
-			_, err = w.Write([]byte(fmt.Sprintf("\tPVC: %s\n", pvc.pvcName)))
+		thickPvcs, err := s.cache.GetAllThickPVCLVG(lvg.Name)
+		for _, pvc := range thickPvcs {
+			_, err = w.Write([]byte(fmt.Sprintf("\t\tThick PVC: %s, reserved: %s, selected node: %s\n", pvc.Name, pvc.Spec.Resources.Requests.Storage().String(), pvc.Annotations[cache.SelectedNodeAnnotation])))
 			if err != nil {
 				w.WriteHeader(http.StatusInternalServerError)
 				s.log.Error(err, "error write response")
 			}
-			_, err = w.Write([]byte(fmt.Sprintf("\t\tNodeName: %s\n", pvc.selectedNode)))
+		}
+
+		for _, tp := range lvg.Status.ThinPools {
+			thinReserved, err := s.cache.GetLVGThinReservedSpace(lvg.Name, tp.Name)
 			if err != nil {
 				w.WriteHeader(http.StatusInternalServerError)
 				s.log.Error(err, "error write response")
 			}
-			_, err = w.Write([]byte(fmt.Sprintf("\t\tStatus: %s\n", pvc.status)))
+			_, err = w.Write([]byte(fmt.Sprintf("\tThinPool: %s, reserved: %s\n", tp.Name, resource.NewQuantity(thinReserved, resource.BinarySI).String())))
 			if err != nil {
 				w.WriteHeader(http.StatusInternalServerError)
 				s.log.Error(err, "error write response")
 			}
-			_, err = w.Write([]byte(fmt.Sprintf("\t\tSize: %s\n", pvc.size)))
+
+			thinPvcs, err := s.cache.GetAllPVCFromLVGThinPool(lvg.Name, tp.Name)
 			if err != nil {
 				w.WriteHeader(http.StatusInternalServerError)
 				s.log.Error(err, "error write response")
 			}
+
+			for _, pvc := range thinPvcs {
+				_, err = w.Write([]byte(fmt.Sprintf("\t\tThin PVC: %s, reserved: %s, selected node: %s\n", pvc.Name, pvc.Spec.Resources.Requests.Storage().String(), pvc.Annotations[cache.SelectedNodeAnnotation])))
+				if err != nil {
+					w.WriteHeader(http.StatusInternalServerError)
+					s.log.Error(err, "error write response")
+				}
+			}
 		}
 	}
 }
 
-func (s *scheduler) getCacheStat(w http.ResponseWriter, r *http.Request) {
+func (s *scheduler) getCacheStat(w http.ResponseWriter, _ *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	pvcTotalCount := 0