From eb910cf7ee45352aaffce82e1ed47515fedfd716 Mon Sep 17 00:00:00 2001
From: Viktor Kram <92625690+ViktorKram@users.noreply.github.com>
Date: Fri, 16 Feb 2024 19:30:49 +0300
Subject: [PATCH] [controller] New version of extender scheduler (#5)

Signed-off-by: Viktor Kramarenko
---
 images/sds-lvm-controller/Makefile |  41 ++
 .../cmd/cmd/root.go                |   3 +-
 .../pkg/scheduler/filter.go        | 475 ++++++++++++++----
 .../pkg/scheduler/prioritize.go    | 179 +++++--
 .../pkg/scheduler/route.go         |  11 +-
 5 files changed, 570 insertions(+), 139 deletions(-)
 create mode 100644 images/sds-lvm-controller/Makefile

diff --git a/images/sds-lvm-controller/Makefile b/images/sds-lvm-controller/Makefile
new file mode 100644
index 00000000..492f3644
--- /dev/null
+++ b/images/sds-lvm-controller/Makefile
@@ -0,0 +1,41 @@
+SHELL := /bin/bash
+TEST-ARGS=-race -timeout 30s -count 1
+base_golang_20_alpine := golang:1.20-alpine3.18
+repo_fox := registry.flant.com/deckhouse/storage
+image_name := sds-lvm
+
+
+deployment_name=sds-lvm-controller
+deployment_container_name=sds-lvm-controller
+namespace=d8-sds-lvm
+system_namespace=d8-system
+pull_secret_name=fox-registry
+
+USER := $(shell whoami)
+image_tag := sds-lvm-controller-$(USER)
+
+run: ## run go
+	go run -race ./cmd/main.go
+
+test:
+	go test $(TEST-ARGS) ./...
+
+fox_build_and_push:
+	docker build --build-arg="BASE_GOLANG_20_ALPINE_BUILDER=$(base_golang_20_alpine)" . -t $(repo_fox)/$(image_name):$(image_tag)
+	docker push $(repo_fox)/$(image_name):$(image_tag)
+
+redeploy:
+	kubectl -n $(namespace) rollout restart deployment $(deployment_name)
+	kubectl -n $(namespace) rollout restart daemonset $(daemonset_name)
+
+switch_to_local_dev:
+	kubectl -n $(system_namespace) scale deployment deckhouse --replicas=0
+	kubectl -n $(namespace) patch deployment $(deployment_name) -p \
+		'{"spec": {"template": {"spec": {"containers": [{"name": "$(deployment_container_name)", "image": "$(repo_fox)/$(image_name):$(image_tag)"}]}}}}'
+	kubectl -n $(namespace) patch deployment $(deployment_name) -p \
+		'{"spec": {"template": {"spec": {"containers": [{"name": "$(deployment_container_name)", "imagePullPolicy": "Always"}]}}}}'
+	kubectl -n $(namespace) patch deployment $(deployment_name) --type='json' \
+		-p='[{"op": "add", "path": "/spec/template/spec/imagePullSecrets", "value": [{"name": "$(pull_secret_name)"}]}]'
+
+
+.PHONY: switch_to_local_dev
diff --git a/images/sds-lvm-scheduler-extender/cmd/cmd/root.go b/images/sds-lvm-scheduler-extender/cmd/cmd/root.go
index f38efc41..80265eb9 100644
--- a/images/sds-lvm-scheduler-extender/cmd/cmd/root.go
+++ b/images/sds-lvm-scheduler-extender/cmd/cmd/root.go
@@ -83,6 +83,7 @@ func subMain(parentCtx context.Context) error {
 		}
 	}
 
+	ctx := context.Background()
 	log, err := logger.NewLogger(logger.Verbosity(config.LogLevel))
 	if err != nil {
 		fmt.Println(fmt.Sprintf("[subMain] unable to initialize logger, err: %s", err.Error()))
@@ -111,7 +112,7 @@ func subMain(parentCtx context.Context) error {
 		WarningHandler: client.WarningHandlerOptions{},
 	})
 
-	h, err := scheduler.NewHandler(cl, *log, config.DefaultDivisor)
+	h, err := scheduler.NewHandler(ctx, cl, *log, config.DefaultDivisor)
 	if err != nil {
 		return err
 	}
diff --git a/images/sds-lvm-scheduler-extender/pkg/scheduler/filter.go b/images/sds-lvm-scheduler-extender/pkg/scheduler/filter.go
index b48533f4..249192e0 100644
--- a/images/sds-lvm-scheduler-extender/pkg/scheduler/filter.go
+++ b/images/sds-lvm-scheduler-extender/pkg/scheduler/filter.go
@@ -3,21 +3,26 @@ package scheduler
 import (
 	"context"
"encoding/json" + "errors" "fmt" - corev1 "k8s.io/api/core/v1" - v1 "k8s.io/api/storage/v1" - "k8s.io/apimachinery/pkg/api/resource" "net/http" "sds-lvm-scheduler-extender/api/v1alpha1" "sds-lvm-scheduler-extender/pkg/logger" - "sigs.k8s.io/controller-runtime/pkg/client" "sync" + + "gopkg.in/yaml.v3" + corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/storage/v1" + "k8s.io/apimachinery/pkg/api/resource" + "sigs.k8s.io/controller-runtime/pkg/client" ) const ( - lvmTypeParamKey = "local-lvm.csi.storage.deckhouse.io/lvm-type" - thick = "Thick" - thin = "Thin" + lvmTypeParamKey = "lvm.csi.storage.deckhouse.io/lvm-type" + lvmVolumeGroupsParamKey = "lvm.csi.storage.deckhouse.io/lvm-volume-groups" + + thick = "Thick" + thin = "Thin" ) func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { @@ -35,21 +40,41 @@ func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Trace(fmt.Sprintf("[filter] a node from request, name :%s", n.Name)) } - s.log.Debug("[filter] starts to extract requested size") - requested, err := extractRequestedSize(s.client, s.log, input.Pod) + pvcs, err := getUsedPVC(s.ctx, s.client, input.Pod) + if err != nil { + s.log.Error(err, "[filter] unable to get PVC from the Pod") + http.Error(w, "bad request", http.StatusBadRequest) + return + } + for _, pvc := range pvcs { + s.log.Trace(fmt.Sprintf("[filter] used PVC: %s", pvc.Name)) + } + + scs, err := getStorageClassesUsedByPVCs(s.ctx, s.client, pvcs) + if err != nil { + s.log.Error(err, "[filter] unable to get StorageClasses from the PVC") + http.Error(w, "bad request", http.StatusBadRequest) + return + } + for _, sc := range scs { + s.log.Trace(fmt.Sprintf("[filter] used StorageClasses: %s", sc.Name)) + } + + s.log.Debug("[filter] starts to extract pvcRequests size") + pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, pvcs, scs) if err != nil { s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a pod %s", input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) } - s.log.Debug("[filter] successfully extracted the requested size") + s.log.Debug("[filter] successfully extracted the pvcRequests size") - s.log.Debug("[filter] starts to filter requested nodes") - result, err := filterNodes(s.client, s.log, *input.Nodes, requested) + s.log.Debug("[filter] starts to filter the nodes") + result, err := filterNodes(s.ctx, s.client, s.log, input.Nodes, pvcs, scs, pvcRequests) if err != nil { - s.log.Error(err, "[filter] unable to filter requested nodes") + s.log.Error(err, "[filter] unable to filter the nodes") http.Error(w, "bad request", http.StatusBadRequest) } - s.log.Debug("[filter] successfully filtered the requested nodes") + s.log.Debug("[filter] successfully filtered the nodes") w.Header().Set("content-type", "application/json") err = json.NewEncoder(w).Encode(result) @@ -60,82 +85,101 @@ func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Debug("[filter] ends the serving") } -func extractRequestedSize(cl client.Client, log logger.Logger, pod *corev1.Pod) (map[string]int64, error) { - ctx := context.Background() - usedPvc := make([]string, 0, len(pod.Spec.Volumes)) - for _, v := range pod.Spec.Volumes { - if v.PersistentVolumeClaim != nil { - usedPvc = append(usedPvc, v.PersistentVolumeClaim.ClaimName) - } - } - - pvcs := &corev1.PersistentVolumeClaimList{} - err := cl.List(ctx, pvcs) - if err != nil { - return nil, err - } - - pvcMap := make(map[string]corev1.PersistentVolumeClaim, len(pvcs.Items)) - for _, pvc := range pvcs.Items { - 
-		pvcMap[pvc.Name] = pvc
-	}
+type PVCRequest struct {
+	DeviceType    string
+	RequestedSize int64
+}
 
-	scs := &v1.StorageClassList{}
-	err = cl.List(ctx, scs)
+func extractRequestedSize(
+	ctx context.Context,
+	cl client.Client,
+	log logger.Logger,
+	pvcs map[string]corev1.PersistentVolumeClaim,
+	scs map[string]v1.StorageClass,
+) (map[string]PVCRequest, error) {
+	pvs, err := getPersistentVolumes(ctx, cl)
 	if err != nil {
 		return nil, err
 	}
 
-	scMap := make(map[string]v1.StorageClass, len(scs.Items))
-	for _, sc := range scs.Items {
-		scMap[sc.Name] = sc
-	}
-
-	result := make(map[string]int64, 2)
-
-	for _, pvName := range usedPvc {
-		pv := pvcMap[pvName]
+	pvcRequests := make(map[string]PVCRequest, len(pvcs))
+	for _, pvc := range pvcs {
+		sc := scs[*pvc.Spec.StorageClassName]
+		switch pvc.Status.Phase {
+		case corev1.ClaimPending:
+			switch sc.Parameters[lvmTypeParamKey] {
+			case thick:
+				pvcRequests[pvc.Name] = PVCRequest{
+					DeviceType:    thick,
+					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
+				}
+			case thin:
+				pvcRequests[pvc.Name] = PVCRequest{
+					DeviceType:    thin,
+					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value(),
+				}
+			}
 
-		scName := pv.Spec.StorageClassName
-		sc := scMap[*scName]
-		log.Trace(fmt.Sprintf("[extractRequestedSize] StorageClass %s has LVMType %s", sc.Name, sc.Parameters[lvmTypeParamKey]))
-		switch sc.Parameters[lvmTypeParamKey] {
-		case thick:
-			result[thick] += pv.Spec.Resources.Requests.Storage().Value()
-		case thin:
-			result[thin] += pv.Spec.Resources.Requests.Storage().Value()
+		case corev1.ClaimBound:
+			pv := pvs[pvc.Spec.VolumeName]
+			switch sc.Parameters[lvmTypeParamKey] {
+			case thick:
+				pvcRequests[pvc.Name] = PVCRequest{
+					DeviceType:    thick,
+					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
+				}
+			case thin:
+				pvcRequests[pvc.Name] = PVCRequest{
+					DeviceType:    thin,
+					RequestedSize: pvc.Spec.Resources.Requests.Storage().Value() - pv.Spec.Capacity.Storage().Value(),
+				}
+			}
 		}
 	}
 
-	for t, s := range result {
-		log.Trace(fmt.Sprintf("[extractRequestedSize] pod %s has requested type: %s, size: %d", pod.Name, t, s))
+	for name, req := range pvcRequests {
+		log.Trace(fmt.Sprintf("[extractRequestedSize] pvc %s has requested size: %d, device type: %s", name, req.RequestedSize, req.DeviceType))
 	}
 
-	return result, nil
+	return pvcRequests, nil
 }
 
-func filterNodes(cl client.Client, log logger.Logger, nodes corev1.NodeList, requested map[string]int64) (*ExtenderFilterResult, error) {
-	if len(requested) == 0 {
+func filterNodes(
+	ctx context.Context,
+	cl client.Client,
+	log logger.Logger,
+	nodes *corev1.NodeList,
+	pvcs map[string]corev1.PersistentVolumeClaim,
+	scs map[string]v1.StorageClass,
+	pvcRequests map[string]PVCRequest,
+) (*ExtenderFilterResult, error) {
+	// pvcRequests holds the requested space (both Thick and Thin) for every PVC used by the Pod.
+	if len(pvcRequests) == 0 {
 		return &ExtenderFilterResult{
-			Nodes: &nodes,
+			Nodes: nodes,
 		}, nil
 	}
 
-	ctx := context.Background()
-	lvgl := &v1alpha1.LvmVolumeGroupList{}
-	err := cl.List(ctx, lvgl)
+	lvgs, err := getLVMVolumeGroups(ctx, cl)
 	if err != nil {
 		return nil, err
 	}
 
-	lvgByNodes := make(map[string][]v1alpha1.LvmVolumeGroup, len(lvgl.Items))
-	for _, lvg := range lvgl.Items {
-		for _, node := range lvg.Status.Nodes {
-			lvgByNodes[node.Name] = append(lvgByNodes[node.Name], lvg)
+	scLVGs, err := getLVGsFromStorageClasses(scs)
+	if err != nil {
+		return nil, err
+	}
+
+	usedLVGs := removeUnusedLVGs(lvgs, scLVGs)
+
+	nodeLVGs := sortLVGsByNodeName(usedLVGs)
+	for n, ls := range nodeLVGs {
+		for _, l := range ls {
+			log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n))
 		}
 	}
 
-	log.Trace(fmt.Sprintf("[filterNodes] sorted LVG by nodes: %+v", lvgByNodes))
+	generalNodes, err := getGeneralNodesByStorageClasses(scs, nodeLVGs)
 
 	result := &ExtenderFilterResult{
 		Nodes:       &corev1.NodeList{},
@@ -144,28 +188,97 @@ func filterNodes(cl client.Client, log logger.Logger, nodes corev1.NodeList, req
 	wg := &sync.WaitGroup{}
 	wg.Add(len(nodes.Items))
-
-	for _, node := range nodes.Items {
-		go func(node corev1.Node) {
-			defer wg.Done()
-
-			lvgs := lvgByNodes[node.Name]
-			freeSpace, err := getNodeFreeSpace(lvgs)
-			if err != nil {
-				log.Error(err, fmt.Sprintf("[filterNodes] unable to get node free space, node: %s, lvgs: %+v", node.Name, lvgs))
-				result.FailedNodes[node.Name] = "error occurred while counting free space"
+	errs := make(chan error, len(nodes.Items)*len(pvcs))
+
+	for i, node := range nodes.Items {
+		go func(i int, node corev1.Node) {
+			log.Debug(fmt.Sprintf("[filterNodes] goroutine %d starts the work", i))
+			defer func() {
+				log.Debug(fmt.Sprintf("[filterNodes] goroutine %d ends the work", i))
+				wg.Done()
+			}()
+
+			if _, general := generalNodes[node.Name]; !general {
+				log.Debug(fmt.Sprintf("[filterNodes] node %s is not general for used Storage Classes", node.Name))
+				result.FailedNodes[node.Name] = "node is not general for used Storage Classes"
 				return
 			}
-			if freeSpace[thick] < requested[thick] ||
-				freeSpace[thin] < requested[thin] {
+
+			lvgsFromNode := generalNodes[node.Name]
+			hasEnoughSpace := true
+
+			for _, pvc := range pvcs {
+				pvcReq := pvcRequests[pvc.Name]
+				lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName]
+				matchedLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
+				if matchedLVG == nil {
+					err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
+					errs <- err
+					return
+				}
+
+				switch pvcReq.DeviceType {
+				case thick:
+					lvg := lvgs[matchedLVG.Name]
+					freeSpace, err := getVGFreeSpace(&lvg)
+					if err != nil {
+						errs <- err
+						return
+					}
+
+					log.Trace(fmt.Sprintf("[filterNodes] VG free space: %d, PVC requested space: %d", freeSpace.Value(), pvcReq.RequestedSize))
+					if freeSpace.Value() < pvcReq.RequestedSize {
+						hasEnoughSpace = false
+					}
+
+				case thin:
+					lvg := lvgs[matchedLVG.Name]
+					targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, matchedLVG.Thin.PoolName)
+					if targetThinPool == nil {
+						err = errors.New(fmt.Sprintf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
+						errs <- err
+						return
+					}
+					// TODO: add after overCommit implementation
+					// freeSpace, err := getThinPoolFreeSpace(targetThinPool)
+					// if err != nil {
+					//	errs <- err
+					//	return
+					// }
+
+					// log.Trace(fmt.Sprintf("[filterNodes] ThinPool free space: %d, PVC requested space: %d", freeSpace.Value(), pvcReq.RequestedSize))
+
+					// if freeSpace.Value() < pvcReq.RequestedSize {
+					//	hasEnoughSpace = false
+					// }
+				}
+
+				if !hasEnoughSpace {
+					break
+				}
+			}
+
+			if !hasEnoughSpace {
 				result.FailedNodes[node.Name] = "not enough space"
 				return
 			}
+			// TODO: add logic to filter nodes when pvcs has same storage class
+
 			result.Nodes.Items = append(result.Nodes.Items, node)
-		}(node)
+		}(i, node)
 	}
 
 	wg.Wait()
+	log.Debug("[filterNodes] goroutines work is done")
+	if len(errs) != 0 {
+		for err = range errs {
+			log.Error(err, "[filterNodes] an error occurs while filtering the nodes")
+		}
+	}
+	close(errs)
+	if err != nil {
+		return nil, err
+	}
 
 	for _, node := range result.Nodes.Items {
 		log.Trace(fmt.Sprintf("[filterNodes] suitable node: %s", node.Name))
@@ -178,32 +291,146 @@ func filterNodes(cl client.Client, log logger.Logger, nodes corev1.NodeList, req
 	return result, nil
 }
 
-func getNodeFreeSpace(lvgs []v1alpha1.LvmVolumeGroup) (map[string]int64, error) {
-	freeSpaces := make(map[string]int64, 2)
+func findMatchedThinPool(thinPools []v1alpha1.StatusThinPool, name string) *v1alpha1.StatusThinPool {
+	for _, tp := range thinPools {
+		if tp.Name == name {
+			return &tp
+		}
+	}
 
-	for _, lvg := range lvgs {
-		for _, tp := range lvg.Status.ThinPools {
-			thinSpace, err := getThinPoolFreeSpace(tp)
+	return nil
+}
+
+func findMatchedLVG(nodeLVGs []v1alpha1.LvmVolumeGroup, scLVGs LVMVolumeGroups) *LVMVolumeGroup {
+	nodeLVGNames := make(map[string]struct{}, len(nodeLVGs))
+	for _, lvg := range nodeLVGs {
+		nodeLVGNames[lvg.Name] = struct{}{}
+	}
+
+	for _, lvg := range scLVGs {
+		if _, match := nodeLVGNames[lvg.Name]; match {
+			return &lvg
+		}
+	}
+
+	return nil
+}
+
+func getGeneralNodesByStorageClasses(scs map[string]v1.StorageClass, nodesWithLVGs map[string][]v1alpha1.LvmVolumeGroup) (map[string][]v1alpha1.LvmVolumeGroup, error) {
+	result := make(map[string][]v1alpha1.LvmVolumeGroup, len(nodesWithLVGs))
+
+	for nodeName, lvgs := range nodesWithLVGs {
+		lvgNames := make(map[string]struct{}, len(lvgs))
+		for _, l := range lvgs {
+			lvgNames[l.Name] = struct{}{}
+		}
+
+		nodeIncludesLVG := true
+		for _, sc := range scs {
+			scLvgs, err := extractLVGsFromSC(sc)
 			if err != nil {
 				return nil, err
 			}
 
-			if freeSpaces[thin] < thinSpace.Value() {
-				freeSpaces[thin] = thinSpace.Value()
+			contains := false
+			for _, lvg := range scLvgs {
+				if _, exist := lvgNames[lvg.Name]; exist {
+					contains = true
+					break
+				}
+			}
+
+			if !contains {
+				nodeIncludesLVG = false
+				break
 			}
 		}
 
-		thickSpace, err := getVGFreeSpace(&lvg)
+		if nodeIncludesLVG {
+			result[nodeName] = lvgs
+		}
+	}
+
+	return result, nil
+}
+
+func removeUnusedLVGs(lvgs map[string]v1alpha1.LvmVolumeGroup, scsLVGs map[string]LVMVolumeGroups) map[string]v1alpha1.LvmVolumeGroup {
+	result := make(map[string]v1alpha1.LvmVolumeGroup, len(lvgs))
+	usedLvgs := make(map[string]struct{}, len(lvgs))
+
+	for _, scLvgs := range scsLVGs {
+		for _, lvg := range scLvgs {
+			usedLvgs[lvg.Name] = struct{}{}
+		}
+	}
+
+	for _, lvg := range lvgs {
+		if _, used := usedLvgs[lvg.Name]; used {
+			result[lvg.Name] = lvg
+		}
+	}
+
+	return result
+}
+
+func getLVGsFromStorageClasses(scs map[string]v1.StorageClass) (map[string]LVMVolumeGroups, error) {
+	result := make(map[string]LVMVolumeGroups, len(scs))
+
+	for _, sc := range scs {
+		lvgs, err := extractLVGsFromSC(sc)
 		if err != nil {
 			return nil, err
 		}
 
-		if freeSpaces[thick] < thickSpace.Value() {
-			freeSpaces[thick] = thickSpace.Value()
+		for _, lvg := range lvgs {
+			result[sc.Name] = append(result[sc.Name], lvg)
 		}
 	}
 
-	return freeSpaces, nil
+	return result, nil
 }
+
+type LVMVolumeGroup struct {
+	Name string `yaml:"name"`
+	Thin struct {
+		PoolName string `yaml:"poolName"`
+	} `yaml:"thin"`
+}
+type LVMVolumeGroups []LVMVolumeGroup
+
+func extractLVGsFromSC(sc v1.StorageClass) (LVMVolumeGroups, error) {
+	var lvmVolumeGroups LVMVolumeGroups
+	err := yaml.Unmarshal([]byte(sc.Parameters[lvmVolumeGroupsParamKey]), &lvmVolumeGroups)
+	if err != nil {
+		return nil, err
+	}
+
+	return lvmVolumeGroups, nil
+}
+
+func sortLVGsByNodeName(lvgs map[string]v1alpha1.LvmVolumeGroup) map[string][]v1alpha1.LvmVolumeGroup {
+	sorted := make(map[string][]v1alpha1.LvmVolumeGroup, len(lvgs))
+	for _, lvg := range lvgs {
+		for _, node := range lvg.Status.Nodes {
+			sorted[node.Name] = append(sorted[node.Name], lvg)
+		}
+	}
+
+	return sorted
+}
+
+func getLVMVolumeGroups(ctx context.Context, cl client.Client) (map[string]v1alpha1.LvmVolumeGroup, error) {
+	lvgl := &v1alpha1.LvmVolumeGroupList{}
+	err := cl.List(ctx, lvgl)
+	if err != nil {
+		return nil, err
+	}
+
+	lvgMap := make(map[string]v1alpha1.LvmVolumeGroup, len(lvgl.Items))
+	for _, lvg := range lvgl.Items {
+		lvgMap[lvg.Name] = lvg
+	}
+
+	return lvgMap, nil
 }
 
 func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) (resource.Quantity, error) {
@@ -221,7 +448,7 @@ func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) (resource.Quantity, error) {
 	return free, nil
 }
 
-func getThinPoolFreeSpace(tp v1alpha1.StatusThinPool) (resource.Quantity, error) {
+func getThinPoolFreeSpace(tp *v1alpha1.StatusThinPool) (resource.Quantity, error) {
 	free := tp.ActualSize
 	used, err := resource.ParseQuantity(tp.UsedSize)
 	if err != nil {
@@ -231,3 +458,69 @@ func getThinPoolFreeSpace(tp v1alpha1.StatusThinPool) (resource.Quantity, error)
 
 	return free, nil
 }
+
+func getPersistentVolumes(ctx context.Context, cl client.Client) (map[string]corev1.PersistentVolume, error) {
+	pvs := &corev1.PersistentVolumeList{}
+	err := cl.List(ctx, pvs)
+	if err != nil {
+		return nil, err
+	}
+
+	pvMap := make(map[string]corev1.PersistentVolume, len(pvs.Items))
+	for _, pv := range pvs.Items {
+		pvMap[pv.Name] = pv
+	}
+
+	return pvMap, nil
+}
+
+func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map[string]corev1.PersistentVolumeClaim) (map[string]v1.StorageClass, error) {
+	scs := &v1.StorageClassList{}
+	err := cl.List(ctx, scs)
+	if err != nil {
+		return nil, err
+	}
+
+	scMap := make(map[string]v1.StorageClass, len(scs.Items))
+	for _, sc := range scs.Items {
+		scMap[sc.Name] = sc
+	}
+
+	result := make(map[string]v1.StorageClass, len(pvcs))
+	for _, pvc := range pvcs {
+		if pvc.Spec.StorageClassName == nil {
+			err = errors.New(fmt.Sprintf("no StorageClass specified for PVC %s", pvc.Name))
+			return nil, err
+		}
+
+		scName := *pvc.Spec.StorageClassName
+		if sc, match := scMap[scName]; match {
+			result[sc.Name] = sc
+		}
+	}
+
+	return result, nil
+}
+
+func getUsedPVC(ctx context.Context, cl client.Client, pod *corev1.Pod) (map[string]corev1.PersistentVolumeClaim, error) {
+	usedPvc := make(map[string]corev1.PersistentVolumeClaim, len(pod.Spec.Volumes))
+
+	pvcs := &corev1.PersistentVolumeClaimList{}
+	err := cl.List(ctx, pvcs)
+	if err != nil {
+		return nil, err
+	}
+
+	pvcMap := make(map[string]corev1.PersistentVolumeClaim, len(pvcs.Items))
+	for _, pvc := range pvcs.Items {
+		pvcMap[pvc.Name] = pvc
+	}
+
+	for _, volume := range pod.Spec.Volumes {
+		if volume.PersistentVolumeClaim != nil {
+			usedPvc[volume.PersistentVolumeClaim.ClaimName] = pvcMap[volume.PersistentVolumeClaim.ClaimName]
+		}
+	}
+
+	return usedPvc, nil
+}
diff --git a/images/sds-lvm-scheduler-extender/pkg/scheduler/prioritize.go b/images/sds-lvm-scheduler-extender/pkg/scheduler/prioritize.go
index f4726ff8..c56fa9fc 100644
--- a/images/sds-lvm-scheduler-extender/pkg/scheduler/prioritize.go
+++ b/images/sds-lvm-scheduler-extender/pkg/scheduler/prioritize.go
@@ -3,14 +3,16 @@ package scheduler
 import (
 	"context"
 	"encoding/json"
+	"errors"
 	"fmt"
-	corev1 "k8s.io/api/core/v1"
 	"math"
 	"net/http"
-	"sds-lvm-scheduler-extender/api/v1alpha1"
 	"sds-lvm-scheduler-extender/pkg/logger"
-	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sync"
+
+	corev1 "k8s.io/api/core/v1"
+	v1 "k8s.io/api/storage/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
 )
 
 func (s scheduler) prioritize(w http.ResponseWriter, r *http.Request) {
@@ -24,12 +26,42 @@ func (s scheduler) prioritize(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	result, err := scoreNodes(s.client, s.log, input.Nodes.Items, s.defaultDivisor)
+	pvcs, err := getUsedPVC(s.ctx, s.client, input.Pod)
+	if err != nil {
+		s.log.Error(err, "[prioritize] unable to get PVC from the Pod")
+		http.Error(w, "bad request", http.StatusBadRequest)
+		return
+	}
+	for _, pvc := range pvcs {
+		s.log.Trace(fmt.Sprintf("[prioritize] used PVC: %s", pvc.Name))
+	}
+
+	scs, err := getStorageClassesUsedByPVCs(s.ctx, s.client, pvcs)
+	if err != nil {
+		s.log.Error(err, "[prioritize] unable to get StorageClasses from the PVC")
+		http.Error(w, "bad request", http.StatusBadRequest)
+		return
+	}
+	for _, sc := range scs {
+		s.log.Trace(fmt.Sprintf("[prioritize] used StorageClasses: %s", sc.Name))
+	}
+
+	s.log.Debug("[prioritize] starts to extract pvcRequests size")
+	pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, pvcs, scs)
+	if err != nil {
+		s.log.Error(err, fmt.Sprintf("[prioritize] unable to extract request size for a pod %s", input.Pod.Name))
+		http.Error(w, "bad request", http.StatusBadRequest)
+	}
+	s.log.Debug("[prioritize] successfully extracted the pvcRequests size")
+
+	s.log.Debug("[prioritize] starts to score the nodes")
+	result, err := scoreNodes(s.ctx, s.client, s.log, input.Nodes, pvcs, scs, pvcRequests, s.defaultDivisor)
 	if err != nil {
 		s.log.Error(err, "[prioritize] unable to score nodes")
 		http.Error(w, "Bad Request.", http.StatusBadRequest)
 		return
 	}
+	s.log.Debug("[prioritize] successfully scored the nodes")
 
 	w.Header().Set("content-type", "application/json")
 	err = json.NewEncoder(w).Encode(result)
@@ -40,46 +72,112 @@ func (s scheduler) prioritize(w http.ResponseWriter, r *http.Request) {
 	s.log.Debug("[prioritize] ends serving")
 }
 
-func scoreNodes(cl client.Client, log logger.Logger, nodes []corev1.Node, divisor float64) ([]HostPriority, error) {
-	ctx := context.Background()
-	lvgl := &v1alpha1.LvmVolumeGroupList{}
-	err := cl.List(ctx, lvgl)
+func scoreNodes(
+	ctx context.Context,
+	cl client.Client,
+	log logger.Logger,
+	nodes *corev1.NodeList,
+	pvcs map[string]corev1.PersistentVolumeClaim,
+	scs map[string]v1.StorageClass,
+	pvcRequests map[string]PVCRequest,
+	divisor float64,
+) ([]HostPriority, error) {
+	lvgs, err := getLVMVolumeGroups(ctx, cl)
 	if err != nil {
 		return nil, err
 	}
 
-	lvgByNodes := make(map[string][]v1alpha1.LvmVolumeGroup, len(lvgl.Items))
-	for _, lvg := range lvgl.Items {
-		for _, node := range lvg.Status.Nodes {
-			lvgByNodes[node.Name] = append(lvgByNodes[node.Name], lvg)
-		}
+	scLVGs, err := getLVGsFromStorageClasses(scs)
+	if err != nil {
+		return nil, err
 	}
 
-	log.Trace(fmt.Sprintf("[scoreNodes] sorted LVG by nodes: %+v", lvgByNodes))
+	usedLVGs := removeUnusedLVGs(lvgs, scLVGs)
 
-	result := make([]HostPriority, 0, len(nodes))
+	nodeLVGs := sortLVGsByNodeName(usedLVGs)
+	for n, ls := range nodeLVGs {
+		for _, l := range ls {
+			log.Trace(fmt.Sprintf("[scoreNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n))
+		}
+	}
 
-	// TODO: probably should score the nodes exactly to their free space
+	result := make([]HostPriority, 0, len(nodes.Items))
 
 	wg := &sync.WaitGroup{}
-	wg.Add(len(nodes))
-
-	for _, node := range nodes {
-		go func(node corev1.Node) {
-			defer wg.Done()
-
-			lvgs := lvgByNodes[node.Name]
-			freeSpace, err := getNodeFreeSpace(lvgs)
-			if err != nil {
-				log.Error(err, fmt.Sprintf("[scoreNodes] unable to get node free space, node: %s, lvgs: %+v", node.Name, lvgs))
-				return
+	wg.Add(len(nodes.Items))
+	errs := make(chan error, len(pvcs)*len(nodes.Items))
+
+	for i, node := range nodes.Items {
+		go func(i int, node corev1.Node) {
+			log.Debug(fmt.Sprintf("[scoreNodes] goroutine %d starts the work", i))
+			defer func() {
+				log.Debug(fmt.Sprintf("[scoreNodes] goroutine %d ends the work", i))
+				wg.Done()
+			}()
+
+			lvgsFromNode := nodeLVGs[node.Name]
+			var totalFreeSpaceLeft int64
+			// TODO: change pvs to vgs
+			for _, pvc := range pvcs {
+				pvcReq := pvcRequests[pvc.Name]
+				lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName]
+				matchedLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
+				if matchedLVG == nil {
+					err = errors.New(fmt.Sprintf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
+					errs <- err
+					return
+				}
+
+				switch pvcReq.DeviceType {
+				case thick:
+					lvg := lvgs[matchedLVG.Name]
+					freeSpace, err := getVGFreeSpace(&lvg)
+					if err != nil {
+						errs <- err
+						return
+					}
+
+					totalFreeSpaceLeft += getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize)
+				case thin:
+					lvg := lvgs[matchedLVG.Name]
+					thinPool := findMatchedThinPool(lvg.Status.ThinPools, matchedLVG.Thin.PoolName)
+					if thinPool == nil {
+						err = errors.New(fmt.Sprintf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name))
+						log.Error(err, "an error occurs while searching for target LVMVolumeGroup")
+						errs <- err
+						return
+					}
+
+					freeSpace, err := getThinPoolFreeSpace(thinPool)
+					if err != nil {
+						errs <- err
+						return
+					}
+
+					totalFreeSpaceLeft += getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize)
+				}
 			}
-			score := getNodeScore(freeSpace, divisor)
-			result = append(result, HostPriority{Host: node.Name, Score: score})
-		}(node)
+
+			averageFreeSpace := totalFreeSpaceLeft / int64(len(pvcs))
+			score := getNodeScore(averageFreeSpace, divisor)
+			result = append(result, HostPriority{
+				Host:  node.Name,
+				Score: score,
+			})
+		}(i, node)
+
 	}
 
 	wg.Wait()
+	if len(errs) != 0 {
+		for err = range errs {
+			log.Error(err, "[scoreNodes] an error occurs while scoring the nodes")
+		}
+	}
+	close(errs)
+	if err != nil {
+		return nil, err
+	}
+
 	for _, n := range result {
 		log.Trace(fmt.Sprintf("[scoreNodes] host: %s", n.Host))
 		log.Trace(fmt.Sprintf("[scoreNodes] score: %d", n.Score))
@@ -88,22 +186,13 @@ func scoreNodes(cl client.Client, log logger.Logger, nodes []corev1.Node, diviso
 	return result, nil
 }
 
-func getNodeScore(freeSpace map[string]int64, divisor float64) int {
-	capacity := freeSpace[thin] + freeSpace[thick]
-	gb := capacity >> 30
-
-	// Avoid logarithm of zero, which diverges to negative infinity.
-	if gb == 0 {
-		// If there is a non-nil capacity, but we don't have at least one gigabyte, we score it with one.
-		// This is because the capacityToScore precision is at the gigabyte level.
-		if capacity > 0 {
-			return 1
-		}
-
-		return 0
-	}
+func getFreeSpaceLeftPercent(freeSpace int64, requestedSpace int64) int64 {
+	left := freeSpace - requestedSpace
+	return left * 100 / freeSpace
+}
 
-	converted := int(math.Log2(float64(gb) / divisor))
+func getNodeScore(freeSpace int64, divisor float64) int {
+	converted := int(math.Log2(float64(freeSpace) / divisor))
 	switch {
 	case converted < 1:
 		return 1
diff --git a/images/sds-lvm-scheduler-extender/pkg/scheduler/route.go b/images/sds-lvm-scheduler-extender/pkg/scheduler/route.go
index d99ddf54..c6cd38bf 100644
--- a/images/sds-lvm-scheduler-extender/pkg/scheduler/route.go
+++ b/images/sds-lvm-scheduler-extender/pkg/scheduler/route.go
@@ -1,6 +1,7 @@
 package scheduler
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 	"sds-lvm-scheduler-extender/pkg/logger"
@@ -12,6 +13,7 @@ type scheduler struct {
 	defaultDivisor float64
 	log            logger.Logger
 	client         client.Client
+	ctx            context.Context
 }
 
 func (s scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -34,8 +36,13 @@ func (s scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 }
 
 // NewHandler return new http.Handler of the scheduler extender
-func NewHandler(cl client.Client, log logger.Logger, defaultDiv float64) (http.Handler, error) {
-	return scheduler{defaultDiv, log, cl}, nil
+func NewHandler(ctx context.Context, cl client.Client, log logger.Logger, defaultDiv float64) (http.Handler, error) {
+	return scheduler{
+		defaultDivisor: defaultDiv,
+		log:            log,
+		client:         cl,
+		ctx:            ctx,
+	}, nil
}
 
 func status(w http.ResponseWriter, r *http.Request) {
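
For reference, the two StorageClass parameters the new code reads (lvm.csi.storage.deckhouse.io/lvm-type and lvm.csi.storage.deckhouse.io/lvm-volume-groups) are plain strings: the first is "Thick" or "Thin", the second is a YAML list that extractLVGsFromSC unmarshals into LVMVolumeGroups. A minimal sketch of the expected shape, assuming it sits in the same scheduler package; the StorageClass name, LVMVolumeGroup names and thin-pool name below are made up for illustration:

package scheduler

import (
	"fmt"

	v1 "k8s.io/api/storage/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// exampleStorageClassLVGs builds a StorageClass with the parameters this
// extender reads and prints the LVMVolumeGroups parsed from it.
func exampleStorageClassLVGs() {
	sc := v1.StorageClass{
		ObjectMeta: metav1.ObjectMeta{Name: "lvm-thin-r1"}, // hypothetical name
		Parameters: map[string]string{
			lvmTypeParamKey: thin, // "Thick" or "Thin"
			lvmVolumeGroupsParamKey: `
- name: vg-0-on-worker-0
  thin:
    poolName: thin-pool-0
- name: vg-0-on-worker-1
  thin:
    poolName: thin-pool-0
`,
		},
	}

	lvgs, err := extractLVGsFromSC(sc)
	if err != nil {
		fmt.Println("unable to parse the lvm-volume-groups parameter:", err)
		return
	}

	for _, lvg := range lvgs {
		fmt.Printf("LVMVolumeGroup %s, thin pool %s\n", lvg.Name, lvg.Thin.PoolName)
	}
}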
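And a rough walk-through of the new scoring math in prioritize.go, under the same in-package assumption: getFreeSpaceLeftPercent turns free space and a request into the percentage of space that would remain, and getNodeScore feeds that percentage into Log2 scaled by the divisor. The 100 GiB / 20 GiB sizes and the divisor of 1 are made up; the real divisor comes from config.DefaultDivisor:

package scheduler

import "fmt"

// exampleScoring shows the score a node would get for a single Thick PVC:
// a VG with 100 GiB free that receives a 20 GiB request keeps 80% of its
// free space, and that percentage is what getNodeScore converts to a score.
func exampleScoring() {
	const gib = int64(1) << 30

	freeSpace := 100 * gib
	requested := 20 * gib

	percentLeft := getFreeSpaceLeftPercent(freeSpace, requested) // (100-20)*100/100 = 80
	fmt.Println("free space left, %:", percentLeft)

	// A divisor of 1 is an assumed value here; it is passed down from config.DefaultDivisor.
	fmt.Println("node score:", getNodeScore(percentLeft, 1))
}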