Skip to content

Commit

Permalink
[controller] Add thin pool reserved space to the cache
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
  • Loading branch information
ViktorKram committed Jun 28, 2024
1 parent 77b627c commit 7e8a4a2
Show file tree
Hide file tree
Showing 10 changed files with 201 additions and 119 deletions.
1 change: 1 addition & 0 deletions images/sds-local-volume-csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
)

// TODO: сделать поддержку расчета свободного места при Immediate
func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) {
d.log.Info("method CreateVolume")

Expand Down
116 changes: 87 additions & 29 deletions images/sds-local-volume-scheduler-extender/pkg/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
v1 "k8s.io/api/core/v1"
slices2 "k8s.io/utils/strings/slices"
"sds-local-volume-scheduler-extender/api/v1alpha1"
"sds-local-volume-scheduler-extender/pkg/consts"
"sds-local-volume-scheduler-extender/pkg/logger"
"sync"
)
Expand Down Expand Up @@ -280,7 +281,7 @@ func (c *Cache) shouldAddPVC(pvc *v1.PersistentVolumeClaim, lvgCh *lvgCache, pvc

_, found = lvgCh.thickPVCs.Load(pvcKey)
if found {
c.log.Warning(fmt.Sprintf("[shouldAddPVC] PVC %s cache has been already added as thick to the LVMVolumeGroup %s", pvcKey, lvgName))
c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the cache of the LVMVolumeGroup %s", pvcKey, lvgName))
return false, nil
}

Expand All @@ -292,7 +293,7 @@ func (c *Cache) shouldAddPVC(pvc *v1.PersistentVolumeClaim, lvgCh *lvgCache, pvc
}

if _, found = thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey); found {
c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache. No need to add", pvcKey, thinPoolName))
c.log.Debug(fmt.Sprintf("[shouldAddPVC] PVC %s was found in the Thin pool %s cache of the LVMVolumeGroup %s. No need to add", pvcKey, thinPoolName, lvgName))
return false, nil
}
}
Expand All @@ -316,6 +317,13 @@ func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolum
return err
}

thinPoolBelongs := c.checkIfThinPoolBelongsToLVG(lvgCh.(*lvgCache), thinPoolName)
if !thinPoolBelongs {
err := fmt.Errorf("thin pool %s was not found in the LVMVolumeGroup %s", thinPoolName, lvgName)
c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add Thin pool %s of the LVMVolumeGroup %s for the PVC %s", thinPoolName, lvgName, pvcKey))
return err
}

// this case might be triggered if the extender recovers after fail and finds some pending thickPVCs with selected nodes
c.log.Trace(fmt.Sprintf("[AddThinPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations))
shouldAdd, err := c.shouldAddPVC(pvc, lvgCh.(*lvgCache), pvcKey, lvgName, thinPoolName)
Expand All @@ -329,14 +337,28 @@ func (c *Cache) AddThinPVC(lvgName, thinPoolName string, pvc *v1.PersistentVolum
}

c.log.Debug(fmt.Sprintf("[AddThinPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName))
c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName)
err = c.addNewThinPVC(lvgCh.(*lvgCache), pvc, thinPoolName)
if err != nil {
c.log.Error(err, fmt.Sprintf("[AddThinPVC] unable to add PVC %s to Thin Pool %s of the LVMVolumeGroup %s", pvcKey, thinPoolName, lvgName))
return err
}

return nil
}

func (c *Cache) checkIfThinPoolBelongsToLVG(lvgCh *lvgCache, thinPoolName string) bool {
for _, tp := range lvgCh.lvg.Status.ThinPools {
if tp.Name == thinPoolName {
return true
}
}

return false
}

func (c *Cache) addNewThickPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) {
pvcKey := configurePVCKey(pvc)
lvgCh.thinPools.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})
lvgCh.thickPVCs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})

c.addLVGToPVC(lvgCh.lvg.Name, pvcKey)
}
Expand All @@ -350,14 +372,20 @@ func (c *Cache) addNewThinPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, th
return err
}

err := c.addNewThinPool(lvgCh, pvc, thinPoolName)
if err != nil {
c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin pool %s in the LVMVolumeGroup %s cache for PVC %s", thinPoolName, lvgCh.lvg.Name, pvc.Name))
return err
}

thinPoolCh, found := lvgCh.thinPools.Load(thinPoolName)
if !found {
err := fmt.Errorf("thin pool %s not found", thinPoolName)
err = fmt.Errorf("thin pool %s not found", thinPoolName)
c.log.Error(err, fmt.Sprintf("[addNewThinPVC] unable to add Thin PVC %s to the cache", pvcKey))
return err
}

thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, pvc)
thinPoolCh.(*thinPoolCache).pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]})
c.log.Debug(fmt.Sprintf("[addNewThinPVC] THIN PVC %s was added to the cache to Thin Pool %s", pvcKey, thinPoolName))

c.addLVGToPVC(lvgCh.lvg.Name, pvcKey)
Expand Down Expand Up @@ -459,21 +487,42 @@ func (c *Cache) addNewThinPool(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim, t
return nil
}

// GetAllThickPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error.
func (c *Cache) GetAllThickPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) {
// GetAllPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error.
func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) {
lvgCh, found := c.lvgs.Load(lvgName)
if !found {
err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName)
c.log.Error(err, fmt.Sprintf("[GetAllThickPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName))
c.log.Error(err, fmt.Sprintf("[GetAllPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName))
return nil, err
}

result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount)
size := 0
lvgCh.(*lvgCache).thickPVCs.Range(func(key, value any) bool {
size++
return true
})
lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool {
tpCh.(*thinPoolCache).pvcs.Range(func(key, value any) bool {
size++
return true
})
return true
})

result := make([]*v1.PersistentVolumeClaim, 0, size)
lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool {
result = append(result, pvcCh.(*pvcCache).pvc)
return true
})

lvgCh.(*lvgCache).thinPools.Range(func(tpName, tpCh any) bool {
tpCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool {
result = append(result, pvcCh.(*pvcCache).pvc)
return true
})
return true
})

return result, nil
}

Expand Down Expand Up @@ -585,75 +634,75 @@ func (c *Cache) CheckIsPVCStored(pvc *v1.PersistentVolumeClaim) bool {
return false
}

// RemoveSpaceReservationForThickPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node.
func (c *Cache) RemoveSpaceReservationForThickPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, thin bool) error {
// RemoveSpaceReservationForPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node.
func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim, deviceType string) error {
pvcKey := configurePVCKey(pvc)
selectedLVGName := ""

lvgNamesForPVC, found := c.pvcLVGs.Load(pvcKey)
if !found {
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey))
return nil
}

for _, lvgName := range lvgNamesForPVC.([]string) {
lvgCh, found := c.lvgs.Load(lvgName)
if !found || lvgCh == nil {
err := fmt.Errorf("no cache found for the LVMVolumeGroup %s", lvgName)
c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey))
c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey))
return err
}

switch thin {
case true:
switch deviceType {
case consts.Thin:
lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool {
pvcCh, found := thinPoolCh.(*thinPoolCache).pvcs.Load(pvcKey)
if !found {
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
return true
}

selectedNode := pvcCh.(*pvcCache).selectedNode
if selectedNode == "" {
thinPoolCh.(*thinPoolCache).pvcs.Delete(pvcKey)
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation]))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the Thin pool %s of the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, thinPoolName.(string), lvgName, pvc.Annotations[SelectedNodeAnnotation]))
} else {
selectedLVGName = lvgName
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
}

return true
})
case false:
case consts.Thick:
pvcCh, found := lvgCh.(*lvgCache).thickPVCs.Load(pvcKey)
if !found {
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName))
continue
}

selectedNode := pvcCh.(*pvcCache).selectedNode
if selectedNode == "" {
lvgCh.(*lvgCache).thickPVCs.Delete(pvcKey)
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation]))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation]))
} else {
selectedLVGName = lvgName
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName))
}
}
}
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey))

c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey))
cleared := make([]string, 0, len(lvgNamesForPVC.([]string)))
for _, lvgName := range lvgNamesForPVC.([]string) {
if lvgName == selectedLVGName {
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey))
cleared = append(cleared, lvgName)
} else {
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey))
c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey))
}
}
c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForThickPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared))
c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared))
c.pvcLVGs.Store(pvcKey, cleared)

return nil
Expand Down Expand Up @@ -722,7 +771,16 @@ func (c *Cache) PrintTheCacheLog() {
c.log.Cache(fmt.Sprintf("[%s]", lvgName))

lvgCh.(*lvgCache).thickPVCs.Range(func(pvcName, pvcCh any) bool {
c.log.Cache(fmt.Sprintf(" PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode))
c.log.Cache(fmt.Sprintf(" THICK PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode))
return true
})

lvgCh.(*lvgCache).thinPools.Range(func(thinPoolName, thinPoolCh any) bool {
thinPoolCh.(*thinPoolCache).pvcs.Range(func(pvcName, pvcCh any) bool {
c.log.Cache(fmt.Sprintf(" THIN POOL %s PVC %s, selected node: %s", thinPoolName, pvcName, pvcCh.(*pvcCache).selectedNode))
return true
})

return true
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -573,7 +573,7 @@ func BenchmarkCache_FullLoad(b *testing.B) {

lvgMp := cache.GetAllLVG()
for lvgName := range lvgMp {
_, err := cache.GetAllThickPVCForLVG(lvgName)
_, err := cache.GetAllPVCForLVG(lvgName)
if err != nil {
b.Error(err)
}
Expand Down
11 changes: 11 additions & 0 deletions images/sds-local-volume-scheduler-extender/pkg/consts/consts.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package consts

const (
SdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io"

LvmTypeParamKey = "local.csi.storage.deckhouse.io/lvm-type"
LvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups"

Thick = "Thick"
Thin = "Thin"
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
v1 "k8s.io/api/core/v1"
"k8s.io/client-go/util/workqueue"
"reflect"
"sds-local-volume-scheduler-extender/api/v1alpha1"
"sds-local-volume-scheduler-extender/pkg/cache"
"sds-local-volume-scheduler-extender/pkg/logger"
Expand Down Expand Up @@ -71,7 +72,7 @@ func RunLVGWatcherCacheController(
}

log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to clear the cache for the LVMVolumeGroup %s", lvg.Name))
pvcs, err := cache.GetAllThickPVCForLVG(lvg.Name)
pvcs, err := cache.GetAllPVCForLVG(lvg.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", lvg.Name))
}
Expand Down Expand Up @@ -116,27 +117,31 @@ func RunLVGWatcherCacheController(
log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] old state LVMVolumeGroup %s has size %s", oldLvg.Name, oldLvg.Status.AllocatedSize.String()))
log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] new state LVMVolumeGroup %s has size %s", newLvg.Name, newLvg.Status.AllocatedSize.String()))

if newLvg.DeletionTimestamp != nil ||
oldLvg.Status.AllocatedSize.Value() == newLvg.Status.AllocatedSize.Value() {
if !shouldReconcileLVG(oldLvg, newLvg) {
log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", newLvg.Name))
return
}
log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func", newLvg.Name))

cachedPVCs, err := cache.GetAllThickPVCForLVG(newLvg.Name)
cachedPVCs, err := cache.GetAllPVCForLVG(newLvg.Name)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", newLvg.Name))
}

for _, pvc := range cachedPVCs {
log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache belongs to LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase))
if pvc.Status.Phase == v1.ClaimBound {
log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache has Status.Phase Bound. It will be removed from the reserved space in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
err = cache.RemoveBoundedThickPVCSpaceReservation(newLvg.Name, pvc)
if err != nil {
log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
continue
}
//err = cache.RemoveBoundedThickPVCSpaceReservation(newLvg.Name, pvc)
//if err != nil {
// log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name))
// continue
//}

cache.RemovePVCFromTheCache(pvc)

//err = cache.RemoveBoundedThinPVCSpaceReservation()

log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s was removed from the LVMVolumeGroup %s in the cache", pvc.Namespace, pvc.Name, newLvg.Name))
}
Expand All @@ -163,3 +168,16 @@ func RunLVGWatcherCacheController(

return c, nil
}

func shouldReconcileLVG(oldLVG, newLVG *v1alpha1.LvmVolumeGroup) bool {
if newLVG.DeletionTimestamp != nil {
return false
}

if oldLVG.Status.AllocatedSize.Value() == newLVG.Status.AllocatedSize.Value() &&
reflect.DeepEqual(oldLVG.Status.ThinPools, newLVG.Status.ThinPools) {
return false
}

return true
}
Loading

0 comments on commit 7e8a4a2

Please sign in to comment.