From de4cf45275ae5bf5bd39d20041307e6d1f885182 Mon Sep 17 00:00:00 2001 From: Viktor Kram <92625690+ViktorKram@users.noreply.github.com> Date: Fri, 12 Apr 2024 11:57:34 +0300 Subject: [PATCH] [controller] Add cache for extender-scheduler (#23) Signed-off-by: Viktor Kramarenko --- .../sds-local-volume-controller/cmd/main.go | 2 - .../Dockerfile | 9 +- .../cmd/cmd/root.go | 68 +- .../go.mod | 4 +- .../go.sum | 8 +- .../pkg/cache/cache.go | 461 ++++++++++++++ .../pkg/cache/cache_test.go | 589 ++++++++++++++++++ .../pkg/controller/lvg_watcher_cache.go | 178 ++++++ .../pkg/controller/pvc_watcher_cache.go | 190 ++++++ .../pkg/logger/logger.go | 6 + .../pkg/scheduler/filter.go | 291 ++++++--- .../pkg/scheduler/filter_test.go | 75 +++ .../pkg/scheduler/prioritize.go | 85 +-- .../pkg/scheduler/route.go | 125 +++- images/webhooks/src/go.mod | 2 +- .../rbac-for-us.yaml | 5 +- 16 files changed, 1950 insertions(+), 148 deletions(-) create mode 100644 images/sds-local-volume-scheduler-extender/pkg/cache/cache.go create mode 100644 images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go create mode 100644 images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go create mode 100644 images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go create mode 100644 images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go diff --git a/images/sds-local-volume-controller/cmd/main.go b/images/sds-local-volume-controller/cmd/main.go index 5fbc6b97..0204fc55 100644 --- a/images/sds-local-volume-controller/cmd/main.go +++ b/images/sds-local-volume-controller/cmd/main.go @@ -118,6 +118,4 @@ func main() { log.Error(err, "[main] unable to mgr.Start") os.Exit(1) } - - log.Info("[main] successfully starts the manager") } diff --git a/images/sds-local-volume-scheduler-extender/Dockerfile b/images/sds-local-volume-scheduler-extender/Dockerfile index 256a9c3f..12ae60a2 100644 --- a/images/sds-local-volume-scheduler-extender/Dockerfile +++ b/images/sds-local-volume-scheduler-extender/Dockerfile @@ -1,7 +1,7 @@ -ARG BASE_GOLANG_20_ALPINE=registry.deckhouse.io/base_images/golang:1.20.4-alpine3.18@sha256:5f403dd08db2f0b40d4416e29d3080eec41cd6cf53a05d5e4bcece3a5c7a8ce6 -ARG BASE_GOLANG_20_ALPINE_BUILDER=$BASE_GOLANG_20_ALPINE +ARG BASE_GOLANG_22_ALPINE=registry.deckhouse.io/base_images/golang:1.22.1-alpine@sha256:0de6cf7cceab6ecbf0718bdfb675b08b78113c3709c5e4b99456cdb2ae8c2495 +ARG BASE_GOLANG_22_ALPINE_BUILDER=$BASE_GOLANG_22_ALPINE -FROM $BASE_GOLANG_20_ALPINE_BUILDER as builder +FROM $BASE_GOLANG_22_ALPINE_BUILDER as builder WORKDIR /go/src @@ -15,7 +15,8 @@ COPY . . 
WORKDIR /go/src/cmd RUN GOOS=linux GOARCH=amd64 go build -o sds-local-volume-scheduler-extender -FROM --platform=linux/amd64 $BASE_GOLANG_20_ALPINE +FROM --platform=linux/amd64 $BASE_GOLANG_22_ALPINE COPY --from=builder /go/src/cmd/sds-local-volume-scheduler-extender /go/src/cmd/sds-local-volume-scheduler-extender +RUN apk add curl ENTRYPOINT ["/go/src/cmd/sds-local-volume-scheduler-extender"] diff --git a/images/sds-local-volume-scheduler-extender/cmd/cmd/root.go b/images/sds-local-volume-scheduler-extender/cmd/cmd/root.go index d7c9e1a2..cb872491 100644 --- a/images/sds-local-volume-scheduler-extender/cmd/cmd/root.go +++ b/images/sds-local-volume-scheduler-extender/cmd/cmd/root.go @@ -24,21 +24,23 @@ import ( "os" "os/signal" "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/controller" "sds-local-volume-scheduler-extender/pkg/kubutils" "sds-local-volume-scheduler-extender/pkg/logger" "sds-local-volume-scheduler-extender/pkg/scheduler" + "sigs.k8s.io/controller-runtime/pkg/healthz" + "sigs.k8s.io/controller-runtime/pkg/manager" "sync" "syscall" "time" + "github.com/spf13/cobra" v1 "k8s.io/api/core/v1" sv1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/runtime" - ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" - - "github.com/spf13/cobra" apiruntime "k8s.io/apimachinery/pkg/runtime" + ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/yaml" ) @@ -53,18 +55,21 @@ var resourcesSchemeFuncs = []func(*apiruntime.Scheme) error{ const ( defaultDivisor = 1 defaultListenAddr = ":8000" + defaultCacheSize = 10 ) type Config struct { ListenAddr string `json:"listen"` DefaultDivisor float64 `json:"default-divisor"` LogLevel string `json:"log-level"` + CacheSize int `json:"cache-size"` } var config = &Config{ ListenAddr: defaultListenAddr, DefaultDivisor: defaultDivisor, LogLevel: "2", + CacheSize: defaultCacheSize, } var rootCmd = &cobra.Command{ @@ -122,22 +127,56 @@ func subMain(parentCtx context.Context) error { } log.Info("[subMain] successfully read scheme CR") - cl, err := client.New(kConfig, client.Options{ - Scheme: scheme, - WarningHandler: client.WarningHandlerOptions{}, - }) + managerOpts := manager.Options{ + Scheme: scheme, + Logger: log.GetLogger(), + } + + mgr, err := manager.New(kConfig, managerOpts) + if err != nil { + return err + } + + schedulerCache := cache.NewCache(*log) + log.Info("[subMain] scheduler cache was initialized") - h, err := scheduler.NewHandler(ctx, cl, *log, config.DefaultDivisor) + h, err := scheduler.NewHandler(ctx, mgr.GetClient(), *log, schedulerCache, config.DefaultDivisor) if err != nil { return err } log.Info("[subMain] scheduler handler initialized") + _, err = controller.RunLVGWatcherCacheController(mgr, *log, schedulerCache) + if err != nil { + log.Error(err, fmt.Sprintf("[subMain] unable to run %s controller", controller.LVGWatcherCacheCtrlName)) + } + log.Info(fmt.Sprintf("[subMain] successfully ran %s controller", controller.LVGWatcherCacheCtrlName)) + + err = controller.RunPVCWatcherCacheController(mgr, *log, schedulerCache) + if err != nil { + log.Error(err, fmt.Sprintf("[subMain] unable to run %s controller", controller.PVCWatcherCacheCtrlName)) + } + log.Info(fmt.Sprintf("[subMain] successfully ran %s controller", controller.PVCWatcherCacheCtrlName)) + + if err = mgr.AddHealthzCheck("healthz", healthz.Ping); err != nil { + log.Error(err, "[subMain] unable to mgr.AddHealthzCheck") + os.Exit(1) + } + log.Info("[subMain] 
successfully AddHealthzCheck") + + if err = mgr.AddReadyzCheck("readyz", healthz.Ping); err != nil { + log.Error(err, "[subMain] unable to mgr.AddReadyzCheck") + os.Exit(1) + } + log.Info("[subMain] successfully AddReadyzCheck") + serv := &http.Server{ Addr: config.ListenAddr, Handler: accessLogHandler(parentCtx, h), ReadTimeout: 30 * time.Second, } + log.Info("[subMain] server was initialized") + var wg sync.WaitGroup defer wg.Wait() ctx, stop := signal.NotifyContext(parentCtx, os.Interrupt, syscall.SIGTERM) @@ -151,11 +190,22 @@ func subMain(parentCtx context.Context) error { } }() + go func() { + log.Info("[subMain] kube manager will start now") + err = mgr.Start(ctx) + if err != nil { + log.Error(err, "[subMain] unable to mgr.Start") + os.Exit(1) + } + }() + log.Info(fmt.Sprintf("[subMain] starts serving on: %s", config.ListenAddr)) err = serv.ListenAndServe() if !errors.Is(err, http.ErrServerClosed) { + log.Error(err, "[subMain] unable to run the server") return err } + return nil } diff --git a/images/sds-local-volume-scheduler-extender/go.mod b/images/sds-local-volume-scheduler-extender/go.mod index ec7b3c81..29a8c643 100644 --- a/images/sds-local-volume-scheduler-extender/go.mod +++ b/images/sds-local-volume-scheduler-extender/go.mod @@ -13,7 +13,8 @@ require ( k8s.io/apimachinery v0.29.1 k8s.io/client-go v0.29.0 k8s.io/klog/v2 v2.110.1 - sigs.k8s.io/controller-runtime v0.17.0 + k8s.io/utils v0.0.0-20240310230437-4693a0247e57 + sigs.k8s.io/controller-runtime v0.17.2 sigs.k8s.io/yaml v1.4.0 ) @@ -66,7 +67,6 @@ require ( k8s.io/apiextensions-apiserver v0.29.0 // indirect k8s.io/component-base v0.29.0 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect - k8s.io/utils v0.0.0-20230726121419-3b25d923346b // indirect sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) diff --git a/images/sds-local-volume-scheduler-extender/go.sum b/images/sds-local-volume-scheduler-extender/go.sum index 51b1d86c..a22441c0 100644 --- a/images/sds-local-volume-scheduler-extender/go.sum +++ b/images/sds-local-volume-scheduler-extender/go.sum @@ -194,10 +194,10 @@ k8s.io/klog/v2 v2.110.1 h1:U/Af64HJf7FcwMcXyKm2RPM22WZzyR7OSpYj5tg3cL0= k8s.io/klog/v2 v2.110.1/go.mod h1:YGtd1984u+GgbuZ7e08/yBuAfKLSO0+uR1Fhi6ExXjo= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 h1:aVUu9fTY98ivBPKR9Y5w/AuzbMm96cd3YHRTU83I780= k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00/go.mod h1:AsvuZPBlUDVuCdzJ87iajxtXuR9oktsTctW/R9wwouA= -k8s.io/utils v0.0.0-20230726121419-3b25d923346b h1:sgn3ZU783SCgtaSJjpcVVlRqd6GSnlTLKgpAAttJvpI= -k8s.io/utils v0.0.0-20230726121419-3b25d923346b/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= -sigs.k8s.io/controller-runtime v0.17.0 h1:fjJQf8Ukya+VjogLO6/bNX9HE6Y2xpsO5+fyS26ur/s= -sigs.k8s.io/controller-runtime v0.17.0/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= +k8s.io/utils v0.0.0-20240310230437-4693a0247e57 h1:gbqbevonBh57eILzModw6mrkbwM0gQBEuevE/AaBsHY= +k8s.io/utils v0.0.0-20240310230437-4693a0247e57/go.mod h1:OLgZIPagt7ERELqWJFomSt595RzquPNLL48iOWgYOg0= +sigs.k8s.io/controller-runtime v0.17.2 h1:FwHwD1CTUemg0pW2otk7/U5/i5m2ymzvOXdbeGOUvw0= +sigs.k8s.io/controller-runtime v0.17.2/go.mod h1:+MngTvIQQQhfXtwfdGw/UOQ/aIaqsYywfCINOtwMO/s= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd h1:EDPBXCAspyGV4jQlpZSudPeMmr1bNJefnuqLsRAsHZo= sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd/go.mod h1:B8JuhiUyNFVKdsE8h686QcCxMaH6HrOAZj4vswFpcB0= sigs.k8s.io/structured-merge-diff/v4 
v4.4.1 h1:150L+0vs/8DA78h1u02ooW1/fFq/Lwr+sGiqlzvrtq4= diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go new file mode 100644 index 00000000..ecabb60c --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache.go @@ -0,0 +1,461 @@ +package cache + +import ( + "fmt" + v1 "k8s.io/api/core/v1" + slices2 "k8s.io/utils/strings/slices" + "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/logger" + "sync" +) + +const ( + pvcPerLVGCount = 150 + lvgsPerPVCCount = 5 + lvgsPerNodeCount = 5 + SelectedNodeAnnotation = "volume.kubernetes.io/selected-node" +) + +type Cache struct { + lvgs sync.Map //map[string]*lvgCache + pvcLVGs sync.Map //map[string][]string + nodeLVGs sync.Map //map[string][]string + log logger.Logger +} + +type lvgCache struct { + lvg *v1alpha1.LvmVolumeGroup + pvcs sync.Map //map[string]*pvcCache +} + +type pvcCache struct { + pvc *v1.PersistentVolumeClaim + selectedNode string +} + +// NewCache initialize new cache. +func NewCache(logger logger.Logger) *Cache { + return &Cache{ + log: logger, + } +} + +// AddLVG adds selected LVMVolumeGroup resource to the cache. If it is already stored, does nothing. +func (c *Cache) AddLVG(lvg *v1alpha1.LvmVolumeGroup) { + _, loaded := c.lvgs.LoadOrStore(lvg.Name, &lvgCache{ + lvg: lvg, + pvcs: sync.Map{}, + }) + if loaded { + c.log.Debug(fmt.Sprintf("[AddLVG] the LVMVolumeGroup %s has been already added to the cache", lvg.Name)) + return + } + + for _, node := range lvg.Status.Nodes { + lvgsOnTheNode, _ := c.nodeLVGs.Load(node.Name) + if lvgsOnTheNode == nil { + lvgsOnTheNode = make([]string, 0, lvgsPerNodeCount) + } + + lvgsOnTheNode = append(lvgsOnTheNode.([]string), lvg.Name) + c.nodeLVGs.Store(node.Name, lvgsOnTheNode) + } +} + +// UpdateLVG updated selected LVMVolumeGroup resource in the cache. If such LVMVolumeGroup is not stored, returns an error. +func (c *Cache) UpdateLVG(lvg *v1alpha1.LvmVolumeGroup) error { + if cache, found := c.lvgs.Load(lvg.Name); found { + cache.(*lvgCache).lvg = lvg + return nil + } + + return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvg.Name) +} + +// TryGetLVG returns selected LVMVolumeGroup resource if it is stored in the cache, otherwise returns nil. +func (c *Cache) TryGetLVG(name string) *v1alpha1.LvmVolumeGroup { + lvgCh, found := c.lvgs.Load(name) + if !found { + c.log.Debug(fmt.Sprintf("[TryGetLVG] the LVMVolumeGroup %s was not found in the cache. Return nil", name)) + return nil + } + + return lvgCh.(*lvgCache).lvg +} + +// GetLVGNamesByNodeName returns LVMVolumeGroups resources names stored in the cache for the selected node. If none of them exist, returns empty slice. +func (c *Cache) GetLVGNamesByNodeName(nodeName string) []string { + lvgs, found := c.nodeLVGs.Load(nodeName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGNamesByNodeName] no LVMVolumeGroup was found in the cache for the node %s. Return empty slice", nodeName)) + return []string{} + } + + return lvgs.([]string) +} + +// GetAllLVG returns all the LVMVolumeGroups resources stored in the cache. 
+func (c *Cache) GetAllLVG() map[string]*v1alpha1.LvmVolumeGroup { + lvgs := make(map[string]*v1alpha1.LvmVolumeGroup) + c.lvgs.Range(func(lvgName, lvgCh any) bool { + if lvgCh.(*lvgCache).lvg == nil { + c.log.Error(fmt.Errorf("LVMVolumeGroup %s is not initialized", lvgName), fmt.Sprintf("[GetAllLVG] an error occurs while iterating the LVMVolumeGroups")) + return true + } + + lvgs[lvgName.(string)] = lvgCh.(*lvgCache).lvg + return true + }) + + return lvgs +} + +// GetLVGReservedSpace returns a sum of reserved space by every PVC in the selected LVMVolumeGroup resource. If such LVMVolumeGroup resource is not stored, returns an error. +func (c *Cache) GetLVGReservedSpace(lvgName string) (int64, error) { + lvg, found := c.lvgs.Load(lvgName) + if !found { + c.log.Debug(fmt.Sprintf("[GetLVGReservedSpace] the LVMVolumeGroup %s was not found in the cache. Returns 0", lvgName)) + return 0, nil + } + + var space int64 + lvg.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + space += pvcCh.(*pvcCache).pvc.Spec.Resources.Requests.Storage().Value() + return true + }) + + return space, nil +} + +// DeleteLVG deletes selected LVMVolumeGroup resource from the cache. +func (c *Cache) DeleteLVG(lvgName string) { + c.lvgs.Delete(lvgName) + + c.nodeLVGs.Range(func(nodeName, lvgNames any) bool { + for i, lvg := range lvgNames.([]string) { + if lvg == lvgName { + lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...) + } + } + + return true + }) + + c.pvcLVGs.Range(func(pvcName, lvgNames any) bool { + for i, lvg := range lvgNames.([]string) { + if lvg == lvgName { + lvgNames = append(lvgNames.([]string)[:i], lvgNames.([]string)[i+1:]...) + } + } + + return true + }) +} + +// AddPVC adds selected PVC to selected LVMVolumeGroup resource. If the LVMVolumeGroup resource is not stored, returns an error. +// If selected PVC is already stored in the cache, does nothing. +func (c *Cache) AddPVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { + if pvc.Status.Phase == v1.ClaimBound { + c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s/%s has status phase BOUND. It will not be added to the cache", pvc.Namespace, pvc.Name)) + return nil + } + + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return err + } + + // this case might be triggered if the extender recovers after fail and finds some pending pvcs with selected nodes + c.log.Trace(fmt.Sprintf("[AddPVC] PVC %s/%s annotations: %v", pvc.Namespace, pvc.Name, pvc.Annotations)) + if pvc.Annotations[SelectedNodeAnnotation] != "" { + c.log.Debug(fmt.Sprintf("[AddPVC] PVC %s/%s has selected node anotation, selected node: %s", pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + + lvgsOnTheNode, found := c.nodeLVGs.Load(pvc.Annotations[SelectedNodeAnnotation]) + if !found { + err := fmt.Errorf("no LVMVolumeGroups found for the node %s", pvc.Annotations[SelectedNodeAnnotation]) + c.log.Error(err, fmt.Sprintf("[AddPVC] an error occured while trying to add PVC %s to the cache", pvcKey)) + return err + } + + if !slices2.Contains(lvgsOnTheNode.([]string), lvgName) { + c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s does not belong to PVC %s/%s selected node %s. 
It will be skipped", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + return nil + } + + c.log.Debug(fmt.Sprintf("[AddPVC] LVMVolumeGroup %s belongs to PVC %s/%s selected node %s", lvgName, pvc.Namespace, pvc.Name, pvc.Annotations[SelectedNodeAnnotation])) + + _, found = lvgCh.(*lvgCache).pvcs.Load(pvcKey) + if found { + c.log.Warning(fmt.Sprintf("[AddPVC] PVC %s cache has been already added to the LVMVolumeGroup %s", pvcKey, lvgName)) + return nil + } + } + + c.log.Debug(fmt.Sprintf("[AddPVC] new PVC %s cache will be added to the LVMVolumeGroup %s", pvcKey, lvgName)) + c.addNewPVC(lvgCh.(*lvgCache), pvc) + + return nil +} + +func (c *Cache) addNewPVC(lvgCh *lvgCache, pvc *v1.PersistentVolumeClaim) { + pvcKey := configurePVCKey(pvc) + lvgCh.pvcs.Store(pvcKey, &pvcCache{pvc: pvc, selectedNode: pvc.Annotations[SelectedNodeAnnotation]}) + + lvgsForPVC, found := c.pvcLVGs.Load(pvcKey) + if !found || lvgsForPVC == nil { + lvgsForPVC = make([]string, 0, lvgsPerPVCCount) + } + + c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s before append: %v", pvcKey, lvgsForPVC)) + lvgsForPVC = append(lvgsForPVC.([]string), lvgCh.lvg.Name) + c.log.Trace(fmt.Sprintf("[addNewPVC] LVMVolumeGroups from the cache for PVC %s after append: %v", pvcKey, lvgsForPVC)) + c.pvcLVGs.Store(pvcKey, lvgsForPVC) +} + +// UpdatePVC updates selected PVC in selected LVMVolumeGroup resource. If no such PVC is stored in the cache, adds it. +func (c *Cache) UpdatePVC(lvgName string, pvc *v1.PersistentVolumeClaim) error { + pvcKey := configurePVCKey(pvc) + + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + return fmt.Errorf("the LVMVolumeGroup %s was not found in the cache", lvgName) + } + + pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + if !found { + c.log.Warning(fmt.Sprintf("[UpdatePVC] PVC %s was not found in the cache for the LVMVolumeGroup %s. It will be added", pvcKey, lvgName)) + err := c.AddPVC(lvgName, pvc) + if err != nil { + c.log.Error(err, fmt.Sprintf("[UpdatePVC] an error occurred while trying to update the PVC %s", pvcKey)) + return err + } + return nil + } + + pvcCh.(*pvcCache).pvc = pvc + pvcCh.(*pvcCache).selectedNode = pvc.Annotations[SelectedNodeAnnotation] + c.log.Debug(fmt.Sprintf("[UpdatePVC] successfully updated PVC %s with selected node %s in the cache for LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + + return nil +} + +// GetAllPVCForLVG returns slice of PVC belonging to selected LVMVolumeGroup resource. If such LVMVolumeGroup is not stored in the cache, returns an error. +func (c *Cache) GetAllPVCForLVG(lvgName string) ([]*v1.PersistentVolumeClaim, error) { + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("cache was not found for the LVMVolumeGroup %s", lvgName) + c.log.Error(err, fmt.Sprintf("[GetAllPVCForLVG] an error occured while trying to get all PVC for the LVMVolumeGroup %s", lvgName)) + return nil, err + } + + result := make([]*v1.PersistentVolumeClaim, 0, pvcPerLVGCount) + lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + result = append(result, pvcCh.(*pvcCache).pvc) + return true + }) + + return result, nil +} + +// GetLVGNamesForPVC returns a slice of LVMVolumeGroup resources names, where selected PVC has been stored in. If no such LVMVolumeGroup found, returns empty slice. 
+func (c *Cache) GetLVGNamesForPVC(pvc *v1.PersistentVolumeClaim) []string { + pvcKey := configurePVCKey(pvc) + lvgNames, found := c.pvcLVGs.Load(pvcKey) + if !found { + c.log.Warning(fmt.Sprintf("[GetLVGNamesForPVC] no cached LVMVolumeGroups were found for PVC %s", pvcKey)) + return nil + } + + return lvgNames.([]string) +} + +// RemoveBoundedPVCSpaceReservation removes selected bounded PVC space reservation from a target LVMVolumeGroup resource. If no such LVMVolumeGroup found or PVC +// is not in a Status Bound, returns an error. +func (c *Cache) RemoveBoundedPVCSpaceReservation(lvgName string, pvc *v1.PersistentVolumeClaim) error { + if pvc.Status.Phase != v1.ClaimBound { + return fmt.Errorf("PVC %s/%s not in a Status.Phase Bound", pvc.Namespace, pvc.Name) + } + + pvcKey := configurePVCKey(pvc) + lvgCh, found := c.lvgs.Load(lvgName) + if !found { + err := fmt.Errorf("LVMVolumeGroup %s was not found in the cache", lvgName) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + return err + } + + pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + if !found || pvcCh == nil { + err := fmt.Errorf("cache for PVC %s was not found", pvcKey) + c.log.Error(err, fmt.Sprintf("[RemoveBoundedPVCSpaceReservation] an error occured while trying to remove space reservation for PVC %s in the LVMVolumeGroup %s", pvcKey, lvgName)) + return err + } + + lvgCh.(*lvgCache).pvcs.Delete(pvcKey) + c.pvcLVGs.Delete(pvcKey) + + return nil +} + +// CheckIsPVCStored checks if selected PVC has been already stored in the cache. +func (c *Cache) CheckIsPVCStored(pvc *v1.PersistentVolumeClaim) bool { + pvcKey := configurePVCKey(pvc) + if _, found := c.pvcLVGs.Load(pvcKey); found { + return true + } + + return false +} + +// RemoveSpaceReservationForPVCWithSelectedNode removes space reservation for selected PVC for every LVMVolumeGroup resource, which is not bound to the PVC selected node. +func (c *Cache) RemoveSpaceReservationForPVCWithSelectedNode(pvc *v1.PersistentVolumeClaim) error { + pvcKey := configurePVCKey(pvc) + selectedLVGName := "" + + lvgNamesForPVC, found := c.pvcLVGs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s has been already removed", pvcKey)) + return nil + } + + for _, lvgName := range lvgNamesForPVC.([]string) { + lvgCh, found := c.lvgs.Load(lvgName) + if !found || lvgCh == nil { + err := fmt.Errorf("no cache found for the LVMVolumeGroup %s", lvgName) + c.log.Error(err, fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] an error occured while trying to remove space reservation for PVC %s", pvcKey)) + return err + } + + pvcCh, found := lvgCh.(*lvgCache).pvcs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation in the LVMVolumeGroup %s has been already removed", pvcKey, lvgName)) + continue + } + + selectedNode := pvcCh.(*pvcCache).selectedNode + if selectedNode == "" { + lvgCh.(*lvgCache).pvcs.Delete(pvcKey) + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] removed space reservation for PVC %s in the LVMVolumeGroup %s due the PVC got selected to the node %s", pvcKey, lvgName, pvc.Annotations[SelectedNodeAnnotation])) + } else { + selectedLVGName = lvgName + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s got selected to the node %s. 
It should not be revomed from the LVMVolumeGroup %s", pvcKey, pvc.Annotations[SelectedNodeAnnotation], lvgName)) + } + } + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] PVC %s space reservation has been removed from LVMVolumeGroup cache", pvcKey)) + + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cache for PVC %s will be wiped from unused LVMVolumeGroups", pvcKey)) + cleared := make([]string, 0, len(lvgNamesForPVC.([]string))) + for _, lvgName := range lvgNamesForPVC.([]string) { + if lvgName == selectedLVGName { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be saved for PVC %s cache as used", lvgName, pvcKey)) + cleared = append(cleared, lvgName) + } else { + c.log.Debug(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] the LVMVolumeGroup %s will be removed from PVC %s cache as not used", lvgName, pvcKey)) + } + } + c.log.Trace(fmt.Sprintf("[RemoveSpaceReservationForPVCWithSelectedNode] cleared LVMVolumeGroups for PVC %s: %v", pvcKey, cleared)) + c.pvcLVGs.Store(pvcKey, cleared) + + return nil +} + +// RemovePVCFromTheCache completely removes selected PVC in the cache. +func (c *Cache) RemovePVCFromTheCache(pvc *v1.PersistentVolumeClaim) { + targetPvcKey := configurePVCKey(pvc) + + c.log.Debug(fmt.Sprintf("[RemovePVCFromTheCache] run full cache wipe for PVC %s", targetPvcKey)) + c.pvcLVGs.Range(func(pvcKey, lvgArray any) bool { + if pvcKey == targetPvcKey { + for _, lvgName := range lvgArray.([]string) { + lvgCh, found := c.lvgs.Load(lvgName) + if found { + lvgCh.(*lvgCache).pvcs.Delete(pvcKey.(string)) + } + } + } + + return true + }) + + c.pvcLVGs.Delete(targetPvcKey) +} + +// FindLVGForPVCBySelectedNode finds a suitable LVMVolumeGroup resource's name for selected PVC based on selected node. If no such LVMVolumeGroup found, returns empty string. +func (c *Cache) FindLVGForPVCBySelectedNode(pvc *v1.PersistentVolumeClaim, nodeName string) string { + pvcKey := configurePVCKey(pvc) + + lvgsForPVC, found := c.pvcLVGs.Load(pvcKey) + if !found { + c.log.Debug(fmt.Sprintf("[FindLVGForPVCBySelectedNode] no LVMVolumeGroups were found in the cache for PVC %s. Returns empty string", pvcKey)) + return "" + } + + lvgsOnTheNode, found := c.nodeLVGs.Load(nodeName) + if !found { + c.log.Debug(fmt.Sprintf("[FindLVGForPVCBySelectedNode] no LVMVolumeGroups were found in the cache for the node %s. Returns empty string", nodeName)) + return "" + } + + var targetLVG string + for _, lvgName := range lvgsForPVC.([]string) { + if slices2.Contains(lvgsOnTheNode.([]string), lvgName) { + targetLVG = lvgName + } + } + + if targetLVG == "" { + c.log.Debug(fmt.Sprintf("[FindLVGForPVCBySelectedNode] no LVMVolumeGroup was found for PVC %s. Returns empty string", pvcKey)) + } + + return targetLVG +} + +// PrintTheCacheLog prints the logs with cache state. 
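// Illustrative only, not part of this patch: a minimal sketch of the release path above, i.e. what
// a caller does once the PVC carries the selected-node annotation and later becomes Bound. Same
// assumed imports as the earlier sketch in this file; the function name is hypothetical.
func exampleReleaseReservations(log logger.Logger, c *cache.Cache, pvc *v1.PersistentVolumeClaim, nodeName string) {
	// Find the one LVG that serves the PVC on its selected node.
	lvgName := c.FindLVGForPVCBySelectedNode(pvc, nodeName)
	if lvgName == "" {
		return // nothing is cached for this PVC/node pair
	}

	// Keep that reservation and drop the ones made on every other candidate LVG.
	if err := c.RemoveSpaceReservationForPVCWithSelectedNode(pvc); err != nil {
		log.Error(err, "unable to release the unused reservations")
	}

	// Once the PVC is Bound its space shows up in the LVG status, so the reservation itself is dropped.
	if pvc.Status.Phase == v1.ClaimBound {
		if err := c.RemoveBoundedPVCSpaceReservation(lvgName, pvc); err != nil {
			log.Error(err, "unable to drop the reservation of the bound PVC")
		}
	}
}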
+func (c *Cache) PrintTheCacheLog() { + c.log.Cache("*******************CACHE BEGIN*******************") + c.log.Cache("[LVMVolumeGroups BEGIN]") + c.lvgs.Range(func(lvgName, lvgCh any) bool { + c.log.Cache(fmt.Sprintf("[%s]", lvgName)) + + lvgCh.(*lvgCache).pvcs.Range(func(pvcName, pvcCh any) bool { + c.log.Cache(fmt.Sprintf(" PVC %s, selected node: %s", pvcName, pvcCh.(*pvcCache).selectedNode)) + return true + }) + + return true + }) + + c.log.Cache("[LVMVolumeGroups ENDS]") + c.log.Cache("[PVC and LVG BEGINS]") + c.pvcLVGs.Range(func(pvcName, lvgs any) bool { + c.log.Cache(fmt.Sprintf("[PVC: %s]", pvcName)) + + for _, lvgName := range lvgs.([]string) { + c.log.Cache(fmt.Sprintf(" LVMVolumeGroup: %s", lvgName)) + } + + return true + }) + + c.log.Cache("[PVC and LVG ENDS]") + c.log.Cache("[Node and LVG BEGINS]") + c.nodeLVGs.Range(func(nodeName, lvgs any) bool { + c.log.Cache(fmt.Sprintf("[Node: %s]", nodeName)) + + for _, lvgName := range lvgs.([]string) { + c.log.Cache(fmt.Sprintf(" LVMVolumeGroup name: %s", lvgName)) + } + + return true + }) + c.log.Cache("[Node and LVG ENDS]") + c.log.Cache("*******************CACHE END*******************") +} + +func configurePVCKey(pvc *v1.PersistentVolumeClaim) string { + return fmt.Sprintf("%s/%s", pvc.Namespace, pvc.Name) +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go new file mode 100644 index 00000000..dc28272b --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/cache/cache_test.go @@ -0,0 +1,589 @@ +package cache + +import ( + "fmt" + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/logger" + "testing" +) + +func BenchmarkCache_DeleteLVG(b *testing.B) { + cache := NewCache(logger.Logger{}) + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "first", + }, + } + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + cache.AddLVG(lvg) + if _, found := cache.lvgs.Load(lvg.Name); found { + //b.Log("lvg found, delete it") + cache.DeleteLVG(lvg.Name) + } + } + }) +} + +func BenchmarkCache_GetLVGReservedSpace(b *testing.B) { + cache := NewCache(logger.Logger{}) + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "first", + }, + } + + cache.AddLVG(lvg) + + pvcs := []v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc-1", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + "pvc": *resource.NewQuantity(1000000, resource.BinarySI), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc-2", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + "pvc": *resource.NewQuantity(2000000, resource.BinarySI), + }, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc-3", + }, + Spec: v1.PersistentVolumeClaimSpec{ + Resources: v1.VolumeResourceRequirements{ + Requests: v1.ResourceList{ + "pvc": *resource.NewQuantity(30000000, resource.BinarySI), + }, + }, + }, + }, + } + + for _, pvc := range pvcs { + err := cache.AddPVC(lvg.Name, &pvc) + if err != nil { + b.Error(err) + } + } + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + _, err := cache.GetLVGReservedSpace(lvg.Name) + if err != nil { + 
b.Error(err) + } + } + }) +} + +func BenchmarkCache_AddPVC(b *testing.B) { + cache := NewCache(logger.Logger{}) + + lvg1 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "first", + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + {Name: "test-node1"}, + }, + }, + } + lvg2 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "second", + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + {Name: "test-node2"}, + }, + }, + } + lvg3 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "third", + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + {Name: "test-node3"}, + }, + }, + } + cache.AddLVG(lvg1) + cache.AddLVG(lvg2) + cache.AddLVG(lvg3) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-pvc", + Namespace: "test-ns", + }, + Status: v1.PersistentVolumeClaimStatus{ + Phase: v1.ClaimPending, + }, + } + + err := cache.AddPVC(lvg1.Name, pvc) + if err != nil { + b.Error(err) + } + err = cache.AddPVC(lvg2.Name, pvc) + if err != nil { + b.Error(err) + } + err = cache.AddPVC(lvg3.Name, pvc) + if err != nil { + b.Error(err) + } + + lvgs := cache.GetLVGNamesForPVC(pvc) + b.Log(lvgs) + } + }) +} + +func BenchmarkCache_GetAllLVG(b *testing.B) { + cache := NewCache(logger.Logger{}) + lvgs := map[string]*lvgCache{ + "first": { + lvg: &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "first", + }, + }, + }, + "second": { + lvg: &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "second", + }, + }, + }, + } + + for _, lvg := range lvgs { + cache.lvgs.Store(lvg.lvg.Name, lvg) + } + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + mp := cache.GetAllLVG() + + if len(mp) != 2 { + b.Error("not enough LVG") + } + } + }) +} + +func BenchmarkCache_GetLVGNamesByNodeName(b *testing.B) { + cache := NewCache(logger.Logger{}) + lvgs := []string{ + "first", + "second", + "third", + } + nodeName := "test-node" + + cache.nodeLVGs.Store(nodeName, lvgs) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + l := cache.GetLVGNamesByNodeName(nodeName) + if len(l) != 3 { + b.Error("not enough LVG") + } + } + }) +} + +func BenchmarkCache_TryGetLVG(b *testing.B) { + cache := NewCache(logger.Logger{}) + name := "test-name" + + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + cache.AddLVG(lvg) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + l := cache.TryGetLVG(lvg.Name) + if l == nil { + b.Error("nil LVG from cache") + } + } + }) +} + +func BenchmarkCache_AddLVG(b *testing.B) { + cache := NewCache(logger.Logger{}) + i := 0 + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + lvg1 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: "test-1", + }, + }, + }, + } + + lvg2 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+1), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: "test-1", + }, + }, + }, + } + + lvg3 := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+2), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: "test-1", + }, + }, + 
}, + } + + cache.AddLVG(lvg1) + cache.AddLVG(lvg2) + cache.AddLVG(lvg3) + + lvgs, _ := cache.nodeLVGs.Load("test-1") + b.Log(lvgs.([]string)) + } + }) +} + +func TestCache_UpdateLVG(t *testing.T) { + cache := NewCache(logger.Logger{}) + name := "test-lvg" + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: "1Gi", + }, + } + cache.AddLVG(lvg) + + newLVG := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: "2Gi", + }, + } + + err := cache.UpdateLVG(newLVG) + if err != nil { + t.Error(err) + } + + updatedLvg := cache.TryGetLVG(name) + assert.Equal(t, newLVG.Status.AllocatedSize, updatedLvg.Status.AllocatedSize) +} + +func BenchmarkCache_UpdateLVG(b *testing.B) { + cache := NewCache(logger.Logger{}) + name := "test-name" + i := 0 + + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + } + cache.AddLVG(lvg) + + _, found := cache.lvgs.Load(name) + if !found { + b.Error("not found LVG") + } + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + updated := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: fmt.Sprintf("2%dGi", i), + }, + } + b.Logf("updates the LVG with allocated size: %s", updated.Status.AllocatedSize) + err := cache.UpdateLVG(updated) + if err != nil { + b.Error(err) + } + } + }) +} + +func BenchmarkCache_UpdatePVC(b *testing.B) { + cache := NewCache(logger.Logger{}) + i := 0 + lvg := &v1alpha1.LvmVolumeGroup{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-lvg", + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: "test-node", + }, + }, + }, + } + cache.AddLVG(lvg) + + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + pvc := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i), + Namespace: "test-ns", + }, + } + + updatedPVC := &v1.PersistentVolumeClaim{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i), + Namespace: "test-ns", + Annotations: map[string]string{ + SelectedNodeAnnotation: "test-node", + }, + }, + } + err := cache.UpdatePVC(lvg.Name, pvc) + if err != nil { + b.Error(err) + } + err = cache.UpdatePVC(lvg.Name, updatedPVC) + if err != nil { + b.Error(err) + } + } + }) +} + +func BenchmarkCache_FullLoad(b *testing.B) { + cache := NewCache(logger.Logger{}) + + const ( + nodeName = "test-node" + ) + + i := 0 + b.RunParallel(func(pb *testing.PB) { + for pb.Next() { + i++ + + lvgs := []*v1alpha1.LvmVolumeGroup{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: nodeName, + }, + }, + AllocatedSize: fmt.Sprintf("1%dGi", i), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+1), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: nodeName, + }, + }, + AllocatedSize: fmt.Sprintf("1%dGi", i), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+2), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + Nodes: []v1alpha1.LvmVolumeGroupNode{ + { + Name: nodeName, + }, + }, + AllocatedSize: fmt.Sprintf("1%dGi", i), + }, + }, + } + + for _, lvg := range lvgs { + cache.AddLVG(lvg) + pvcs := []*v1.PersistentVolumeClaim{ + { + 
ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i), + Namespace: "test-ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i+1), + Namespace: "test-ns", + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i+2), + Namespace: "test-ns", + }, + }, + } + + for _, pvc := range pvcs { + err := cache.AddPVC(lvg.Name, pvc) + if err != nil { + b.Error(err) + } + + cache.GetLVGNamesForPVC(pvc) + } + } + + updatedLvgs := []*v1alpha1.LvmVolumeGroup{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: fmt.Sprintf("1%dGi", i+1), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+1), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: fmt.Sprintf("1%dGi", i+1), + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-lvg-%d", i+2), + }, + Status: v1alpha1.LvmVolumeGroupStatus{ + AllocatedSize: fmt.Sprintf("1%dGi", i+1), + }, + }, + } + + for _, lvg := range updatedLvgs { + var err error + for err != nil { + err = cache.UpdateLVG(lvg) + } + + pvcs := []*v1.PersistentVolumeClaim{ + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i), + Namespace: "test-ns", + Annotations: map[string]string{ + SelectedNodeAnnotation: nodeName, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i+1), + Namespace: "test-ns", + Annotations: map[string]string{ + SelectedNodeAnnotation: nodeName, + }, + }, + }, + { + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("test-pvc-%d", i+2), + Namespace: "test-ns", + Annotations: map[string]string{ + SelectedNodeAnnotation: nodeName, + }, + }, + }, + } + + for _, pvc := range pvcs { + for err != nil { + err = cache.UpdatePVC(lvg.Name, pvc) + } + + cache.GetLVGNamesForPVC(pvc) + } + } + + lvgMp := cache.GetAllLVG() + for lvgName := range lvgMp { + _, err := cache.GetAllPVCForLVG(lvgName) + if err != nil { + b.Error(err) + } + _, err = cache.GetLVGReservedSpace(lvgName) + if err != nil { + b.Error(err) + } + } + + cache.GetLVGNamesByNodeName(nodeName) + } + }) +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go new file mode 100644 index 00000000..4db9230f --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/lvg_watcher_cache.go @@ -0,0 +1,178 @@ +package controller + +import ( + "context" + "errors" + "fmt" + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/client-go/util/workqueue" + "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/logger" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + LVGWatcherCacheCtrlName = "lvg-watcher-cache-controller" +) + +func RunLVGWatcherCacheController( + mgr manager.Manager, + log logger.Logger, + cache *cache.Cache, +) (controller.Controller, error) { + log.Info("[RunLVGWatcherCacheController] starts the work") + + c, err := controller.New(LVGWatcherCacheCtrlName, mgr, controller.Options{ + Reconciler: reconcile.Func(func(ctx context.Context, request 
reconcile.Request) (reconcile.Result, error) { + return reconcile.Result{}, nil + }), + }) + if err != nil { + log.Error(err, "[RunCacheWatcherController] unable to create a controller") + return nil, err + } + + err = c.Watch(source.Kind(mgr.GetCache(), &v1alpha1.LvmVolumeGroup{}), handler.Funcs{ + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + log.Info(fmt.Sprintf("[RunLVGWatcherCacheController] CreateFunc starts the cache reconciliation for the LVMVolumeGroup %s", e.Object.GetName())) + + lvg, ok := e.Object.(*v1alpha1.LvmVolumeGroup) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunLVGWatcherCacheController] an error occurred while handling create event") + return + } + + if lvg.DeletionTimestamp != nil { + log.Info(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", lvg.Name)) + return + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] tries to get the LVMVolumeGroup %s from the cache", lvg.Name)) + existedLVG := cache.TryGetLVG(lvg.Name) + if existedLVG != nil { + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s was found in the cache. It will be updated", lvg.Name)) + err = cache.UpdateLVG(lvg) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to update the LVMVolumeGroup %s in the cache", lvg.Name)) + } + + log.Info(fmt.Sprintf("[RunLVGWatcherCacheController] cache was updated for the LVMVolumeGroup %s", lvg.Name)) + } else { + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s was not found. It will be added to the cache", lvg.Name)) + cache.AddLVG(lvg) + log.Info(fmt.Sprintf("[RunLVGWatcherCacheController] cache was added for the LVMVolumeGroup %s", lvg.Name)) + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to clear the cache for the LVMVolumeGroup %s", lvg.Name)) + pvcs, err := cache.GetAllPVCForLVG(lvg.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", lvg.Name)) + } + + for _, pvc := range pvcs { + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] cached PVC %s/%s belongs to LVMVolumeGroup %s", pvc.Namespace, pvc.Name, lvg.Name)) + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) + if pvc.Status.Phase == v1.ClaimBound { + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] cached PVC %s/%s has Status.Phase Bound. 
It will be removed from the cache for LVMVolumeGroup %s", pvc.Namespace, pvc.Name, lvg.Name)) + cache.RemovePVCFromTheCache(pvc) + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s was removed from the cache for LVMVolumeGroup %s", pvc.Namespace, pvc.Name, lvg.Name)) + } + } + + log.Info(fmt.Sprintf("[RunLVGWatcherCacheController] cache for the LVMVolumeGroup %s was reconciled by CreateFunc", lvg.Name)) + }, + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + log.Info(fmt.Sprintf("[RunCacheWatcherController] UpdateFunc starts the cache reconciliation for the LVMVolumeGroup %s", e.ObjectNew.GetName())) + oldLvg, ok := e.ObjectOld.(*v1alpha1.LvmVolumeGroup) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunLVGWatcherCacheController] an error occurred while handling create event") + return + } + + newLvg, ok := e.ObjectNew.(*v1alpha1.LvmVolumeGroup) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunLVGWatcherCacheController] an error occurred while handling create event") + return + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] starts to calculate the size difference for LVMVolumeGroup %s", newLvg.Name)) + oldSize, err := resource.ParseQuantity(oldLvg.Status.AllocatedSize) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to parse the allocated size for the LVMVolumeGroup %s", oldLvg.Name)) + return + } + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] old state LVMVolumeGroup %s has size %s", oldLvg.Name, oldSize.String())) + + newSize, err := resource.ParseQuantity(newLvg.Status.AllocatedSize) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to parse the allocated size for the LVMVolumeGroup %s", oldLvg.Name)) + return + } + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] new state LVMVolumeGroup %s has size %s", newLvg.Name, newSize.String())) + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] successfully calculated the size difference for LVMVolumeGroup %s", newLvg.Name)) + + if newLvg.DeletionTimestamp != nil || + oldSize.Value() == newSize.Value() { + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should not be reconciled", newLvg.Name)) + return + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] the LVMVolumeGroup %s should be reconciled by Update Func. It will be updated in the cache", newLvg.Name)) + err = cache.UpdateLVG(newLvg) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to update the LVMVolumeGroup %s cache", newLvg.Name)) + return + } + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] successfully updated the LVMVolumeGroup %s in the cache", newLvg.Name)) + + cachedPVCs, err := cache.GetAllPVCForLVG(newLvg.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to get all PVC for the LVMVolumeGroup %s", newLvg.Name)) + } + for _, pvc := range cachedPVCs { + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache belongs to LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) + log.Trace(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s has status phase %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) + if pvc.Status.Phase == v1.ClaimBound { + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s from the cache has Status.Phase Bound. 
It will be removed from the reserved space in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) + err = cache.RemoveBoundedPVCSpaceReservation(newLvg.Name, pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[RunLVGWatcherCacheController] unable to remove PVC %s/%s from the cache in the LVMVolumeGroup %s", pvc.Namespace, pvc.Name, newLvg.Name)) + continue + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] PVC %s/%s was removed from the LVMVolumeGroup %s in the cache", pvc.Namespace, pvc.Name, newLvg.Name)) + } + } + + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] Update Func ends reconciliation the LVMVolumeGroup %s cache", newLvg.Name)) + }, + DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { + log.Info(fmt.Sprintf("[RunCacheWatcherController] DeleteFunc starts the cache reconciliation for the LVMVolumeGroup %s", e.Object.GetName())) + lvg, ok := e.Object.(*v1alpha1.LvmVolumeGroup) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunLVGWatcherCacheController] an error occurred while handling create event") + return + } + cache.DeleteLVG(lvg.Name) + log.Debug(fmt.Sprintf("[RunLVGWatcherCacheController] LVMVolumeGroup %s was deleted from the cache", lvg.Name)) + }, + }) + if err != nil { + log.Error(err, "[RunCacheWatcherController] unable to watch the events") + return nil, err + } + + return c, nil +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go new file mode 100644 index 00000000..c96ee95d --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/controller/pvc_watcher_cache.go @@ -0,0 +1,190 @@ +package controller + +import ( + "context" + "errors" + "fmt" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/api/storage/v1" + "k8s.io/client-go/util/workqueue" + "k8s.io/utils/strings/slices" + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/logger" + "sds-local-volume-scheduler-extender/pkg/scheduler" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" + "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/manager" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + "sigs.k8s.io/controller-runtime/pkg/source" +) + +const ( + PVCWatcherCacheCtrlName = "pvc-watcher-cache-controller" + sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" +) + +func RunPVCWatcherCacheController( + mgr manager.Manager, + log logger.Logger, + schedulerCache *cache.Cache, +) error { + log.Info("[RunPVCWatcherCacheController] starts the work") + + c, err := controller.New("test-pvc-watcher", mgr, controller.Options{ + Reconciler: reconcile.Func(func(ctx context.Context, request reconcile.Request) (reconcile.Result, error) { + return reconcile.Result{}, nil + }), + }) + if err != nil { + log.Error(err, "[RunPVCWatcherCacheController] unable to create controller") + return err + } + + err = c.Watch(source.Kind(mgr.GetCache(), &v1.PersistentVolumeClaim{}), handler.Funcs{ + CreateFunc: func(ctx context.Context, e event.CreateEvent, q workqueue.RateLimitingInterface) { + log.Info("[RunPVCWatcherCacheController] CreateFunc reconciliation starts") + pvc, ok := e.Object.(*v1.PersistentVolumeClaim) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, 
"[RunPVCWatcherCacheController] an error occurred while handling create event") + } + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] CreateFunc starts the reconciliation for the PVC %s/%s", pvc.Namespace, pvc.Name)) + + if pvc.Annotations == nil { + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s should not be reconciled by CreateFunc due to annotations is nil", pvc.Namespace, pvc.Name)) + return + } + + selectedNodeName, wasSelected := pvc.Annotations[cache.SelectedNodeAnnotation] + if !wasSelected || + pvc.Status.Phase == v1.ClaimBound || + pvc.DeletionTimestamp != nil { + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s should not be reconciled by CreateFunc due to no selected node annotaion found or deletion timestamp is not nil", pvc.Namespace, pvc.Name)) + return + } + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s has selected node annotation, it will be reconciled in CreateFunc", pvc.Namespace, pvc.Name)) + log.Trace(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s has been selected to the node %s", pvc.Namespace, pvc.Name, selectedNodeName)) + + reconcilePVC(ctx, mgr, log, schedulerCache, pvc, selectedNodeName) + log.Info("[RunPVCWatcherCacheController] CreateFunc reconciliation ends") + }, + UpdateFunc: func(ctx context.Context, e event.UpdateEvent, q workqueue.RateLimitingInterface) { + log.Info("[RunPVCWatcherCacheController] Update Func reconciliation starts") + pvc, ok := e.ObjectNew.(*v1.PersistentVolumeClaim) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunPVCWatcherCacheController] an error occurred while handling create event") + } + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] UpdateFunc starts the reconciliation for the PVC %s/%s", pvc.Namespace, pvc.Name)) + + if pvc.Annotations == nil { + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s should not be reconciled by UpdateFunc due to annotations is nil", pvc.Namespace, pvc.Name)) + return + } + + selectedNodeName, wasSelected := pvc.Annotations[cache.SelectedNodeAnnotation] + if !wasSelected || pvc.DeletionTimestamp != nil { + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s should not be reconciled by UpdateFunc due to no selected node annotaion found or deletion timestamp is not nil", pvc.Namespace, pvc.Name)) + return + } + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s has selected node annotation, it will be reconciled in UpdateFunc", pvc.Namespace, pvc.Name)) + log.Trace(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s has been selected to the node %s", pvc.Namespace, pvc.Name, selectedNodeName)) + + reconcilePVC(ctx, mgr, log, schedulerCache, pvc, selectedNodeName) + log.Info("[RunPVCWatcherCacheController] Update Func reconciliation ends") + }, + DeleteFunc: func(ctx context.Context, e event.DeleteEvent, q workqueue.RateLimitingInterface) { + log.Info("[RunPVCWatcherCacheController] Delete Func reconciliation starts") + pvc, ok := e.Object.(*v1.PersistentVolumeClaim) + if !ok { + err = errors.New("unable to cast event object to a given type") + log.Error(err, "[RunPVCWatcherCacheController] an error occurred while handling create event") + } + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] DeleteFunc starts the reconciliation for the PVC %s/%s", pvc.Namespace, pvc.Name)) + + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] PVC %s/%s was removed from the cluster. 
It will be full removed from the cache", pvc.Namespace, pvc.Name)) + schedulerCache.RemovePVCFromTheCache(pvc) + log.Debug(fmt.Sprintf("[RunPVCWatcherCacheController] successfully full removed PVC %s/%s from the cache", pvc.Namespace, pvc.Name)) + }, + }) + if err != nil { + log.Error(err, "[RunPVCWatcherCacheController] unable to controller Watch") + return err + } + + return nil +} + +func reconcilePVC(ctx context.Context, mgr manager.Manager, log logger.Logger, schedulerCache *cache.Cache, pvc *v1.PersistentVolumeClaim, selectedNodeName string) { + log.Debug(fmt.Sprintf("[reconcilePVC] starts to find common LVMVolumeGroup for the selected node %s and PVC %s/%s", selectedNodeName, pvc.Namespace, pvc.Name)) + lvgsOnTheNode := schedulerCache.GetLVGNamesByNodeName(selectedNodeName) + for _, lvgName := range lvgsOnTheNode { + log.Trace(fmt.Sprintf("[reconcilePVC] LVMVolumeGroup %s belongs to the node %s", lvgName, selectedNodeName)) + } + + lvgsForPVC := schedulerCache.GetLVGNamesForPVC(pvc) + if lvgsForPVC == nil || len(lvgsForPVC) == 0 { + log.Debug(fmt.Sprintf("[reconcilePVC] no LVMVolumeGroups were found in the cache for PVC %s/%s. Use Storage Class %s instead", pvc.Namespace, pvc.Name, *pvc.Spec.StorageClassName)) + sc := &v12.StorageClass{} + err := mgr.GetClient().Get(ctx, client.ObjectKey{ + Name: *pvc.Spec.StorageClassName, + }, sc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to get Storage Class %s for PVC %s/%s", *pvc.Spec.StorageClassName, pvc.Namespace, pvc.Name)) + return + } + + if sc.Provisioner != sdsLocalVolumeProvisioner { + log.Debug(fmt.Sprintf("[reconcilePVC] Storage Class %s for PVC %s/%s is not managed by sds-local-volume-provisioner. Ends the reconciliation", sc.Name, pvc.Namespace, pvc.Name)) + return + } + + lvgsFromSc, err := scheduler.ExtractLVGsFromSC(sc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to extract LVMVolumeGroups from the Storage Class %s", sc.Name)) + } + + for _, lvg := range lvgsFromSc { + lvgsForPVC = append(lvgsForPVC, lvg.Name) + } + } + for _, lvgName := range lvgsForPVC { + log.Trace(fmt.Sprintf("[reconcilePVC] LVMVolumeGroup %s belongs to PVC %s/%s", lvgName, pvc.Namespace, pvc.Name)) + } + + var commonLVGName string + for _, pvcLvg := range lvgsForPVC { + if slices.Contains(lvgsOnTheNode, pvcLvg) { + commonLVGName = pvcLvg + } + } + if commonLVGName == "" { + log.Error(errors.New("common LVMVolumeGroup was not found"), fmt.Sprintf("[reconcilePVC] unable to identify a LVMVolumeGroup for PVC %s/%s", pvc.Namespace, pvc.Name)) + return + } + + log.Debug(fmt.Sprintf("[reconcilePVC] successfully found common LVMVolumeGroup %s for the selected node %s and PVC %s/%s", commonLVGName, selectedNodeName, pvc.Namespace, pvc.Name)) + log.Debug(fmt.Sprintf("[reconcilePVC] starts to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + log.Trace(fmt.Sprintf("[reconcilePVC] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) + err := schedulerCache.UpdatePVC(commonLVGName, pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to update PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + return + } + log.Debug(fmt.Sprintf("[reconcilePVC] successfully updated PVC %s/%s in the cache", pvc.Namespace, pvc.Name)) + + log.Cache(fmt.Sprintf("[reconcilePVC] cache state BEFORE the removal space reservation for PVC %s/%s", pvc.Namespace, pvc.Name)) + schedulerCache.PrintTheCacheLog() + log.Debug(fmt.Sprintf("[reconcilePVC] starts to remove space reservation for PVC 
%s/%s with selected node from the cache", pvc.Namespace, pvc.Name)) + err = schedulerCache.RemoveSpaceReservationForPVCWithSelectedNode(pvc) + if err != nil { + log.Error(err, fmt.Sprintf("[reconcilePVC] unable to remove PVC %s/%s space reservation in the cache", pvc.Namespace, pvc.Name)) + return + } + log.Debug(fmt.Sprintf("[reconcilePVC] successfully removed space reservation for PVC %s/%s with selected node", pvc.Namespace, pvc.Name)) + + log.Cache(fmt.Sprintf("[reconcilePVC] cache state AFTER the removal space reservation for PVC %s/%s", pvc.Namespace, pvc.Name)) + schedulerCache.PrintTheCacheLog() +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/logger/logger.go b/images/sds-local-volume-scheduler-extender/pkg/logger/logger.go index 19dd40a3..0f6bc2de 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/logger/logger.go +++ b/images/sds-local-volume-scheduler-extender/pkg/logger/logger.go @@ -27,6 +27,7 @@ const ( InfoLevel Verbosity = "2" DebugLevel Verbosity = "3" TraceLevel Verbosity = "4" + CacheLevel Verbosity = "5" ) const ( @@ -34,6 +35,7 @@ const ( infoLvl debugLvl traceLvl + cacheLvl ) type ( @@ -79,3 +81,7 @@ func (l Logger) Debug(message string, keysAndValues ...interface{}) { func (l Logger) Trace(message string, keysAndValues ...interface{}) { l.log.V(traceLvl).Info(fmt.Sprintf("TRACE %s", message), keysAndValues...) } + +func (l Logger) Cache(message string, keysAndValues ...interface{}) { + l.log.V(cacheLvl).Info(fmt.Sprintf("CACHE %s", message), keysAndValues...) +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go index 09195373..bc8e5d57 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter.go @@ -21,19 +21,22 @@ import ( "encoding/json" "errors" "fmt" - "net/http" - "sds-local-volume-scheduler-extender/api/v1alpha1" - "sds-local-volume-scheduler-extender/pkg/logger" - "sync" - corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/utils/strings/slices" + "net/http" + "sds-local-volume-scheduler-extender/api/v1alpha1" + "sds-local-volume-scheduler-extender/pkg/cache" + "sds-local-volume-scheduler-extender/pkg/logger" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" + "sync" ) const ( + sdsLocalVolumeProvisioner = "local.csi.storage.deckhouse.io" + lvmTypeParamKey = "local.csi.storage.deckhouse.io/lvm-type" lvmVolumeGroupsParamKey = "local.csi.storage.deckhouse.io/lvm-volume-groups" @@ -41,7 +44,7 @@ const ( thin = "Thin" ) -func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { +func (s *scheduler) filter(w http.ResponseWriter, r *http.Request) { s.log.Debug("[filter] starts the serving") var input ExtenderArgs reader := http.MaxBytesReader(w, r.Body, 10<<20) @@ -52,18 +55,35 @@ func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { return } + s.log.Debug(fmt.Sprintf("[filter] starts the filtering for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + for _, n := range input.Nodes.Items { - s.log.Trace(fmt.Sprintf("[filter] a node from request, name :%s", n.Name)) + s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s has node %s from the request", input.Pod.Namespace, input.Pod.Name, n.Name)) } - pvcs, err := getUsedPVC(s.ctx, s.client, input.Pod) + pvcs, err := getUsedPVC(s.ctx, s.client, s.log, input.Pod) if err != nil { - s.log.Error(err, "[filter] unable to get PVC from 
the Pod") + s.log.Error(err, fmt.Sprintf("[filter] unable to get used PVC for a Pod %s in the namespace %s", input.Pod.Name, input.Pod.Namespace)) + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + if len(pvcs) == 0 { + s.log.Error(fmt.Errorf("no PVC was found for pod %s in namespace %s", input.Pod.Name, input.Pod.Namespace), fmt.Sprintf("[filter] unable to get used PVC for Pod %s", input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) return } + for _, pvc := range pvcs { - s.log.Trace(fmt.Sprintf("[filter] used PVC: %s", pvc.Name)) + s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses PVC: %s", input.Pod.Namespace, input.Pod.Name, pvc.Name)) + + // this might happen when the extender-scheduler recovers after failure, populates the cache with PVC-watcher controller and then + // the kube scheduler post a request to schedule the pod with the PVC. + if s.cache.CheckIsPVCStored(pvc) { + s.log.Debug(fmt.Sprintf("[filter] PVC %s/%s has been already stored in the cache. Old state will be removed from the cache", pvc.Namespace, pvc.Name)) + s.cache.RemovePVCFromTheCache(pvc) + } else { + s.log.Debug(fmt.Sprintf("[filter] PVC %s/%s was not found in the scheduler cache", pvc.Namespace, pvc.Name)) + } } scs, err := getStorageClassesUsedByPVCs(s.ctx, s.client, pvcs) @@ -73,32 +93,105 @@ func (s scheduler) filter(w http.ResponseWriter, r *http.Request) { return } for _, sc := range scs { - s.log.Trace(fmt.Sprintf("[filter] used StorageClasses: %s", sc.Name)) + s.log.Trace(fmt.Sprintf("[filter] Pod %s/%s uses StorageClass: %s", input.Pod.Namespace, input.Pod.Name, sc.Name)) } - s.log.Debug("[filter] starts to extract pvcRequests size") - pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, pvcs, scs) + managedPVCs := filterNotManagedPVC(s.log, pvcs, scs) + for _, pvc := range managedPVCs { + s.log.Trace(fmt.Sprintf("[filter] filtered managed PVC %s/%s", pvc.Namespace, pvc.Name)) + } + + s.log.Debug(fmt.Sprintf("[filter] starts to extract PVC requested sizes for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, managedPVCs, scs) if err != nil { - s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a pod %s", input.Pod.Name)) + s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) + return } - s.log.Debug("[filter] successfully extracted the pvcRequests size") + s.log.Debug(fmt.Sprintf("[filter] successfully extracted the PVC requested sizes of a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) - s.log.Debug("[filter] starts to filter the nodes") - result, err := filterNodes(s.ctx, s.client, s.log, input.Nodes, pvcs, scs, pvcRequests) + s.log.Debug(fmt.Sprintf("[filter] starts to filter the nodes from the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + filteredNodes, err := filterNodes(s.log, s.cache, input.Nodes, input.Pod, managedPVCs, scs, pvcRequests) if err != nil { s.log.Error(err, "[filter] unable to filter the nodes") http.Error(w, "bad request", http.StatusBadRequest) + return } - s.log.Debug("[filter] successfully filtered the nodes") + s.log.Debug(fmt.Sprintf("[filter] successfully filtered the nodes from the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + + s.log.Debug(fmt.Sprintf("[filter] starts to populate the cache for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + 
s.log.Cache(fmt.Sprintf("[filter] cache before the PVC reservation for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + s.cache.PrintTheCacheLog() + err = populateCache(s.log, filteredNodes.Nodes.Items, input.Pod, s.cache, managedPVCs, scs) + if err != nil { + s.log.Error(err, "[filter] unable to populate cache") + http.Error(w, "bad request", http.StatusBadRequest) + return + } + s.log.Debug(fmt.Sprintf("[filter] successfully populated the cache for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + s.log.Cache(fmt.Sprintf("[filter] cache after the PVC reservation for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + s.cache.PrintTheCacheLog() w.Header().Set("content-type", "application/json") - err = json.NewEncoder(w).Encode(result) + err = json.NewEncoder(w).Encode(filteredNodes) if err != nil { s.log.Error(err, "[filter] unable to encode a response") http.Error(w, "internal error", http.StatusInternalServerError) + return + } + s.log.Debug(fmt.Sprintf("[filter] ends the serving the request for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) +} + +func filterNotManagedPVC(log logger.Logger, pvcs map[string]*corev1.PersistentVolumeClaim, scs map[string]*v1.StorageClass) map[string]*corev1.PersistentVolumeClaim { + filteredPVCs := make(map[string]*corev1.PersistentVolumeClaim, len(pvcs)) + for _, pvc := range pvcs { + sc := scs[*pvc.Spec.StorageClassName] + if sc.Provisioner != sdsLocalVolumeProvisioner { + log.Debug(fmt.Sprintf("[filterNotManagedPVC] filter out PVC %s/%s due to used Storage class %s is not managed by sds-local-volume-provisioner", pvc.Name, pvc.Namespace, sc.Name)) + continue + } + + filteredPVCs[pvc.Name] = pvc + } + + return filteredPVCs +} + +func populateCache(log logger.Logger, nodes []corev1.Node, pod *corev1.Pod, schedulerCache *cache.Cache, pvcs map[string]*corev1.PersistentVolumeClaim, scs map[string]*v1.StorageClass) error { + for _, node := range nodes { + for _, volume := range pod.Spec.Volumes { + if volume.PersistentVolumeClaim != nil { + log.Debug(fmt.Sprintf("[populateCache] reconcile the PVC %s for Pod %s/%s on node %s", volume.PersistentVolumeClaim.ClaimName, pod.Namespace, pod.Name, node.Name)) + lvgNamesForTheNode := schedulerCache.GetLVGNamesByNodeName(node.Name) + log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from cache for the node %s: %v", node.Name, lvgNamesForTheNode)) + pvc := pvcs[volume.PersistentVolumeClaim.ClaimName] + sc := scs[*pvc.Spec.StorageClassName] + + if sc.Parameters[lvmTypeParamKey] == thick { + log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thick, so the cache will be populated by PVC space requests", sc.Name)) + lvgsForPVC, err := ExtractLVGsFromSC(sc) + if err != nil { + return err + } + + log.Trace(fmt.Sprintf("[populateCache] LVMVolumeGroups from Storage Class %s for PVC %s/%s: %+v", sc.Name, pvc.Namespace, pvc.Name, lvgsForPVC)) + for _, lvg := range lvgsForPVC { + if slices.Contains(lvgNamesForTheNode, lvg.Name) { + log.Trace(fmt.Sprintf("[populateCache] PVC %s/%s will reserve space in LVMVolumeGroup %s cache", pvc.Namespace, pvc.Name, lvg.Name)) + err = schedulerCache.AddPVC(lvg.Name, pvc) + if err != nil { + return err + } + } + } + } else { + log.Debug(fmt.Sprintf("[populateCache] Storage Class %s has device type Thin, so the cache should NOT be populated by PVC space requests", sc.Name)) + } + } + } } - s.log.Debug("[filter] ends the serving") + + return nil } type PVCRequest struct { @@ -110,8 +203,8 @@ func extractRequestedSize( ctx context.Context, cl client.Client, 
log logger.Logger, - pvcs map[string]corev1.PersistentVolumeClaim, - scs map[string]v1.StorageClass, + pvcs map[string]*corev1.PersistentVolumeClaim, + scs map[string]*v1.StorageClass, ) (map[string]PVCRequest, error) { pvs, err := getPersistentVolumes(ctx, cl) if err != nil { @@ -121,6 +214,7 @@ func extractRequestedSize( pvcRequests := make(map[string]PVCRequest, len(pvcs)) for _, pvc := range pvcs { sc := scs[*pvc.Spec.StorageClassName] + log.Debug(fmt.Sprintf("[extractRequestedSize] PVC %s/%s has status phase: %s", pvc.Namespace, pvc.Name, pvc.Status.Phase)) switch pvc.Status.Phase { case corev1.ClaimPending: switch sc.Parameters[lvmTypeParamKey] { @@ -161,12 +255,12 @@ func extractRequestedSize( } func filterNodes( - ctx context.Context, - cl client.Client, log logger.Logger, + schedulerCache *cache.Cache, nodes *corev1.NodeList, - pvcs map[string]corev1.PersistentVolumeClaim, - scs map[string]v1.StorageClass, + pod *corev1.Pod, + pvcs map[string]*corev1.PersistentVolumeClaim, + scs map[string]*v1.StorageClass, pvcRequests map[string]PVCRequest, ) (*ExtenderFilterResult, error) { // Param "pvcRequests" is a total amount of the pvcRequests space (both thick and thin) for Pod (i.e. from every PVC) @@ -176,26 +270,49 @@ func filterNodes( }, nil } - lvgs, err := getLVMVolumeGroups(ctx, cl) - if err != nil { - return nil, err + lvgs := schedulerCache.GetAllLVG() + for _, lvg := range lvgs { + log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s in the cache", lvg.Name)) } - lvgsThickFree, err := getLVGThickFreeSpaces(lvgs) + log.Debug(fmt.Sprintf("[filterNodes] starts to get LVMVolumeGroups for Storage Classes for a Pod %s/%s", pod.Namespace, pod.Name)) + scLVGs, err := GetSortedLVGsFromStorageClasses(scs) if err != nil { return nil, err } - log.Trace(fmt.Sprintf("[filterNodes] LVGs Thick FreeSpace: %+v", lvgsThickFree)) - lvgsThickFreeMutex := &sync.RWMutex{} + log.Debug(fmt.Sprintf("[filterNodes] successfully got LVMVolumeGroups for Storage Classes for a Pod %s/%s", pod.Namespace, pod.Name)) + for scName, sortedLVGs := range scLVGs { + for _, lvg := range sortedLVGs { + log.Trace(fmt.Sprintf("[filterNodes] LVMVolumeGroup %s belongs to Storage Class %s", lvg.Name, scName)) + } + } + + usedLVGs := RemoveUnusedLVGs(lvgs, scLVGs) + for _, lvg := range usedLVGs { + log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s is actually used. 
VG size: %s, allocatedSize: %s", lvg.Name, lvg.Status.VGSize, lvg.Status.AllocatedSize))
+ }
- scLVGs, err := getSortedLVGsFromStorageClasses(scs)
+ lvgsThickFree, err := getLVGThickFreeSpaces(log, usedLVGs)
 if err != nil {
 return nil, err
 }
+ log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace on the node: %+v", pod.Namespace, pod.Name, lvgsThickFree))
- usedLVGs := removeUnusedLVGs(lvgs, scLVGs)
+ for lvgName, freeSpace := range lvgsThickFree {
+ log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s Thick free space %s", lvgName, resource.NewQuantity(freeSpace, resource.BinarySI)))
+ reservedSize, err := schedulerCache.GetLVGReservedSpace(lvgName)
+ if err != nil {
+ log.Error(err, fmt.Sprintf("[filterNodes] unable to count cache reserved size for the LVMVolumeGroup %s", lvgName))
+ continue
+ }
+ log.Trace(fmt.Sprintf("[filterNodes] current LVMVolumeGroup %s reserved PVC space %s", lvgName, resource.NewQuantity(reservedSize, resource.BinarySI)))
+ lvgsThickFree[lvgName] -= reservedSize
+ }
+ log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s current LVMVolumeGroups Thick FreeSpace with reserved PVC: %+v", pod.Namespace, pod.Name, lvgsThickFree))
+
+ lvgsThickFreeMutex := &sync.RWMutex{}
- nodeLVGs := sortLVGsByNodeName(usedLVGs)
+ nodeLVGs := SortLVGsByNodeName(usedLVGs)
 for n, ls := range nodeLVGs {
 for _, l := range ls {
 log.Trace(fmt.Sprintf("[filterNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n))
@@ -204,14 +321,14 @@ func filterNodes(
 commonNodes, err := getCommonNodesByStorageClasses(scs, nodeLVGs)
 for nodeName := range commonNodes {
- log.Trace(fmt.Sprintf("[filterNodes] common node %s", nodeName))
+ log.Trace(fmt.Sprintf("[filterNodes] Node %s is common for every Storage Class", nodeName))
 }
 result := &ExtenderFilterResult{
 Nodes: &corev1.NodeList{},
 FailedNodes: FailedNodesMap{},
 }
- failedNodesMapMutex := &sync.Mutex{}
+ failedNodesMutex := &sync.Mutex{}
 wg := &sync.WaitGroup{}
 wg.Add(len(nodes.Items))
@@ -227,9 +344,9 @@ func filterNodes(
 if _, common := commonNodes[node.Name]; !common {
 log.Debug(fmt.Sprintf("[filterNodes] node %s is not common for used Storage Classes", node.Name))
- failedNodesMapMutex.Lock()
+ failedNodesMutex.Lock()
 result.FailedNodes[node.Name] = "node is not common for used Storage Classes"
- failedNodesMapMutex.Unlock()
+ failedNodesMutex.Unlock()
 return
 }
@@ -291,9 +408,9 @@ func filterNodes(
 }
 if !hasEnoughSpace {
- failedNodesMapMutex.Lock()
+ failedNodesMutex.Lock()
 result.FailedNodes[node.Name] = "not enough space"
- failedNodesMapMutex.Unlock()
+ failedNodesMutex.Unlock()
 return
 }
@@ -313,24 +430,26 @@ func filterNodes(
 }
 for _, node := range result.Nodes.Items {
- log.Trace(fmt.Sprintf("[filterNodes] suitable node: %s", node.Name))
+ log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s there is a suitable node: %s", pod.Namespace, pod.Name, node.Name))
 }
 for node, reason := range result.FailedNodes {
- log.Trace(fmt.Sprintf("[filterNodes] failed node: %s, reason: %s", node, reason))
+ log.Trace(fmt.Sprintf("[filterNodes] for a Pod %s/%s there is a failed node: %s, reason: %s", pod.Namespace, pod.Name, node, reason))
 }
 return result, nil
 }
-func getLVGThickFreeSpaces(lvgs map[string]v1alpha1.LvmVolumeGroup) (map[string]int64, error) {
+func getLVGThickFreeSpaces(log logger.Logger, lvgs map[string]*v1alpha1.LvmVolumeGroup) (map[string]int64, error) {
 result := make(map[string]int64, len(lvgs))
 for _, lvg := range lvgs {
- free, err := getVGFreeSpace(&lvg)
+
log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] tries to count free VG space for LVMVolumeGroup %s", lvg.Name)) + free, err := getVGFreeSpace(lvg) if err != nil { return nil, err } + log.Debug(fmt.Sprintf("[getLVGThickFreeSpaces] successfully counted free VG space for LVMVolumeGroup %s", lvg.Name)) result[lvg.Name] = free.Value() } @@ -348,7 +467,7 @@ func findMatchedThinPool(thinPools []v1alpha1.StatusThinPool, name string) *v1al return nil } -func findMatchedLVG(nodeLVGs []v1alpha1.LvmVolumeGroup, scLVGs LVMVolumeGroups) *LVMVolumeGroup { +func findMatchedLVG(nodeLVGs []*v1alpha1.LvmVolumeGroup, scLVGs LVMVolumeGroups) *LVMVolumeGroup { nodeLVGNames := make(map[string]struct{}, len(nodeLVGs)) for _, lvg := range nodeLVGs { nodeLVGNames[lvg.Name] = struct{}{} @@ -363,8 +482,8 @@ func findMatchedLVG(nodeLVGs []v1alpha1.LvmVolumeGroup, scLVGs LVMVolumeGroups) return nil } -func getCommonNodesByStorageClasses(scs map[string]v1.StorageClass, nodesWithLVGs map[string][]v1alpha1.LvmVolumeGroup) (map[string][]v1alpha1.LvmVolumeGroup, error) { - result := make(map[string][]v1alpha1.LvmVolumeGroup, len(nodesWithLVGs)) +func getCommonNodesByStorageClasses(scs map[string]*v1.StorageClass, nodesWithLVGs map[string][]*v1alpha1.LvmVolumeGroup) (map[string][]*v1alpha1.LvmVolumeGroup, error) { + result := make(map[string][]*v1alpha1.LvmVolumeGroup, len(nodesWithLVGs)) for nodeName, lvgs := range nodesWithLVGs { lvgNames := make(map[string]struct{}, len(lvgs)) @@ -374,7 +493,7 @@ func getCommonNodesByStorageClasses(scs map[string]v1.StorageClass, nodesWithLVG nodeIncludesLVG := true for _, sc := range scs { - scLvgs, err := extractLVGsFromSC(sc) + scLvgs, err := ExtractLVGsFromSC(sc) if err != nil { return nil, err } @@ -401,8 +520,8 @@ func getCommonNodesByStorageClasses(scs map[string]v1.StorageClass, nodesWithLVG return result, nil } -func removeUnusedLVGs(lvgs map[string]v1alpha1.LvmVolumeGroup, scsLVGs map[string]LVMVolumeGroups) map[string]v1alpha1.LvmVolumeGroup { - result := make(map[string]v1alpha1.LvmVolumeGroup, len(lvgs)) +func RemoveUnusedLVGs(lvgs map[string]*v1alpha1.LvmVolumeGroup, scsLVGs map[string]LVMVolumeGroups) map[string]*v1alpha1.LvmVolumeGroup { + result := make(map[string]*v1alpha1.LvmVolumeGroup, len(lvgs)) usedLvgs := make(map[string]struct{}, len(lvgs)) for _, scLvgs := range scsLVGs { @@ -420,11 +539,11 @@ func removeUnusedLVGs(lvgs map[string]v1alpha1.LvmVolumeGroup, scsLVGs map[strin return result } -func getSortedLVGsFromStorageClasses(scs map[string]v1.StorageClass) (map[string]LVMVolumeGroups, error) { +func GetSortedLVGsFromStorageClasses(scs map[string]*v1.StorageClass) (map[string]LVMVolumeGroups, error) { result := make(map[string]LVMVolumeGroups, len(scs)) for _, sc := range scs { - lvgs, err := extractLVGsFromSC(sc) + lvgs, err := ExtractLVGsFromSC(sc) if err != nil { return nil, err } @@ -445,7 +564,7 @@ type LVMVolumeGroup struct { } type LVMVolumeGroups []LVMVolumeGroup -func extractLVGsFromSC(sc v1.StorageClass) (LVMVolumeGroups, error) { +func ExtractLVGsFromSC(sc *v1.StorageClass) (LVMVolumeGroups, error) { var lvmVolumeGroups LVMVolumeGroups err := yaml.Unmarshal([]byte(sc.Parameters[lvmVolumeGroupsParamKey]), &lvmVolumeGroups) if err != nil { @@ -454,8 +573,8 @@ func extractLVGsFromSC(sc v1.StorageClass) (LVMVolumeGroups, error) { return lvmVolumeGroups, nil } -func sortLVGsByNodeName(lvgs map[string]v1alpha1.LvmVolumeGroup) map[string][]v1alpha1.LvmVolumeGroup { - sorted := make(map[string][]v1alpha1.LvmVolumeGroup, len(lvgs)) +func SortLVGsByNodeName(lvgs 
map[string]*v1alpha1.LvmVolumeGroup) map[string][]*v1alpha1.LvmVolumeGroup {
+ sorted := make(map[string][]*v1alpha1.LvmVolumeGroup, len(lvgs))
 for _, lvg := range lvgs {
 for _, node := range lvg.Status.Nodes {
 sorted[node.Name] = append(sorted[node.Name], lvg)
@@ -465,30 +584,15 @@ func sortLVGsByNodeName(lvgs map[string]v1alpha1.LvmVolumeGroup) map[string][]v1
 return sorted
 }
-func getLVMVolumeGroups(ctx context.Context, cl client.Client) (map[string]v1alpha1.LvmVolumeGroup, error) {
- lvgl := &v1alpha1.LvmVolumeGroupList{}
- err := cl.List(ctx, lvgl)
- if err != nil {
- return nil, err
- }
-
- lvgMap := make(map[string]v1alpha1.LvmVolumeGroup, len(lvgl.Items))
- for _, lvg := range lvgl.Items {
- lvgMap[lvg.Name] = lvg
- }
-
- return lvgMap, nil
-}
-
 func getVGFreeSpace(lvg *v1alpha1.LvmVolumeGroup) (resource.Quantity, error) {
 free, err := resource.ParseQuantity(lvg.Status.VGSize)
 if err != nil {
- return resource.Quantity{}, err
+ return resource.Quantity{}, fmt.Errorf("unable to parse Status.VGSize quantity for LVMVolumeGroup %s, err: %w", lvg.Name, err)
 }
 used, err := resource.ParseQuantity(lvg.Status.AllocatedSize)
 if err != nil {
- return resource.Quantity{}, err
+ return resource.Quantity{}, fmt.Errorf("unable to parse Status.AllocatedSize quantity for LVMVolumeGroup %s, err: %w", lvg.Name, err)
 }
 free.Sub(used)
@@ -521,7 +625,7 @@ func getPersistentVolumes(ctx context.Context, cl client.Client) (map[string]cor
 return pvMap, nil
 }
-func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map[string]corev1.PersistentVolumeClaim) (map[string]v1.StorageClass, error) {
+func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map[string]*corev1.PersistentVolumeClaim) (map[string]*v1.StorageClass, error) {
 scs := &v1.StorageClassList{}
 err := cl.List(ctx, scs)
 if err != nil {
@@ -533,7 +637,7 @@ func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map
 scMap[sc.Name] = sc
 }
- result := make(map[string]v1.StorageClass, len(pvcs))
+ result := make(map[string]*v1.StorageClass, len(pvcs))
 for _, pvc := range pvcs {
 if pvc.Spec.StorageClassName == nil {
 err = errors.New(fmt.Sprintf("not StorageClass specified for PVC %s", pvc.Name))
@@ -542,32 +646,47 @@ func getStorageClassesUsedByPVCs(ctx context.Context, cl client.Client, pvcs map
 scName := *pvc.Spec.StorageClassName
 if sc, match := scMap[scName]; match {
- result[sc.Name] = sc
+ result[sc.Name] = &sc
 }
 }
 return result, nil
 }
-func getUsedPVC(ctx context.Context, cl client.Client, pod *corev1.Pod) (map[string]corev1.PersistentVolumeClaim, error) {
- usedPvc := make(map[string]corev1.PersistentVolumeClaim, len(pod.Spec.Volumes))
-
- pvcs := &corev1.PersistentVolumeClaimList{}
- err := cl.List(ctx, pvcs)
+func getUsedPVC(ctx context.Context, cl client.Client, log logger.Logger, pod *corev1.Pod) (map[string]*corev1.PersistentVolumeClaim, error) {
+ pvcMap, err := getAllPVCsFromNamespace(ctx, cl, pod.Namespace)
 if err != nil {
+ log.Error(err, fmt.Sprintf("[getUsedPVC] unable to get all PVC for Pod %s in the namespace %s", pod.Name, pod.Namespace))
 return nil, err
 }
- pvcMap := make(map[string]corev1.PersistentVolumeClaim, len(pvcs.Items))
- for _, pvc := range pvcs.Items {
- pvcMap[pvc.Name] = pvc
+ for pvcName := range pvcMap {
+ log.Trace(fmt.Sprintf("[getUsedPVC] PVC %s is in namespace %s", pvcName, pod.Namespace))
 }
+ usedPvc := make(map[string]*corev1.PersistentVolumeClaim, len(pod.Spec.Volumes))
 for _, volume := range pod.Spec.Volumes {
 if volume.PersistentVolumeClaim !=
nil { - usedPvc[volume.PersistentVolumeClaim.ClaimName] = pvcMap[volume.PersistentVolumeClaim.ClaimName] + log.Trace(fmt.Sprintf("[getUsedPVC] Pod %s/%s uses PVC %s", pod.Namespace, pod.Name, volume.PersistentVolumeClaim.ClaimName)) + pvc := pvcMap[volume.PersistentVolumeClaim.ClaimName] + usedPvc[volume.PersistentVolumeClaim.ClaimName] = &pvc } } - return usedPvc, nil + return usedPvc, err +} + +func getAllPVCsFromNamespace(ctx context.Context, cl client.Client, namespace string) (map[string]corev1.PersistentVolumeClaim, error) { + list := &corev1.PersistentVolumeClaimList{} + err := cl.List(ctx, list, &client.ListOptions{Namespace: namespace}) + if err != nil { + return nil, err + } + + pvcs := make(map[string]corev1.PersistentVolumeClaim, len(list.Items)) + for _, pvc := range list.Items { + pvcs[pvc.Name] = pvc + } + + return pvcs, nil } diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go new file mode 100644 index 00000000..29b4d5c8 --- /dev/null +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/filter_test.go @@ -0,0 +1,75 @@ +package scheduler + +import ( + "github.com/stretchr/testify/assert" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/api/storage/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sds-local-volume-scheduler-extender/pkg/logger" + "testing" +) + +func TestFilter(t *testing.T) { + log := logger.Logger{} + t.Run("filterNotManagedPVC", func(t *testing.T) { + sc1 := "sc1" + sc2 := "sc2" + sc3 := "sc3" + scs := map[string]*v12.StorageClass{ + sc1: { + ObjectMeta: metav1.ObjectMeta{ + Name: sc1, + }, + Provisioner: sdsLocalVolumeProvisioner, + }, + sc2: { + ObjectMeta: metav1.ObjectMeta{ + Name: sc2, + }, + Provisioner: sdsLocalVolumeProvisioner, + }, + sc3: { + ObjectMeta: metav1.ObjectMeta{ + Name: sc3, + }, + }, + } + pvcs := map[string]*v1.PersistentVolumeClaim{ + "first": { + ObjectMeta: metav1.ObjectMeta{ + Name: "first", + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &sc1, + }, + }, + "second": { + ObjectMeta: metav1.ObjectMeta{ + Name: "second", + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &sc2, + }, + }, + "third": { + ObjectMeta: metav1.ObjectMeta{ + Name: "third", + }, + Spec: v1.PersistentVolumeClaimSpec{ + StorageClassName: &sc3, + }, + }, + } + + filtered := filterNotManagedPVC(log, pvcs, scs) + + if assert.Equal(t, 2, len(filtered)) { + _, ok := filtered["first"] + assert.True(t, ok) + _, ok = filtered["second"] + assert.True(t, ok) + _, ok = filtered["third"] + assert.False(t, ok) + } + }) +} diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go index c966f6c7..e65070b3 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/prioritize.go @@ -17,22 +17,21 @@ limitations under the License. 
package scheduler import ( - "context" "encoding/json" "errors" "fmt" "math" "net/http" + "sds-local-volume-scheduler-extender/pkg/cache" "sds-local-volume-scheduler-extender/pkg/logger" "sync" corev1 "k8s.io/api/core/v1" v1 "k8s.io/api/storage/v1" "k8s.io/apimachinery/pkg/api/resource" - "sigs.k8s.io/controller-runtime/pkg/client" ) -func (s scheduler) prioritize(w http.ResponseWriter, r *http.Request) { +func (s *scheduler) prioritize(w http.ResponseWriter, r *http.Request) { s.log.Debug("[prioritize] starts serving") var input ExtenderArgs reader := http.MaxBytesReader(w, r.Body, 10<<20) @@ -43,78 +42,85 @@ func (s scheduler) prioritize(w http.ResponseWriter, r *http.Request) { return } - pvcs, err := getUsedPVC(s.ctx, s.client, input.Pod) + s.log.Debug(fmt.Sprintf("[prioritize] starts the prioritizing for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + + pvcs, err := getUsedPVC(s.ctx, s.client, s.log, input.Pod) if err != nil { - s.log.Error(err, "[prioritize] unable to get PVC from the Pod") + s.log.Error(err, fmt.Sprintf("[prioritize] unable to get PVC from the Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + http.Error(w, "internal server error", http.StatusInternalServerError) + return + } + if len(pvcs) == 0 { + s.log.Error(fmt.Errorf("no PVC was found for pod %s in namespace %s", input.Pod.Name, input.Pod.Namespace), fmt.Sprintf("[prioritize] unable to get used PVC for Pod %s", input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) return } for _, pvc := range pvcs { - s.log.Trace(fmt.Sprintf("[prioritize] used PVC: %s", pvc.Name)) + s.log.Trace(fmt.Sprintf("[prioritize] Pod %s/%s uses PVC: %s", input.Pod.Namespace, input.Pod.Name, pvc.Name)) } scs, err := getStorageClassesUsedByPVCs(s.ctx, s.client, pvcs) if err != nil { - s.log.Error(err, "[prioritize] unable to get StorageClasses from the PVC") + s.log.Error(err, fmt.Sprintf("[prioritize] unable to get StorageClasses from the PVC for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) return } for _, sc := range scs { - s.log.Trace(fmt.Sprintf("[prioritize] used StorageClasses: %s", sc.Name)) + s.log.Trace(fmt.Sprintf("[prioritize] Pod %s/%s uses Storage Class: %s", input.Pod.Namespace, input.Pod.Name, sc.Name)) + } + + managedPVCs := filterNotManagedPVC(s.log, pvcs, scs) + for _, pvc := range managedPVCs { + s.log.Trace(fmt.Sprintf("[prioritize] filtered managed PVC %s/%s", pvc.Namespace, pvc.Name)) } - s.log.Debug("[prioritize] starts to extract pvcRequests size") - pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, pvcs, scs) + s.log.Debug(fmt.Sprintf("[prioritize] starts to extract pvcRequests size for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + pvcRequests, err := extractRequestedSize(s.ctx, s.client, s.log, managedPVCs, scs) if err != nil { - s.log.Error(err, fmt.Sprintf("[filter] unable to extract request size for a pod %s", input.Pod.Name)) + s.log.Error(err, fmt.Sprintf("[prioritize] unable to extract request size for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) http.Error(w, "bad request", http.StatusBadRequest) } - s.log.Debug("[filter] successfully extracted the pvcRequests size") + s.log.Debug(fmt.Sprintf("[prioritize] successfully extracted the pvcRequests size for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) - s.log.Debug("[prioritize] starts to score the nodes") - result, err := scoreNodes(s.ctx, s.client, s.log, input.Nodes, pvcs, scs, pvcRequests, s.defaultDivisor) + s.log.Debug(fmt.Sprintf("[prioritize] 
starts to score the nodes for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + result, err := scoreNodes(s.log, s.cache, input.Nodes, managedPVCs, scs, pvcRequests, s.defaultDivisor) if err != nil { - s.log.Error(err, "[prioritize] unable to score nodes") - http.Error(w, "Bad Request.", http.StatusBadRequest) + s.log.Error(err, fmt.Sprintf("[prioritize] unable to score nodes for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) + http.Error(w, "bad request", http.StatusBadRequest) return } - s.log.Debug("[prioritize] successfully scored the nodes") + s.log.Debug(fmt.Sprintf("[prioritize] successfully scored the nodes for Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) w.Header().Set("content-type", "application/json") err = json.NewEncoder(w).Encode(result) if err != nil { - s.log.Error(err, "[prioritize] unable to encode a response") + s.log.Error(err, fmt.Sprintf("[prioritize] unable to encode a response for a Pod %s/%s", input.Pod.Namespace, input.Pod.Name)) http.Error(w, "internal error", http.StatusInternalServerError) } s.log.Debug("[prioritize] ends serving") } func scoreNodes( - ctx context.Context, - cl client.Client, log logger.Logger, + schedulerCache *cache.Cache, nodes *corev1.NodeList, - pvcs map[string]corev1.PersistentVolumeClaim, - scs map[string]v1.StorageClass, + pvcs map[string]*corev1.PersistentVolumeClaim, + scs map[string]*v1.StorageClass, pvcRequests map[string]PVCRequest, divisor float64, ) ([]HostPriority, error) { - lvgs, err := getLVMVolumeGroups(ctx, cl) - if err != nil { - return nil, err - } - - scLVGs, err := getSortedLVGsFromStorageClasses(scs) + lvgs := schedulerCache.GetAllLVG() + scLVGs, err := GetSortedLVGsFromStorageClasses(scs) if err != nil { return nil, err } - usedLVGs := removeUnusedLVGs(lvgs, scLVGs) + usedLVGs := RemoveUnusedLVGs(lvgs, scLVGs) for lvgName := range usedLVGs { log.Trace(fmt.Sprintf("[scoreNodes] used LVMVolumeGroup %s", lvgName)) } - nodeLVGs := sortLVGsByNodeName(usedLVGs) + nodeLVGs := SortLVGsByNodeName(usedLVGs) for n, ls := range nodeLVGs { for _, l := range ls { log.Trace(fmt.Sprintf("[scoreNodes] the LVMVolumeGroup %s belongs to node %s", l.Name, n)) @@ -136,7 +142,6 @@ func scoreNodes( lvgsFromNode := nodeLVGs[node.Name] var totalFreeSpaceLeft int64 - // TODO: change pvs to vgs for _, pvc := range pvcs { pvcReq := pvcRequests[pvc.Name] lvgsFromSC := scLVGs[*pvc.Spec.StorageClassName] @@ -152,13 +157,21 @@ func scoreNodes( lvg := lvgs[commonLVG.Name] switch pvcReq.DeviceType { case thick: - freeSpace, err = getVGFreeSpace(&lvg) + freeSpace, err = getVGFreeSpace(lvg) if err != nil { errs <- err return } - log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space %s", lvg.Name, freeSpace.String())) - + log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space before PVC reservation: %s", lvg.Name, freeSpace.String())) + reserved, err := schedulerCache.GetLVGReservedSpace(lvg.Name) + if err != nil { + log.Error(err, fmt.Sprintf("[scoreNodes] unable to count reserved space for the LVMVolumeGroup %s", lvg.Name)) + continue + } + log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s PVC Space reservation: %s", lvg.Name, resource.NewQuantity(reserved, resource.BinarySI))) + spaceWithReserved := freeSpace.Value() - reserved + freeSpace = *resource.NewQuantity(spaceWithReserved, resource.BinarySI) + log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s free thick space after PVC reservation: %s", lvg.Name, freeSpace.String())) case thin: thinPool := findMatchedThinPool(lvg.Status.ThinPools, 
commonLVG.Thin.PoolName) if thinPool == nil { @@ -173,17 +186,19 @@ func scoreNodes( errs <- err return } - } + lvgTotalSize, err := resource.ParseQuantity(lvg.Status.VGSize) if err != nil { errs <- err return } + log.Trace(fmt.Sprintf("[scoreNodes] LVMVolumeGroup %s total size: %s", lvg.Name, lvgTotalSize.String())) totalFreeSpaceLeft += getFreeSpaceLeftPercent(freeSpace.Value(), pvcReq.RequestedSize, lvgTotalSize.Value()) } averageFreeSpace := totalFreeSpaceLeft / int64(len(pvcs)) + log.Trace(fmt.Sprintf("[scoreNodes] average free space left for the node: %s", node.Name)) score := getNodeScore(averageFreeSpace, divisor) log.Trace(fmt.Sprintf("[scoreNodes] node %s has score %d with average free space left (after all PVC bounded), percent %d", node.Name, score, averageFreeSpace)) diff --git a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go index 4176a928..85488ce3 100644 --- a/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go +++ b/images/sds-local-volume-scheduler-extender/pkg/scheduler/route.go @@ -19,9 +19,10 @@ package scheduler import ( "context" "fmt" + "k8s.io/apimachinery/pkg/api/resource" "net/http" + "sds-local-volume-scheduler-extender/pkg/cache" "sds-local-volume-scheduler-extender/pkg/logger" - "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -30,9 +31,11 @@ type scheduler struct { log logger.Logger client client.Client ctx context.Context + cache *cache.Cache + requestCount int } -func (s scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { +func (s *scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { switch r.URL.Path { case "/filter": s.log.Debug("[ServeHTTP] filter route starts handling the request") @@ -46,18 +49,27 @@ func (s scheduler) ServeHTTP(w http.ResponseWriter, r *http.Request) { s.log.Debug("[ServeHTTP] status route starts handling the request") status(w, r) s.log.Debug("[ServeHTTP] status route ends handling the request") + case "/cache": + s.log.Debug("[ServeHTTP] cache route starts handling the request") + s.getCache(w, r) + s.log.Debug("[ServeHTTP] cache route ends handling the request") + case "/stat": + s.log.Debug("[ServeHTTP] stat route starts handling the request") + s.getCacheStat(w, r) + s.log.Debug("[ServeHTTP] stat route ends handling the request") default: http.Error(w, "not found", http.StatusNotFound) } } // NewHandler return new http.Handler of the scheduler extender -func NewHandler(ctx context.Context, cl client.Client, log logger.Logger, defaultDiv float64) (http.Handler, error) { - return scheduler{ +func NewHandler(ctx context.Context, cl client.Client, log logger.Logger, lvgCache *cache.Cache, defaultDiv float64) (http.Handler, error) { + return &scheduler{ defaultDivisor: defaultDiv, log: log, client: cl, ctx: ctx, + cache: lvgCache, }, nil } @@ -68,3 +80,108 @@ func status(w http.ResponseWriter, r *http.Request) { fmt.Println(fmt.Sprintf("error occurs on status route, err: %s", err.Error())) } } + +func (s *scheduler) getCache(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + + s.cache.PrintTheCacheLog() + + result := make(map[string][]struct { + pvcName string + selectedNode string + status string + size string + }) + + lvgs := s.cache.GetAllLVG() + for _, lvg := range lvgs { + pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + s.log.Error(err, "something bad") + } + + result[lvg.Name] = make([]struct { + pvcName string + 
selectedNode string
+ status string
+ size string
+ }, 0)
+
+ for _, pvc := range pvcs {
+ result[lvg.Name] = append(result[lvg.Name], struct {
+ pvcName string
+ selectedNode string
+ status string
+ size string
+ }{pvcName: pvc.Name, selectedNode: pvc.Annotations[cache.SelectedNodeAnnotation], status: string(pvc.Status.Phase), size: pvc.Spec.Resources.Requests.Storage().String()})
+ }
+ }
+
+ for lvgName, pvcs := range result {
+ reserved, err := s.cache.GetLVGReservedSpace(lvgName)
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ _, err = w.Write([]byte("unable to write the cache"))
+ if err != nil {
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ }
+
+ _, err = w.Write([]byte(fmt.Sprintf("LVMVolumeGroup: %s Reserved: %s\n", lvgName, resource.NewQuantity(reserved, resource.BinarySI))))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ _, err = w.Write([]byte("unable to write the cache"))
+ if err != nil {
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ }
+
+ for _, pvc := range pvcs {
+ _, err = w.Write([]byte(fmt.Sprintf("\tPVC: %s\n", pvc.pvcName)))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ _, err = w.Write([]byte(fmt.Sprintf("\t\tNodeName: %s\n", pvc.selectedNode)))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ _, err = w.Write([]byte(fmt.Sprintf("\t\tStatus: %s\n", pvc.status)))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ _, err = w.Write([]byte(fmt.Sprintf("\t\tSize: %s\n", pvc.size)))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ s.log.Error(err, "[getCache] unable to write the response")
+ }
+ }
+ }
+}
+
+func (s *scheduler) getCacheStat(w http.ResponseWriter, r *http.Request) {
+ w.WriteHeader(http.StatusOK)
+
+ pvcTotalCount := 0
+ lvgs := s.cache.GetAllLVG()
+ for _, lvg := range lvgs {
+ pvcs, err := s.cache.GetAllPVCForLVG(lvg.Name)
+ if err != nil {
+ s.log.Error(err, "[getCacheStat] unable to get cached PVCs for the LVMVolumeGroup")
+ }
+
+ pvcTotalCount += len(pvcs)
+ }
+
+ _, err := w.Write([]byte(fmt.Sprintf("Filter request count: %d, PVC Count from ALL LVG: %d", s.requestCount, pvcTotalCount)))
+ if err != nil {
+ w.WriteHeader(http.StatusInternalServerError)
+ _, err = w.Write([]byte("unable to write the cache"))
+ if err != nil {
+ s.log.Error(err, "[getCacheStat] unable to write the response")
+ }
+ }
+}
diff --git a/images/webhooks/src/go.mod b/images/webhooks/src/go.mod
index 4e3cc2ab..49eb09c2 100644
--- a/images/webhooks/src/go.mod
+++ b/images/webhooks/src/go.mod
@@ -1,6 +1,6 @@
 module webhooks
-go 1.21.1
+go 1.21
 require (
 github.com/sirupsen/logrus v1.9.3
diff --git a/templates/sds-local-volume-scheduler-extender/rbac-for-us.yaml b/templates/sds-local-volume-scheduler-extender/rbac-for-us.yaml
index 316b9857..6687e6d9 100644
--- a/templates/sds-local-volume-scheduler-extender/rbac-for-us.yaml
+++ b/templates/sds-local-volume-scheduler-extender/rbac-for-us.yaml
@@ -46,7 +46,10 @@ rules:
     verbs: ["create", "get", "update"]
   - apiGroups: [ "storage.deckhouse.io" ]
     resources: [ "lvmvolumegroups" ]
-    verbs: [ "list" ]
+    verbs: [ "list", "watch", "get"]
+  - apiGroups: [""]
+    resources: ["persistentvolumeclaims"]
+    verbs: ["list", "watch", "get"]
 ---
 apiVersion: rbac.authorization.k8s.io/v1
 kind: ClusterRoleBinding
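
Note for reviewers: the /cache and /stat routes added above return plain text meant for manual inspection of the extender's scheduler cache. A minimal client sketch for poking them by hand follows; it is illustrative only (not part of this patch) and assumes the extender is reachable at http://localhost:8000 — substitute the listen address your deployment actually uses.

// cacheprobe.go - illustrative sketch only, not part of this patch.
// Fetches the scheduler extender's new debug routes and prints their
// plain-text bodies so the cache contents can be inspected by hand.
package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	base := "http://localhost:8000" // assumed address; use the configured listen address
	for _, path := range []string{"/cache", "/stat"} {
		resp, err := http.Get(base + path)
		if err != nil {
			fmt.Printf("GET %s failed: %v\n", path, err)
			continue
		}
		body, _ := io.ReadAll(resp.Body)
		resp.Body.Close()
		fmt.Printf("GET %s -> %s\n%s\n", path, resp.Status, body)
	}
}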