[controller] Add logs and upgrading node scoring #21

Merged · 4 commits · Mar 28, 2024
4 changes: 3 additions & 1 deletion images/sds-local-volume-scheduler-extender/go.mod
@@ -6,7 +6,9 @@ require (
github.com/go-logr/logr v1.4.1
github.com/go-logr/zapr v1.3.0
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
go.uber.org/zap v1.26.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.29.1
k8s.io/apimachinery v0.29.1
k8s.io/client-go v0.29.0
@@ -42,6 +44,7 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_golang v1.18.0 // indirect
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
@@ -59,7 +62,6 @@ require (
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/apiextensions-apiserver v0.29.0 // indirect
k8s.io/component-base v0.29.0 // indirect
34 changes: 22 additions & 12 deletions images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go
@@ -186,6 +186,8 @@ func filterNodes(
return nil, err
}
log.Trace(fmt.Sprintf("[filterNodes] LVGs Thick FreeSpace: %+v", lvgsThickFree))
lvgsThickFreeMutex := &sync.RWMutex{}

scLVGs, err := getSortedLVGsFromStorageClasses(scs)
if err != nil {
return nil, err
@@ -201,13 +203,16 @@
}

commonNodes, err := getCommonNodesByStorageClasses(scs, nodeLVGs)
for nodeName := range commonNodes {
log.Trace(fmt.Sprintf("[filterNodes] common node %s", nodeName))
}

result := &ExtenderFilterResult{
Nodes: &corev1.NodeList{},
FailedNodes: FailedNodesMap{},
}
failedNodesMapMutex := &sync.Mutex{}

mutex := &sync.RWMutex{}
wg := &sync.WaitGroup{}
wg.Add(len(nodes.Items))
errs := make(chan error, len(nodes.Items)*len(pvcs))
@@ -222,7 +227,9 @@

if _, common := commonNodes[node.Name]; !common {
log.Debug(fmt.Sprintf("[filterNodes] node %s is not common for used Storage Classes", node.Name))
failedNodesMapMutex.Lock()
result.FailedNodes[node.Name] = "node is not common for used Storage Classes"
failedNodesMapMutex.Unlock()
return
}

@@ -232,34 +239,35 @@
for _, pvc := range pvcs {
pvcReq := pvcRequests[pvc.Name]
lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName]
matchedLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
if matchedLVG == nil {
commonLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
if commonLVG == nil {
err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
errs <- err
return
}
log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, node.Name))

switch pvcReq.DeviceType {
case thick:
lvg := lvgs[matchedLVG.Name]
mutex.RLock()
lvg := lvgs[commonLVG.Name]
lvgsThickFreeMutex.RLock()
freeSpace := lvgsThickFree[lvg.Name]
mutex.RUnlock()
lvgsThickFreeMutex.RUnlock()

log.Trace(fmt.Sprintf("[filterNodes] Thick free space: %d, PVC requested space: %d", freeSpace, pvcReq.RequestedSize))
log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s Thick free space: %s, PVC requested space: %s", lvg.Name, resource.NewQuantity(freeSpace, resource.BinarySI), resource.NewQuantity(pvcReq.RequestedSize, resource.BinarySI)))
if freeSpace < pvcReq.RequestedSize {
hasEnoughSpace = false
break
}

mutex.Lock()
lvgsThickFreeMutex.Lock()
lvgsThickFree[lvg.Name] -= pvcReq.RequestedSize
mutex.Unlock()
lvgsThickFreeMutex.Unlock()
case thin:
lvg := lvgs[matchedLVG.Name]
targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, matchedLVG.Thin.PoolName)
lvg := lvgs[commonLVG.Name]
targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
if targetThinPool == nil {
err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg thin pools: %+v; thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, matchedLVG.Thin.PoolName)
err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg thin pools: %+v; thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
errs <- err
return
}
@@ -283,7 +291,9 @@
}

if !hasEnoughSpace {
failedNodesMapMutex.Lock()
result.FailedNodes[node.Name] = "not enough space"
failedNodesMapMutex.Unlock()
return
}

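The filter.go changes above introduce per-map locking: lvgsThickFree is read and updated under lvgsThickFreeMutex, and result.FailedNodes is written under failedNodesMapMutex, because every node is checked in its own goroutine. A minimal standalone sketch of that pattern, with hypothetical names and simplified types rather than the extender's actual structs:

package main

import (
	"fmt"
	"sync"
)

// checkNode mimics the per-node space check in filterNodes: read the shared
// free-space map under an RWMutex, record failures under a separate mutex,
// and reserve space for the request if it fits.
func checkNode(node string, requested int64,
	free map[string]int64, freeMu *sync.RWMutex,
	failed map[string]string, failedMu *sync.Mutex) {
	lvg := "lvg-on-" + node

	freeMu.RLock()
	freeSpace := free[lvg]
	freeMu.RUnlock()

	if freeSpace < requested {
		failedMu.Lock()
		failed[node] = "not enough space"
		failedMu.Unlock()
		return
	}

	freeMu.Lock()
	free[lvg] -= requested
	freeMu.Unlock()
}

func main() {
	free := map[string]int64{"lvg-on-node-a": 10 << 30, "lvg-on-node-b": 1 << 30}
	failed := map[string]string{}
	freeMu := &sync.RWMutex{}
	failedMu := &sync.Mutex{}

	nodes := []string{"node-a", "node-b"}
	wg := &sync.WaitGroup{}
	wg.Add(len(nodes))
	for _, n := range nodes {
		go func(n string) {
			defer wg.Done()
			checkNode(n, 2<<30, free, freeMu, failed, failedMu) // each node needs 2 GiB
		}(n)
	}
	wg.Wait()

	fmt.Println(failed) // map[node-b:not enough space]
}

Without the two mutexes, the concurrent map writes above would be a data race, which is exactly what the added locking in filterNodes avoids.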
@@ -28,6 +28,7 @@ import (

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/storage/v1"
"k8s.io/apimachinery/pkg/api/resource"
"sigs.k8s.io/controller-runtime/pkg/client"
)

@@ -109,11 +110,14 @@ func scoreNodes(
}

usedLVGs := removeUnusedLVGs(lvgs, scLVGs)
for lvgName := range usedLVGs {
log.Trace(fmt.Sprintf("[scoreNodes] used LVMVolumeGroup %s", lvgName))
}

nodeLVGs := sortLVGsByNodeName(usedLVGs)
for n, ls := range nodeLVGs {
for _, l := range ls {
log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n))
log.Trace(fmt.Sprintf("[scoreNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n))
}
}

@@ -124,9 +128,9 @@

for i, node := range nodes.Items {
go func(i int, node corev1.Node) {
log.Debug(fmt.Sprintf("[filterNodes] goroutine %d starts the work", i))
log.Debug(fmt.Sprintf("[scoreNodes] goroutine %d starts the work", i))
defer func() {
log.Debug(fmt.Sprintf("[filterNodes] goroutine %d ends the work", i))
log.Debug(fmt.Sprintf("[scoreNodes] goroutine %d ends the work", i))
wg.Done()
}()

Expand All @@ -136,51 +140,58 @@ func scoreNodes(
for _, pvc := range pvcs {
pvcReq := pvcRequests[pvc.Name]
lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName]
matchedLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
if matchedLVG == nil {
commonLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
if commonLVG == nil {
err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
errs <- err
return
}
log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, node.Name))

var freeSpace resource.Quantity
lvg := lvgs[commonLVG.Name]
switch pvcReq.DeviceType {
case thick:
lvg := lvgs[matchedLVG.Name]
freeSpace, err := getVGFreeSpace(&lvg)
freeSpace, err = getVGFreeSpace(&lvg)
if err != nil {
errs <- err
return
}
log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space %s", lvg.Name, freeSpace.String()))

totalFreeSpaceLeft = getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize)
case thin:
lvg := lvgs[matchedLVG.Name]
thinPool := findMatchedThinPool(lvg.Status.ThinPools, matchedLVG.Thin.PoolName)
thinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
if thinPool == nil {
err = errors.New(fmt.Sprintf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
log.Error(err, "an error occurs while searching for target LVMVolumeGroup")
log.Error(err, "[scoreNodes] an error occurs while searching for target LVMVolumeGroup")
errs <- err
return
}

freeSpace, err := getThinPoolFreeSpace(thinPool)
freeSpace, err = getThinPoolFreeSpace(thinPool)
if err != nil {
errs <- err
return
}

totalFreeSpaceLeft = getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize)
}
lvgTotalSize, err := resource.ParseQuantity(lvg.Status.VGSize)
if err != nil {
errs <- err
return
}
totalFreeSpaceLeft += getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize, lvgTotalSize.Value())
}

averageFreeSpace := totalFreeSpaceLeft / int64(len(pvcs))
score := getNodeScore(averageFreeSpace, divisor)
log.Trace(fmt.Sprintf("[scoreNodes] node %s has score %d with average free space left (after all PVC bounded), percent %d", node.Name, score, averageFreeSpace))

result = append(result, HostPriority{
Host: node.Name,
Score: score,
})
}(i, node)

}
wg.Wait()

@@ -194,6 +205,7 @@
return nil, err
}

log.Trace("[scoreNodes] final result")
for _, n := range result {
log.Trace(fmt.Sprintf("[scoreNodes] host: %s", n.Host))
log.Trace(fmt.Sprintf("[scoreNodes] score: %d", n.Score))
@@ -202,13 +214,15 @@
return result, nil
}

func getFreeSpaceLeftPercent(freeSpace int64, requestedSpace int64) int64 {
left := freeSpace - requestedSpace
return left * 100 / freeSpace
func getFreeSpaceLeftPercent(freeSize, requestedSpace, totalSize int64) int64 {
leftFreeSize := freeSize - requestedSpace
fraction := float64(leftFreeSize) / float64(totalSize)
percent := fraction * 100
return int64(percent)
}

func getNodeScore(freeSpace int64, divisor float64) int {
converted := int(math.Log2(float64(freeSpace) / divisor))
converted := int(math.Round(math.Log2(float64(freeSpace) / divisor)))
switch {
case converted < 1:
return 1
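For reference, the rewritten getFreeSpaceLeftPercent above changes the percentage base from the LVMVolumeGroup's free space to its total size (Status.VGSize). A rough worked example with illustrative numbers, assuming the functions from the hunk above are in scope:

	total := int64(100 << 30)    // 100 GiB volume group
	free := int64(40 << 30)      // 40 GiB still free
	requested := int64(10 << 30) // the PVC asks for 10 GiB

	// Old formula: (free - requested) * 100 / free  = 75
	// New formula: (free - requested) * 100 / total = 30
	percent := getFreeSpaceLeftPercent(free, requested, total) // 30
	_ = percent

So a nearly full volume group now yields a low percentage, and therefore a low log2-based score, even when the request itself still fits.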
@@ -0,0 +1,45 @@
package scheduler

import (
"math"
"testing"

"k8s.io/apimachinery/pkg/api/resource"
)

func TestPrioritize(t *testing.T) {
t.Run("getFreeSpaceLeftPercent", func(t *testing.T) {
requested := resource.MustParse("1Gi")
divisor := 1.0

totalSizeString := "327676Mi"
totalSize := resource.MustParse(totalSizeString)
allocated := resource.MustParse("211Gi")
freeSize := resource.MustParse(totalSizeString)
freeSize.Sub(allocated)
// t.Logf("freeSize=%s, requested=%s, totalSize=%s", freeSize.String(), requested.String(), totalSize.String())
// t.Logf("freeSize=%d, requested=%d, totalSize=%d", freeSize.Value(), requested.Value(), totalSize.Value())

percent := getFreeSpaceLeftPercent(freeSize.Value(), requested.Value(), totalSize.Value())
t.Logf("First freeSpacePercent %d", percent)

rawScore := int(math.Round(math.Log2(float64(percent) / divisor)))
t.Logf("rawScore1=%d", rawScore)

totalSizeString2 := "327676Mi"
totalSize2 := resource.MustParse(totalSizeString2)
allocated2 := resource.MustParse("301Gi")
freeSize2 := resource.MustParse(totalSizeString2)
freeSize2.Sub(allocated2)

percent2 := getFreeSpaceLeftPercent(freeSize2.Value(), requested.Value(), totalSize2.Value())
t.Logf("Second freeSpacePercent2 %d", percent2)

rawScore2 := int(math.Round(math.Log2(float64(percent2) / divisor)))
t.Logf("rawScore2=%d", rawScore2)
})

t.Run("getNodeScore", func(t *testing.T) {

})
}
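The getNodeScore subtest above is left empty. Based only on the visible 'converted < 1' branch (the rest of the switch is collapsed in this view), a minimal body could assert the lower clamp:

	t.Run("getNodeScore", func(t *testing.T) {
		// log2(1/1) = 0, which rounds to 0 and falls into the "< 1" branch, so the score is clamped to 1.
		if score := getNodeScore(1, 1.0); score != 1 {
			t.Errorf("expected clamped score 1, got %d", score)
		}
		// A small percentage with a large divisor gives a negative log2 value; still clamped to 1.
		if score := getNodeScore(3, 8.0); score != 1 {
			t.Errorf("expected clamped score 1, got %d", score)
		}
	})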
2 changes: 1 addition & 1 deletion openapi/config-values.yaml
@@ -11,4 +11,4 @@ properties:
- DEBUG
- TRACE
description: Module log level
default: INFO
default: DEBUG
10 changes: 10 additions & 0 deletions templates/sds-local-volume-scheduler-extender/configmap.yaml
@@ -9,7 +9,17 @@ data:
scheduler-extender-config.yaml: |-
listen: "localhost:8099"
default-divisor: 1
{{- if eq .Values.sdsLocalVolume.logLevel "ERROR" }}
log-level: "0"
{{- else if eq .Values.sdsLocalVolume.logLevel "WARN" }}
log-level: "1"
{{- else if eq .Values.sdsLocalVolume.logLevel "INFO" }}
log-level: "2"
{{- else if eq .Values.sdsLocalVolume.logLevel "DEBUG" }}
log-level: "3"
{{- else if eq .Values.sdsLocalVolume.logLevel "TRACE" }}
log-level: "4"
{{- end }}
{{- if semverCompare ">= 1.22" .Values.global.discovery.kubernetesVersion }}
scheduler-config.yaml: |-
{{- if semverCompare ">= 1.23" .Values.global.discovery.kubernetesVersion }}
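For orientation, with sdsLocalVolume.logLevel left at the new DEBUG default, the template above would render the extender config roughly as follows (a sketch: the scheduler-config block and exact indentation depend on the cluster's Kubernetes version and are omitted here):

	scheduler-extender-config.yaml: |-
	  listen: "localhost:8099"
	  default-divisor: 1
	  log-level: "3"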