Skip to content

Commit

Permalink
refactoring
Browse files Browse the repository at this point in the history
Signed-off-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
  • Loading branch information
ViktorKram committed Feb 20, 2024
1 parent 898c4ff commit c17a566
Show file tree
Hide file tree
Showing 5 changed files with 69 additions and 99 deletions.
87 changes: 37 additions & 50 deletions images/sds-lvm-csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package driver

import (
"context"
"errors"
"fmt"
"sds-lvm-csi/api/v1alpha1"
"sds-lvm-csi/pkg/utils"
Expand Down Expand Up @@ -45,35 +46,29 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot de empty")
}

var LvmBindingMode string
switch request.GetParameters()[lvmBindingMode] {
case BindingModeWFFC:
LvmBindingMode = BindingModeWFFC
case BindingModeI:
LvmBindingMode = BindingModeI
}
LvmBindingMode := request.GetParameters()[lvmBindingMode]
d.log.Info(fmt.Sprintf("storage class LvmBindingMode: %s", LvmBindingMode))

var LvmType string
switch request.GetParameters()[lvmType] {
case LLVTypeThin:
LvmType = LLVTypeThin
case LLVTypeThick:
LvmType = LLVTypeThick
}
LvmType := request.GetParameters()[lvmType]
d.log.Info(fmt.Sprintf("storage class LvmType: %s", LvmType))

lvmVG := make(map[string]string)
if len(request.GetParameters()[lvmVolumeGroup]) != 0 {
var lvmVolumeGroups LVMVolumeGroups
err := yaml.Unmarshal([]byte(request.GetParameters()[lvmVolumeGroup]), &lvmVolumeGroups)
if err != nil {
d.log.Error(err, "unmarshal yaml lvmVolumeGroup")
}
for _, v := range lvmVolumeGroups {
lvmVG[v.Name] = v.Thin.PoolName
}
if len(request.GetParameters()[lvmVolumeGroup]) == 0 {
err := errors.New("no LVMVolumeGroups specified in a storage class's parameters")
d.log.Error(err, fmt.Sprintf("no LVMVolumeGroups were found for the request: %+v", request))
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

var lvmVolumeGroups LVMVolumeGroups
err := yaml.Unmarshal([]byte(request.GetParameters()[lvmVolumeGroup]), &lvmVolumeGroups)
if err != nil {
d.log.Error(err, "unmarshal yaml lvmVolumeGroup")
}

lvmVG := make(map[string]string, len(lvmVolumeGroups))
for _, v := range lvmVolumeGroups {
lvmVG[v.Name] = v.Thin.PoolName
}

d.log.Info(fmt.Sprintf("StorageClass LVM volume groups names: %+v", lvmVG))

llvName := request.Name
Expand All @@ -83,20 +78,19 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
d.log.Info(fmt.Sprintf("llv size: %s ", llvSize.String()))

var preferredNode string
if LvmBindingMode == BindingModeI {
switch LvmBindingMode {
case BindingModeI:
d.log.Info("LvmBindingMode is Immediate. Start selecting node")
prefNode, freeSpace, err := utils.GetNodeMaxFreeVGSize(ctx, d.cl)
selectedNodeName, freeSpace, err := utils.GetNodeMaxFreeVGSize(ctx, d.cl)
if err != nil {
d.log.Error(err, "error GetNodeMaxVGSize")
}
preferredNode = prefNode
preferredNode = selectedNodeName
if llvSize.Value() > freeSpace.Value() {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than free space: %s", llvSize.String(), freeSpace.String())
}
d.log.Info(fmt.Sprintf("Selected node: %s, free space %s ", prefNode, freeSpace.String()))
}

if LvmBindingMode == BindingModeWFFC {
d.log.Info(fmt.Sprintf("Selected node: %s, free space %s ", selectedNodeName, freeSpace.String()))
case BindingModeWFFC:
if len(request.AccessibilityRequirements.Preferred) != 0 {
t := request.AccessibilityRequirements.Preferred[0].Segments
preferredNode = t[topologyKey]
Expand All @@ -107,18 +101,15 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
lvmVolumeGroupName, vgName, err := utils.GetLVMVolumeGroupParams(ctx, d.cl, *d.log, lvmVG, preferredNode, LvmType)
if err != nil {
d.log.Error(err, "error GetVGName")
// return nil, err
return nil, status.Errorf(codes.Internal, err.Error())
}

d.log.Info(fmt.Sprintf("LvmVolumeGroup: %s", lvmVolumeGroupName))
d.log.Info(fmt.Sprintf("VGName: %s", vgName))
d.log.Info(fmt.Sprintf("prefered node: %s", preferredNode))

d.log.Info("------------ CreateLVMLogicalVolume ------------")
llvThin := &v1alpha1.ThinLogicalVolumeSpec{}
if LvmType == LLVTypeThick {
llvThin = nil
}
var llvThin *v1alpha1.ThinLogicalVolumeSpec
if LvmType == LLVTypeThin {
llvThin.PoolName = lvmVG[lvmVolumeGroupName]
}
Expand Down Expand Up @@ -154,22 +145,21 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
d.log.Error(err, "error WaitForStatusUpdate")
return nil, err
}
d.log.Info(fmt.Sprintf("stop wait CreateLVMLogicalVolume, attempt сounter = %d ", attemptCounter))
d.log.Info(fmt.Sprintf("stop waiting CreateLVMLogicalVolume, attempt сounter = %d ", attemptCounter))

//Create context
volCtx := make(map[string]string)
volumeCtx := make(map[string]string, len(request.Parameters))
for k, v := range request.Parameters {
volCtx[k] = v
volumeCtx[k] = v
}

volCtx[subPath] = request.Name
volCtx[VGNameKey] = vgName
volumeCtx[subPath] = request.Name
volumeCtx[VGNameKey] = vgName

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: request.CapacityRange.GetRequiredBytes(),
VolumeId: request.Name,
VolumeContext: volCtx,
VolumeContext: volumeCtx,
ContentSource: request.VolumeContentSource,
AccessibleTopology: []*csi.Topology{
{Segments: map[string]string{
Expand Down Expand Up @@ -284,10 +274,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.Contro

llv, err := utils.GetLVMLogicalVolume(ctx, d.cl, volumeID, "")
if err != nil {
if kerrors.IsNotFound(err) {
return nil, status.Errorf(codes.NotFound, "LVMLogicalVolume with id: %s not found", volumeID)
}
return nil, status.Errorf(codes.Internal, "error getting LVMLogicalVolume: %v", err)
return nil, status.Errorf(codes.Internal, "error getting LVMLogicalVolume: %s", err.Error())
}

resizeDelta, err := resource.ParseQuantity(ResizeDelta)
Expand Down Expand Up @@ -318,13 +305,13 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.Contro
return nil, status.Errorf(codes.Internal, "error getting LVMVolumeGroup: %v", err)
}

lvgCapacity, err := utils.GetLVMVolumeGroupCapacity(*lvg)
lvgFreeSpace, err := utils.GetLVMVolumeGroupFreeSpace(*lvg)
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting LVMVolumeGroupCapacity: %v", err)
}

if lvgCapacity.Value() < (requestCapacity.Value() - llv.Status.ActualSize.Value()) {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than the capacity of the LVMVolumeGroup: %s", requestCapacity.String(), lvgCapacity.String())
if lvgFreeSpace.Value() < (requestCapacity.Value() - llv.Status.ActualSize.Value()) {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than the capacity of the LVMVolumeGroup: %s", requestCapacity.String(), lvgFreeSpace.String())
}

d.log.Info("start resize LVMLogicalVolume")
Expand Down
10 changes: 5 additions & 5 deletions images/sds-lvm-csi/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,10 +64,10 @@ type Driver struct {
httpSrv http.Server
log *logger.Logger

readyMu sync.Mutex // protects ready
ready bool
cl client.Client
nodeStorage utils.Store
readyMu sync.Mutex // protects ready
ready bool
cl client.Client
storeManager utils.NodeStoreManager
}

// NewDriver returns a CSI plugin that contains the necessary gRPC
Expand All @@ -89,7 +89,7 @@ func NewDriver(ep, driverName, address string, nodeName *string, log *logger.Log
log: log,
waitActionTimeout: defaultWaitActionTimeout,
cl: cl,
nodeStorage: *st,
storeManager: st,
}, nil
}

Expand Down
9 changes: 4 additions & 5 deletions images/sds-lvm-csi/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (d *Driver) NodePublishVolume(ctx context.Context, request *csi.NodePublish
mountOptions = append(mountOptions, mnt.GetMountFlags()...)
}

err := d.nodeStorage.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions)
err := d.storeManager.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions)
if err != nil {
d.log.Error(err, "d.mounter.Mount :")
return nil, err
Expand All @@ -75,7 +75,7 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpub
fmt.Println(request.String())
fmt.Println("------------- NodeUnpublishVolume --------------")

err := d.nodeStorage.Unmount(request.GetTargetPath())
err := d.storeManager.Unmount(request.GetTargetPath())
if err != nil {
d.log.Error(err, "NodeUnpublishVolume err ")
}
Expand Down Expand Up @@ -103,10 +103,9 @@ func (d *Driver) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVo
return nil, status.Error(codes.InvalidArgument, "Volume Path cannot be empty")
}

err := d.nodeStorage.ResizeFS(volumePath)
// err := d.mounter.Expander.NodeExpand(volumeID, volumePath)
err := d.storeManager.ResizeFS(volumePath)
if err != nil {
d.log.Error(err, "d.mounter.ResizeFS :")
d.log.Error(err, "d.mounter.ResizeFS:")
return nil, status.Error(codes.Internal, err.Error())
}

Expand Down
48 changes: 16 additions & 32 deletions images/sds-lvm-csi/pkg/utils/func.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"errors"
"fmt"
kerrors "k8s.io/apimachinery/pkg/api/errors"
"math"
"sds-lvm-csi/api/v1alpha1"
"sds-lvm-csi/pkg/logger"
Expand All @@ -38,17 +39,6 @@ const (
KubernetesApiRequestTimeout = 1
)

func NodeWithMaxSize(vgNameAndSize map[string]int64) (vgName string, maxSize int64) {

for k, n := range vgNameAndSize {
if n > maxSize {
maxSize = n
vgName = k
}
}
return vgName, maxSize
}

func CreateLVMLogicalVolume(ctx context.Context, kc client.Client, name string, LvmLogicalVolumeSpec v1alpha1.LvmLogicalVolumeSpec) (*v1alpha1.LvmLogicalVolume, error) {
var err error
llv := &v1alpha1.LvmLogicalVolume{
Expand All @@ -68,6 +58,11 @@ func CreateLVMLogicalVolume(ctx context.Context, kc client.Client, name string,
if err == nil {
return llv, nil
}

if kerrors.IsAlreadyExists(err) {
return nil, err
}

time.Sleep(KubernetesApiRequestTimeout)
}

Expand Down Expand Up @@ -179,31 +174,20 @@ func GetNodeMaxFreeVGSize(ctx context.Context, kc client.Client) (nodeName strin
return "", freeSpace, fmt.Errorf("after %d attempts of getting LvmVolumeGroups, last error: %w", KubernetesApiRequestLimit, err)
}

nodesVGFreeSize := make(map[string]int64)
vgNameNodeName := make(map[string]string)
var maxFreeSpace int64
for _, lvg := range listLvgs.Items {
// obj := &v1alpha1.LvmVolumeGroup{}
// err = kc.Get(ctx, client.ObjectKey{
// Name: lvg.Name,
// Namespace: lvg.Namespace,
// }, obj)
// if err != nil {
// return "", "", errors.New(fmt.Sprintf("get lvg name: %s", lvg.Name))
// }

vgFreeSize, err := GetLVMVolumeGroupCapacity(*&lvg)
free, err := GetLVMVolumeGroupFreeSpace(lvg)
if err != nil {
return "", freeSpace, err
return
}
nodesVGFreeSize[lvg.Name] = vgFreeSize.Value()
vgNameNodeName[lvg.Name] = lvg.Status.Nodes[0].Name
}

VGNameWihMaxFreeSpace, _ := NodeWithMaxSize(nodesVGFreeSize)
freeSpace = *resource.NewQuantity(nodesVGFreeSize[VGNameWihMaxFreeSpace], resource.BinarySI)
nodeName = vgNameNodeName[VGNameWihMaxFreeSpace]
if free.Value() > maxFreeSpace {
nodeName = lvg.Status.Nodes[0].Name
maxFreeSpace = free.Value()
}
}

return nodeName, freeSpace, nil
return nodeName, *resource.NewQuantity(maxFreeSpace, resource.BinarySI), nil
}

func GetLVMVolumeGroupParams(ctx context.Context, kc client.Client, log logger.Logger, lvmVG map[string]string, nodeName, LvmType string) (lvgName, vgName string, err error) {
Expand Down Expand Up @@ -273,7 +257,7 @@ func GetLVMVolumeGroup(ctx context.Context, kc client.Client, lvgName, namespace
return nil, fmt.Errorf("after %d attempts of getting LvmVolumeGroup %s in namespace %s, last error: %w", KubernetesApiRequestLimit, lvgName, namespace, err)
}

func GetLVMVolumeGroupCapacity(lvg v1alpha1.LvmVolumeGroup) (vgCapacity resource.Quantity, err error) {
func GetLVMVolumeGroupFreeSpace(lvg v1alpha1.LvmVolumeGroup) (vgCapacity resource.Quantity, err error) {
vgSize, err := resource.ParseQuantity(lvg.Status.VGSize)
if err != nil {
return vgCapacity, fmt.Errorf("parse size vgSize (%s): %w", lvg.Status.VGSize, err)
Expand Down
14 changes: 7 additions & 7 deletions images/sds-lvm-csi/pkg/utils/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,18 @@ import (
utilexec "k8s.io/utils/exec"
)

type Store struct {
Log *logger.Logger
NodeStorage mu.SafeFormatAndMount
}

type NewNodeStore interface {
Mount(source, target, fsType string, readonly bool, mntOpts []string) error
type NodeStoreManager interface {
Mount(source, target string, isBlock bool, fsType string, readonly bool, mntOpts []string) error
Unmount(target string) error
IsNotMountPoint(target string) (bool, error)
ResizeFS(target string) error
}

type Store struct {
Log *logger.Logger
NodeStorage mu.SafeFormatAndMount
}

func NewStore(logger *logger.Logger) *Store {
return &Store{
Log: logger,
Expand Down

0 comments on commit c17a566

Please sign in to comment.