Skip to content

Commit

Permalink
add expand node
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Zimin <alexandr.zimin@flant.com>
  • Loading branch information
AleksZimin committed Feb 18, 2024
1 parent fd25127 commit 59351ad
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 47 deletions.
2 changes: 1 addition & 1 deletion images/sds-lvm-csi/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN GOOS=linux GOARCH=amd64 go build -o sds-lvm-csi
FROM --platform=linux/amd64 $BASE_ALPINE

ENV DEBIAN_FRONTEND noninteractive
RUN apk add --no-cache lvm2 e2fsprogs xfsprogs blkid
RUN apk add --no-cache lvm2 e2fsprogs e2fsprogs-extra xfsprogs xfsprogs-extra blkid

COPY --from=builder /go/src/cmd/sds-lvm-csi /go/src/cmd/sds-lvm-csi

Expand Down
42 changes: 23 additions & 19 deletions images/sds-lvm-csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
lvmVG[v.Name] = v.Thin.PoolName
}
}
d.log.Info(fmt.Sprintf("lvm-volume-groups: %+v", lvmVG))
d.log.Info(fmt.Sprintf("StorageClass LVM volume groups names: %+v", lvmVG))

llvName := request.Name
d.log.Info(fmt.Sprintf("llv name: %s ", llvName))
Expand All @@ -84,6 +84,7 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ

var preferredNode string
if LvmBindingMode == BindingModeI {
d.log.Info("LvmBindingMode is Immediate. Start selecting node")
prefNode, freeSpace, err := utils.GetNodeMaxFreeVGSize(ctx, d.cl)
if err != nil {
d.log.Error(err, "error GetNodeMaxVGSize")
Expand All @@ -92,7 +93,7 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
if llvSize.Value() > freeSpace.Value() {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than free space: %s", llvSize.String(), freeSpace.String())
}
d.log.Info(fmt.Sprintf("prefered node: %s, free space %s ", prefNode, freeSpace.String()))
d.log.Info(fmt.Sprintf("Selected node: %s, free space %s ", prefNode, freeSpace.String()))
}

if LvmBindingMode == BindingModeWFFC {
Expand All @@ -103,8 +104,6 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
}

d.log.Info(fmt.Sprintf("prefered node: %s", preferredNode))
d.log.Info(fmt.Sprintf("lvm-volume-groups: %+v", lvmVG))
d.log.Info(fmt.Sprintf("lvm-type: %s", LvmType))
lvmVolumeGroupName, vgName, err := utils.GetLVMVolumeGroupParams(ctx, d.cl, *d.log, lvmVG, preferredNode, LvmType)
if err != nil {
d.log.Error(err, "error GetVGName")
Expand Down Expand Up @@ -274,21 +273,11 @@ func (d *Driver) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRe
func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
d.log.Info(" call method ControllerExpandVolume")

d.log.Info("========== ExpandVolume ============")
d.log.Info("========== ControllerExpandVolume ============")
d.log.Info(request.String())
d.log.Info("========== ExpandVolume ============")
d.log.Info("========== ControllerExpandVolume ============")

volumeID := request.GetVolumeId()
resizeDelta, err := resource.ParseQuantity(ResizeDelta)
d.log.Trace("resizeDelta: %s", resizeDelta.String())
requestCapacity := resource.NewQuantity(request.CapacityRange.GetRequiredBytes(), resource.BinarySI)
d.log.Trace("requestCapacity: %s", requestCapacity.String())

if err != nil {
d.log.Error(err, "error ParseQuantity for ResizeDelta")
return nil, err
}

if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id cannot be empty")
}
Expand All @@ -301,11 +290,26 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.Contro
return nil, status.Errorf(codes.Internal, "error getting LVMLogicalVolume: %v", err)
}

resizeDelta, err := resource.ParseQuantity(ResizeDelta)
if err != nil {
d.log.Error(err, "error ParseQuantity for ResizeDelta")
return nil, err
}
d.log.Trace("resizeDelta: %s", resizeDelta.String())
requestCapacity := resource.NewQuantity(request.CapacityRange.GetRequiredBytes(), resource.BinarySI)
d.log.Trace("requestCapacity: %s", requestCapacity.String())

nodeExpansionRequired := true
if request.GetVolumeCapability().GetBlock() != nil {
nodeExpansionRequired = false
}
d.log.Info(fmt.Sprintf("NodeExpansionRequired: %t", nodeExpansionRequired))

if llv.Status.ActualSize.Value() > requestCapacity.Value()+resizeDelta.Value() || utils.AreSizesEqualWithinDelta(*requestCapacity, llv.Status.ActualSize, resizeDelta) {
d.log.Warning("requested size is less than or equal to the actual size of the volume include delta %s , no need to resize LVMLogicalVolume %s, requested size: %s, actual size: %s, return NodeExpansionRequired: true and CapacityBytes: %d", resizeDelta.String(), volumeID, requestCapacity.String(), llv.Status.ActualSize.String(), llv.Status.ActualSize.Value())
d.log.Warning("requested size is less than or equal to the actual size of the volume include delta %s , no need to resize LVMLogicalVolume %s, requested size: %s, actual size: %s, return NodeExpansionRequired: %t and CapacityBytes: %d", resizeDelta.String(), volumeID, requestCapacity.String(), llv.Status.ActualSize.String(), nodeExpansionRequired, llv.Status.ActualSize.Value())
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: llv.Status.ActualSize.Value(),
NodeExpansionRequired: true,
NodeExpansionRequired: nodeExpansionRequired,
}, nil
}

Expand Down Expand Up @@ -340,7 +344,7 @@ func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.Contro

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: request.CapacityRange.RequiredBytes,
NodeExpansionRequired: true,
NodeExpansionRequired: nodeExpansionRequired,
}, nil
}

Expand Down
19 changes: 10 additions & 9 deletions images/sds-lvm-csi/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"context"
"errors"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"net"
"net/http"
"net/url"
Expand All @@ -31,9 +28,13 @@ import (
"path/filepath"
"sds-lvm-csi/pkg/logger"
"sds-lvm-csi/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand Down Expand Up @@ -63,10 +64,10 @@ type Driver struct {
httpSrv http.Server
log *logger.Logger

readyMu sync.Mutex // protects ready
ready bool
cl client.Client
mounter utils.Store
readyMu sync.Mutex // protects ready
ready bool
cl client.Client
nodeStorage utils.Store
}

// NewDriver returns a CSI plugin that contains the necessary gRPC
Expand All @@ -88,7 +89,7 @@ func NewDriver(ep, driverName, address string, nodeName *string, log *logger.Log
log: log,
waitActionTimeout: defaultWaitActionTimeout,
cl: cl,
mounter: *st,
nodeStorage: *st,
}, nil
}

Expand Down
52 changes: 47 additions & 5 deletions images/sds-lvm-csi/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"fmt"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

func (d *Driver) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
Expand Down Expand Up @@ -58,7 +60,7 @@ func (d *Driver) NodePublishVolume(ctx context.Context, request *csi.NodePublish
mountOptions = append(mountOptions, mnt.GetMountFlags()...)
}

err := d.mounter.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions)
err := d.nodeStorage.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions)
if err != nil {
d.log.Error(err, "d.mounter.Mount :")
return nil, err
Expand All @@ -73,7 +75,7 @@ func (d *Driver) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpub
fmt.Println(request.String())
fmt.Println("------------- NodeUnpublishVolume --------------")

err := d.mounter.Unmount(request.GetTargetPath())
err := d.nodeStorage.Unmount(request.GetTargetPath())
if err != nil {
d.log.Error(err, "NodeUnpublishVolume err ")
}
Expand All @@ -86,13 +88,53 @@ func (d *Driver) NodeGetVolumeStats(ctx context.Context, request *csi.NodeGetVol
}

func (d *Driver) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVolumeRequest) (*csi.NodeExpandVolumeResponse, error) {
d.log.Info("method NodeExpandVolume")
d.log.Info("Call method NodeExpandVolume")

d.log.Info("========== NodeExpandVolume ============")
d.log.Info(request.String())
d.log.Info("========== NodeExpandVolume ============")

volumeID := request.GetVolumeId()
volumePath := request.GetVolumePath()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id cannot be empty")
}
if len(volumePath) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume Path cannot be empty")
}

err := d.nodeStorage.ResizeFS(volumePath)
// err := d.mounter.Expander.NodeExpand(volumeID, volumePath)
if err != nil {
d.log.Error(err, "d.mounter.ResizeFS :")
return nil, status.Error(codes.Internal, err.Error())
}

return &csi.NodeExpandVolumeResponse{}, nil
}

func (d *Driver) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
d.log.Info("method NodeGetCapabilities")
return &csi.NodeGetCapabilitiesResponse{}, nil
d.log.Info("Call method NodeGetCapabilities")

capabilities := []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
}

csiCaps := make([]*csi.NodeServiceCapability, len(capabilities))
for i, capability := range capabilities {
csiCaps[i] = &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: capability,
},
},
}
}

return &csi.NodeGetCapabilitiesResponse{
Capabilities: csiCaps,
}, nil
}

func (d *Driver) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoRequest) (*csi.NodeGetInfoResponse, error) {
Expand Down
48 changes: 35 additions & 13 deletions images/sds-lvm-csi/pkg/utils/volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,28 +18,30 @@ package utils

import (
"fmt"
mu "k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
"os"
"sds-lvm-csi/pkg/logger"
"time"

mu "k8s.io/mount-utils"
utilexec "k8s.io/utils/exec"
)

type Store struct {
Log *logger.Logger
Mounter mu.SafeFormatAndMount
Log *logger.Logger
NodeStorage mu.SafeFormatAndMount
}

type NewMounter interface {
type NewNodeStore interface {
Mount(source, target, fsType string, readonly bool, mntOpts []string) error
Unmount(target string) error
IsNotMountPoint(target string) (bool, error)
ResizeFS(target string) error
}

func NewStore(logger *logger.Logger) *Store {
return &Store{
Log: logger,
Mounter: mu.SafeFormatAndMount{
NodeStorage: mu.SafeFormatAndMount{
Interface: mu.New("/bin/mount"),
Exec: utilexec.New(),
},
Expand Down Expand Up @@ -79,9 +81,9 @@ func (s *Store) Mount(source, target string, isBlock bool, fsType string, readon
}
s.Log.Info("-----------------== stop MkdirAll ==-----------------")

needsMount, err := s.Mounter.IsMountPoint(target)
needsMount, err := s.NodeStorage.IsMountPoint(target)
if err != nil {
return fmt.Errorf("[s.Mounter.IsMountPoint] unable to determine mount status of %s %v", target, err)
return fmt.Errorf("[s.NodeStorage.IsMountPoint] unable to determine mount status of %s %v", target, err)
}

s.Log.Info("≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈ needsMount ≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈")
Expand All @@ -94,7 +96,7 @@ func (s *Store) Mount(source, target string, isBlock bool, fsType string, readon
//}

s.Log.Info("-----------------== start FormatAndMount ==---------------")
err = s.Mounter.FormatAndMount(source, target, fsType, mntOpts)
err = s.NodeStorage.FormatAndMount(source, target, fsType, mntOpts)
if err != nil {
return fmt.Errorf("failed to FormatAndMount : %w", err)
}
Expand All @@ -115,7 +117,7 @@ func (s *Store) Mount(source, target string, isBlock bool, fsType string, readon
}
s.Log.Info("-----------------== stop Create File ==---------------")
s.Log.Info("-----------------== start Mount ==---------------")
err = s.Mounter.Mount(source, target, fsType, mntOpts)
err = s.NodeStorage.Mount(source, target, fsType, mntOpts)
if err != nil {
s.Log.Error(err, "block mount error :")
return err
Expand All @@ -131,17 +133,17 @@ func (s *Store) Mount(source, target string, isBlock bool, fsType string, readon
func (s *Store) Unmount(target string) error {
s.Log.Info(fmt.Sprintf("[unmount volume] target=%s", target))

err := s.Mounter.Unmount(target)
err := s.NodeStorage.Unmount(target)
if err != nil {
s.Log.Error(err, "[s.Mounter.Unmount]: ")
s.Log.Error(err, "[s.NodeStorage.Unmount]: ")
return err
}
time.Sleep(time.Second * 1)
return nil
}

func (s *Store) IsNotMountPoint(target string) (bool, error) {
notMounted, err := s.Mounter.IsMountPoint(target)
notMounted, err := s.NodeStorage.IsMountPoint(target)
if err != nil {
if os.IsNotExist(err) {
return true, nil
Expand All @@ -150,3 +152,23 @@ func (s *Store) IsNotMountPoint(target string) (bool, error) {
}
return notMounted, nil
}

func (s *Store) ResizeFS(mountTarget string) error {
s.Log.Info(" ----== Resize FS ==---- ")
devicePath, _, err := mu.GetDeviceNameFromMount(s.NodeStorage.Interface, mountTarget)
if err != nil {
s.Log.Error(err, "Failed to find the device mounted at mountTarget", "mountTarget", mountTarget)
return fmt.Errorf("failed to find the device mounted at %s: %w", mountTarget, err)
}

s.Log.Info("Found device for resizing", "devicePath", devicePath, "mountTarget", mountTarget)

_, err = mu.NewResizeFs(s.NodeStorage.Exec).Resize(devicePath, mountTarget)
if err != nil {
s.Log.Error(err, "Failed to resize filesystem", "devicePath", devicePath, "mountTarget", mountTarget)
return fmt.Errorf("failed to resize filesystem %s on device %s: %w", mountTarget, devicePath, err)
}

s.Log.Info("Filesystem resized successfully", "devicePath", devicePath)
return nil
}

0 comments on commit 59351ad

Please sign in to comment.