Commit
first commit
Signed-off-by: Aleksandr Zimin <alexandr.zimin@flant.com>
AleksZimin committed Aug 16, 2024
1 parent 3b3c8e6 commit c08bede
Showing 13 changed files with 301 additions and 123 deletions.
43 changes: 43 additions & 0 deletions hooks/generate_scheduler_extender_certs.py
@@ -0,0 +1,43 @@
#!/usr/bin/env python3
#
# Copyright 2023 Flant JSC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


from lib.hooks.internal_tls import GenerateCertificateHook, TlsSecret, default_sans
from lib.module import values as module_values
from deckhouse import hook
from typing import Callable
import common

def main():
hook = GenerateCertificateHook(
TlsSecret(
cn="sds-local-volume-scheduler-extender",
name="scheduler-extender-https-certs",
sansGenerator=default_sans([
"sds-local-volume-scheduler-extender",
f"sds-local-volume-scheduler-extender.{common.NAMESPACE}",
f"sds-local-volume-scheduler-extender.{common.NAMESPACE}.svc",
f"sds-local-volume-scheduler-extender.{common.NAMESPACE}.svc.cluster.local"]),
values_path_prefix=f"{common.MODULE_NAME}.internal.customSchedulerExtenderCert"
),
cn="sds-local-volume-scheduler-extender",
common_ca=True,
namespace=common.NAMESPACE)

hook.run()

if __name__ == "__main__":
main()
@@ -59,6 +59,8 @@ const (
defaultDivisor = 1
defaultListenAddr = ":8000"
defaultCacheSize = 10
defaultcertFile = "/etc/sds-local-volume-scheduler-extender/certs/tls.crt"
defaultkeyFile = "/etc/sds-local-volume-scheduler-extender/certs/tls.key"
)

type Config struct {
@@ -67,13 +69,17 @@ type Config struct {
LogLevel string `json:"log-level"`
CacheSize int `json:"cache-size"`
HealthProbeBindAddress string `json:"health-probe-bind-address"`
CertFile string `json:"cert-file"`
KeyFile string `json:"key-file"`
}

var config = &Config{
ListenAddr: defaultListenAddr,
DefaultDivisor: defaultDivisor,
LogLevel: "2",
CacheSize: defaultCacheSize,
CertFile: defaultcertFile,
KeyFile: defaultkeyFile,
}

var rootCmd = &cobra.Command{
@@ -205,7 +211,7 @@ func subMain(parentCtx context.Context) error {
}()

log.Info(fmt.Sprintf("[subMain] starts serving on: %s", config.ListenAddr))
err = serv.ListenAndServe()
err = serv.ListenAndServeTLS(config.CertFile, config.KeyFile)
if !errors.Is(err, http.ErrServerClosed) {
log.Error(err, "[subMain] unable to run the server")
return err
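For context on the two hunks above: the scheduler extender's config now carries cert-file and key-file options whose defaults point at the certificate mounted from the hook-generated secret, and the server starts with ListenAndServeTLS instead of ListenAndServe. A minimal, self-contained Go sketch of that pattern, with a placeholder handler and listen address standing in for the module's real ones:

package main

import (
	"errors"
	"log"
	"net/http"
)

const (
	// Defaults match the paths where the hook-generated secret is expected to be mounted.
	defaultCertFile = "/etc/sds-local-volume-scheduler-extender/certs/tls.crt"
	defaultKeyFile  = "/etc/sds-local-volume-scheduler-extender/certs/tls.key"
)

func main() {
	mux := http.NewServeMux()
	// Placeholder handler standing in for the extender's real endpoints.
	mux.HandleFunc("/healthz", func(w http.ResponseWriter, _ *http.Request) {
		w.WriteHeader(http.StatusOK)
	})

	srv := &http.Server{Addr: ":8000", Handler: mux}

	// Serve HTTPS with the mounted certificate; ListenAndServeTLS replaces plain ListenAndServe.
	err := srv.ListenAndServeTLS(defaultCertFile, defaultKeyFile)
	if !errors.Is(err, http.ErrServerClosed) {
		log.Fatal(err)
	}
}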
119 changes: 68 additions & 51 deletions images/sds-local-volume-scheduler-extender/src/pkg/scheduler/filter.go
@@ -19,6 +19,7 @@ package scheduler
import (
"context"
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
@@ -38,35 +39,51 @@ import (

func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
s.log.Debug("[filter] starts the serving")
var input ExtenderArgs
var inputData ExtenderArgs
reader := http.MaxBytesReader(w, r.Body, 10<<20)
err := json.NewDecoder(reader).Decode(&input)
if err != nil || input.Nodes == nil || input.Pod == nil {
err := json.NewDecoder(reader).Decode(&inputData)
if err != nil {
s.log.Error(err, "[filter] unable to decode a request")
http.Error(w, "bad request", http.StatusBadRequest)
return
}
s.log.Trace(fmt.Sprintf("[filter] input data: %+v", inputData))

if inputData.NodeNames == nil || len(*inputData.NodeNames) == 0 {
s.log.Error(errors.New("no node names in the request"), "[filter] unable to get node names from the request")
http.Error(w, "bad request", http.StatusBadRequest)
return
}
nodeNames := inputData.NodeNames
s.log.Trace(fmt.Sprintf("[filter] node names from the request: %v", nodeNames))

s.log.Debug(fmt.Sprintf("[filter] starts the filtering for Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
if inputData.Pod == nil {
s.log.Error(errors.New("no pod in the request"), "[filter] unable to get a Pod from the request")
http.Error(w, "bad request", http.StatusBadRequest)
return
}
pod := inputData.Pod
s.log.Debug(fmt.Sprintf("[filter] starts the filtering for Pod %s/%s", pod.Namespace, pod.Name))

for _, n := range input.Nodes.Items {
s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s has node %s from the request", input.Pod.Namespace, input.Pod.Name, n.Name))
s.log.Trace(fmt.Sprintf("[filter] Pod from the request: %+v", pod))
for _, nodeName := range *nodeNames {
s.log.Debug(fmt.Sprintf("[filter] Pod %s/%s has node %s from the request", pod.Namespace, pod.Name, nodeName))
}

pvcs, err := getUsedPVC(s.ctx, s.client, s.log, input.Pod)
pvcs, err := getUsedPVC(s.ctx, s.client, s.log, pod)
if err != nil {
s.log.Error(err, fmt.Sprintf("[filter] unable to get used PVC for a Pod %s in the namespace %s", input.Pod.Name, input.Pod.Namespace))
s.log.Error(err, fmt.Sprintf("[filter] unable to get used PVC for a Pod %s in the namespace %s", pod.Name, pod.Namespace))
http.Error(w, "internal server error", http.StatusInternalServerError)
return
}
if len(pvcs) == 0 {
s.log.Error(fmt.Errorf("no PVC was found for pod %s in namespace %s", input.Pod.Name, input.Pod.Namespace), fmt.Sprintf("[filter] unable to get used PVC for Pod %s", input.Pod.Name))
s.log.Error(fmt.Errorf("no PVC was found for pod %s in namespace %s", pod.Name, pod.Namespace), fmt.Sprintf("[filter] unable to get used PVC for Pod %s", pod.Name))
http.Error(w, "bad request", http.StatusBadRequest)
return
}

for _, pvc := range pvcs {
s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses PVC: %s", input.Pod.Namespace, input.Pod.Name, pvc.Name))
s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses PVC: %s", pod.Namespace, pod.Name, pvc.Name))

// this might happen when the extender-scheduler recovers after a failure, repopulates the cache via the PVC-watcher controller, and then
// the kube-scheduler posts a request to schedule the pod with the PVC.
@@ -85,7 +102,7 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
return
}
for _, sc := range scs {
s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses StorageClass: %s", input.Pod.Namespace, input.Pod.Name, sc.Name))
s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses StorageClass: %s", pod.Namespace, pod.Name, sc.Name))
}

managedPVCs := filterNotManagedPVC(s.log, pvcs, scs)
@@ -94,39 +111,39 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
}

if len(managedPVCs) == 0 {
s.log.Warning(fmt.Sprintf("[filter] Pod %s/%s uses PVCs which are not managed by the module. Unable to filter and score the nodes", input.Pod.Namespace, input.Pod.Name))
s.log.Warning(fmt.Sprintf("[filter] Pod %s/%s uses PVCs which are not managed by the module. Unable to filter and score the nodes", pod.Namespace, pod.Name))
return
}

s.log.Debug(fmt.Sprintf("[filter] starts to extract PVC requested sizes for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] starts to extract PVC requested sizes for a Pod %s/%s", pod.Namespace, pod.Name))
pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, managedPVCs, scs)
if err != nil {
s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a Pod %s/%s", pod.Namespace, pod.Name))
http.Error(w, "bad request", http.StatusBadRequest)
return
}
s.log.Debug(fmt.Sprintf("[filter] successfully extracted the PVC requested sizes of a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] successfully extracted the PVC requested sizes of a Pod %s/%s", pod.Namespace, pod.Name))

s.log.Debug(fmt.Sprintf("[filter] starts to filter the nodes from the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
filteredNodes, err := filterNodes(s.log, s.cache, input.Nodes, input.Pod, managedPVCs, scs, pvcRequests)
s.log.Debug(fmt.Sprintf("[filter] starts to filter the nodes from the request for a Pod %s/%s", pod.Namespace, pod.Name))
filteredNodes, err := filterNodes(s.log, s.cache, nodeNames, pod, managedPVCs, scs, pvcRequests)
if err != nil {
s.log.Error(err, "[filter] unable to filter the nodes")
http.Error(w, "bad request", http.StatusBadRequest)
return
}
s.log.Debug(fmt.Sprintf("[filter] successfully filtered the nodes from the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] successfully filtered the nodes from the request for a Pod %s/%s", pod.Namespace, pod.Name))

s.log.Debug(fmt.Sprintf("[filter] starts to populate the cache for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Cache(fmt.Sprintf("[filter] cache before the PVC reservation for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] starts to populate the cache for a Pod %s/%s", pod.Namespace, pod.Name))
s.log.Cache(fmt.Sprintf("[filter] cache before the PVC reservation for a Pod %s/%s", pod.Namespace, pod.Name))
s.cache.PrintTheCacheLog()
err = populateCache(s.log, filteredNodes.Nodes.Items, input.Pod, s.cache, managedPVCs, scs)
err = populateCache(s.log, filteredNodes.NodeNames, pod, s.cache, managedPVCs, scs)
if err != nil {
s.log.Error(err, "[filter] unable to populate cache")
http.Error(w, "bad request", http.StatusBadRequest)
return
}
s.log.Debug(fmt.Sprintf("[filter] successfully populated the cache for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Cache(fmt.Sprintf("[filter] cache after the PVC reservation for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] successfully populated the cache for a Pod %s/%s", pod.Namespace, pod.Name))
s.log.Cache(fmt.Sprintf("[filter] cache after the PVC reservation for a Pod %s/%s", pod.Namespace, pod.Name))
s.cache.PrintTheCacheLog()

w.Header().Set("content-type", "application/json")
@@ -136,7 +153,7 @@ func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) {
http.Error(w, "internal error", http.StatusInternalServerError)
return
}
s.log.Debug(fmt.Sprintf("[filter] ends the serving the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name))
s.log.Debug(fmt.Sprintf("[filter] ends the serving the request for a Pod %s/%s", pod.Namespace, pod.Name))
}
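The handler above decodes ExtenderArgs and replies with ExtenderFilterResult. Their definitions are not part of this diff, so the following sketch only approximates the shapes the code relies on after the switch from full Node objects to node names; the field sets and JSON tags are assumptions modeled on the kube-scheduler extender v1 API:

package scheduler

import corev1 "k8s.io/api/core/v1"

// ExtenderArgs is the request body the kube-scheduler sends to the extender.
type ExtenderArgs struct {
	Pod       *corev1.Pod `json:"pod"`
	NodeNames *[]string   `json:"nodenames,omitempty"`
}

// FailedNodesMap maps a node name to the reason it was filtered out.
type FailedNodesMap map[string]string

// ExtenderFilterResult is the response: node names that passed plus the failures.
type ExtenderFilterResult struct {
	NodeNames   *[]string      `json:"nodenames,omitempty"`
	FailedNodes FailedNodesMap `json:"failedNodes,omitempty"`
	Error       string         `json:"error,omitempty"`
}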

func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVolumeClaim, scs map[string]*v1.StorageClass) map[string]*corev1.PersistentVolumeClaim {
@@ -154,13 +171,13 @@ func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVo
return filteredPVCs
}

func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, schedulerCache *cache.Cache, pvcs map[string]*corev1.PersistentVolumeClaim, scs map[string]*v1.StorageClass) error {
for _, node := range nodes {
func populateCache(log logger.Logger, nodeNames *[]string, pod *corev1.Pod, schedulerCache *cache.Cache, pvcs map[string]*corev1.PersistentVolumeClaim, scs map[string]*v1.StorageClass) error {
for _, nodeName := range *nodeNames {
for _, volume := range pod.Spec.Volumes {
if volume.PersistentVolumeClaim != nil {
log.Debug(fmt.Sprintf("[populateCache] reconcile the PVC %s for Pod %s/%s on node %s", volume.PersistentVolumeClaim.ClaimName, pod.Namespace, pod.Name, node.Name))
lvgNamesForTheNode := schedulerCache.GetLVGNamesByNodeName(node.Name)
log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from cache for the node %s: %v", node.Name, lvgNamesForTheNode))
log.Debug(fmt.Sprintf("[populateCache] reconcile the PVC %s for Pod %s/%s on node %s", volume.PersistentVolumeClaim.ClaimName, pod.Namespace, pod.Name, nodeName))
lvgNamesForTheNode := schedulerCache.GetLVGNamesByNodeName(nodeName)
log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from cache for the node %s: %v", nodeName, lvgNamesForTheNode))
pvc := pvcs[volume.PersistentVolumeClaim.ClaimName]
sc := scs[*pvc.Spec.StorageClassName]

@@ -265,7 +282,7 @@ func extractRequestedSize(
func filterNodes(
log logger.Logger,
schedulerCache *cache.Cache,
nodes *corev1.NodeList,
nodeNames *[]string,
pod *corev1.Pod,
pvcs map[string]*corev1.PersistentVolumeClaim,
scs map[string]*v1.StorageClass,
@@ -274,7 +291,7 @@ func filterNodes(
// Param "pvcRequests" is a total amount of the pvcRequests space (both Thick and Thin) for Pod (i.e. from every PVC)
if len(pvcRequests) == 0 {
return &ExtenderFilterResult{
Nodes: nodes,
NodeNames: nodeNames,
}, nil
}

@@ -347,7 +364,7 @@ func filterNodes(
}

result := &ExtenderFilterResult{
Nodes: &corev1.NodeList{},
NodeNames: &[]string{},
FailedNodes: FailedNodesMap{},
}

@@ -356,27 +373,27 @@ func filterNodes(
failedNodesMapMtx := &sync.Mutex{}

wg := &sync.WaitGroup{}
wg.Add(len(nodes.Items))
errs := make(chan error, len(nodes.Items)*len(pvcs))
wg.Add(len(*nodeNames))
errs := make(chan error, len(*nodeNames)*len(pvcs))

for i, node := range nodes.Items {
go func(i int, node corev1.Node) {
log.Trace(fmt.Sprintf("[filterNodes] gourutine %d starts the work", i))
for i, nodeName := range *nodeNames {
go func(i int, nodeName string) {
log.Trace(fmt.Sprintf("[filterNodes] gourutine %d starts the work with node %s", i, nodeName))
defer func() {
log.Trace(fmt.Sprintf("[filterNodes] gourutine %d ends the work", i))
log.Trace(fmt.Sprintf("[filterNodes] gourutine %d ends the work with node %s", i, nodeName))
wg.Done()
}()

if _, common := commonNodes[node.Name]; !common {
log.Debug(fmt.Sprintf("[filterNodes] node %s is not common for used Storage Classes", node.Name))
if _, common := commonNodes[nodeName]; !common {
log.Debug(fmt.Sprintf("[filterNodes] node %s is not common for used Storage Classes %+v", nodeName, scs))
failedNodesMapMtx.Lock()
result.FailedNodes[node.Name] = "node is not common for used Storage Classes"
result.FailedNodes[nodeName] = fmt.Sprintf("node %s is not common for used Storage Classes", nodeName)
failedNodesMapMtx.Unlock()
return
}

// we get all LVMVolumeGroups from the node-applicant (which is common for all the PVCs)
lvgsFromNode := commonNodes[node.Name]
lvgsFromNode := commonNodes[nodeName]
hasEnoughSpace := true

// now we iterate all over the PVCs to see if we can place all of them on the node (does the node have enough space)
@@ -389,11 +406,11 @@ func filterNodes(
// we get the specific LVG which the PVC can use on the node as we support only one specified LVG in the Storage Class on each node
commonLVG := findMatchedLVG(lvgsFromNode, lvgsFromSC)
if commonLVG == nil {
err = fmt.Errorf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, node.Name)
err = fmt.Errorf("unable to match Storage Class's LVMVolumeGroup with the node's one, Storage Class: %s, node: %s", *pvc.Spec.StorageClassName, nodeName)
errs <- err
return
}
log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, node.Name))
log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s is common for storage class %s and node %s", commonLVG.Name, *pvc.Spec.StorageClassName, nodeName))

// see what kind of space does the PVC need
switch pvcReq.DeviceType {
@@ -417,7 +434,7 @@ func filterNodes(
// we try to find specific ThinPool which the PVC can use in the LVMVolumeGroup
targetThinPool := findMatchedThinPool(lvg.Status.ThinPools, commonLVG.Thin.PoolName)
if targetThinPool == nil {
err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg Thin pools: %+v; Thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, node.Name, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
err = fmt.Errorf("unable to match Storage Class's ThinPools with the node's one, Storage Class: %s; node: %s; lvg Thin pools: %+v; Thin.poolName from StorageClass: %s", *pvc.Spec.StorageClassName, nodeName, lvg.Status.ThinPools, commonLVG.Thin.PoolName)
errs <- err
return
}
@@ -446,13 +463,13 @@

if !hasEnoughSpace {
failedNodesMapMtx.Lock()
result.FailedNodes[node.Name] = "not enough space"
result.FailedNodes[nodeName] = "not enough space"
failedNodesMapMtx.Unlock()
return
}

result.Nodes.Items = append(result.Nodes.Items, node)
}(i, node)
*result.NodeNames = append(*result.NodeNames, nodeName)
}(i, nodeName)
}
wg.Wait()
log.Debug("[filterNodes] goroutines work is done")
@@ -467,8 +484,8 @@
return nil, err
}

for _, node := range result.Nodes.Items {
log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s there is a suitable node: %s", pod.Namespace, pod.Name, node.Name))
for _, nodeName := range *result.NodeNames {
log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s there is a suitable node: %s", pod.Namespace, pod.Name, nodeName))
}

for node, reason := range result.FailedNodes {
@@ -659,7 +676,7 @@ func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map
result := make(map[string]*v1.StorageClass, len(pvcs))
for _, pvc := range pvcs {
if pvc.Spec.StorageClassName == nil {
err = fmt.Errorf("not StorageClass specified for PVC %s", pvc.Name)
err = fmt.Errorf("no StorageClass specified for PVC %s", pvc.Name)
return nil, err
}

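The filterNodes changes keep the existing concurrency scheme while iterating node names instead of Node objects: one goroutine per candidate node, a WaitGroup to join them, a buffered error channel, and a mutex around the shared result. A simplified, generic sketch of that fan-out, with the per-node capacity check stubbed out (the real check walks PVC requests against LVMVolumeGroup free space):

package main

import (
	"fmt"
	"sync"
)

// filterByName fans out one goroutine per node, collecting the nodes that fit
// and the reasons for those that do not. fits stands in for the real capacity check.
func filterByName(nodeNames []string, fits func(string) (bool, error)) ([]string, map[string]string, error) {
	var (
		mu     sync.Mutex
		passed []string
		failed = make(map[string]string)
		wg     sync.WaitGroup
		errs   = make(chan error, len(nodeNames))
	)

	wg.Add(len(nodeNames))
	for _, nodeName := range nodeNames {
		go func(nodeName string) {
			defer wg.Done()
			ok, err := fits(nodeName)
			if err != nil {
				errs <- err
				return
			}
			mu.Lock()
			defer mu.Unlock()
			if ok {
				passed = append(passed, nodeName)
			} else {
				failed[nodeName] = "not enough space"
			}
		}(nodeName)
	}
	wg.Wait()
	close(errs)

	// Return the first collected error, if any (a drained, closed channel yields nil).
	if err := <-errs; err != nil {
		return nil, nil, fmt.Errorf("filtering failed: %w", err)
	}
	return passed, failed, nil
}

func main() {
	// Toy predicate: only node-a has enough space.
	passed, failed, err := filterByName([]string{"node-a", "node-b"}, func(name string) (bool, error) {
		return name == "node-a", nil
	})
	fmt.Println(passed, failed, err)
}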
