Skip to content

Commit

Permalink
[csi] Add expand volume capability (#7)
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Zimin <alexandr.zimin@flant.com>
Co-authored-by: Viktor Kramarenko <viktor.kramarenko@flant.com>
  • Loading branch information
AleksZimin and ViktorKram committed Feb 27, 2024
1 parent 8474250 commit 86f9a94
Show file tree
Hide file tree
Showing 10 changed files with 394 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"strings"
"time"

"gopkg.in/yaml.v2"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/storage/v1"
errors2 "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -41,6 +40,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

"sigs.k8s.io/controller-runtime/pkg/manager"
)
Expand Down
2 changes: 1 addition & 1 deletion images/sds-lvm-csi/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ RUN GOOS=linux GOARCH=amd64 go build -o sds-lvm-csi
FROM --platform=linux/amd64 $BASE_ALPINE

ENV DEBIAN_FRONTEND noninteractive
RUN apk add --no-cache lvm2 e2fsprogs xfsprogs blkid
RUN apk add --no-cache lvm2 e2fsprogs e2fsprogs-extra xfsprogs xfsprogs-extra blkid

COPY --from=builder /go/src/cmd/sds-lvm-csi /go/src/cmd/sds-lvm-csi

Expand Down
2 changes: 1 addition & 1 deletion images/sds-lvm-csi/api/v1alpha1/lvm_logical_volume.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type LvmLogicalVolumeSpec struct {
Type string `json:"type"`
Size resource.Quantity `json:"size"`
LvmVolumeGroup string `json:"lvmVolumeGroup"`
Thin *ThinLogicalVolumeSpec `json:"thin,omitempty"`
Thin *ThinLogicalVolumeSpec `json:"thin"`
}

type ThinLogicalVolumeSpec struct {
Expand Down
1 change: 1 addition & 0 deletions images/sds-lvm-csi/driver/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ const (
LLVStatusCreated = "Created"
BindingModeWFFC = "WaitForFirstConsumer"
BindingModeI = "Immediate"
ResizeDelta = "32Mi"
)
174 changes: 124 additions & 50 deletions images/sds-lvm-csi/driver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package driver

import (
"context"
"errors"
"fmt"
"sds-lvm-csi/api/v1alpha1"
"sds-lvm-csi/pkg/utils"

kerrors "k8s.io/apimachinery/pkg/api/errors"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -43,36 +46,30 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
return nil, status.Error(codes.InvalidArgument, "Volume Capability cannot de empty")
}

var LvmBindingMode string
switch request.GetParameters()[lvmBindingMode] {
case BindingModeWFFC:
LvmBindingMode = BindingModeWFFC
case BindingModeI:
LvmBindingMode = BindingModeI
}
LvmBindingMode := request.GetParameters()[lvmBindingMode]
d.log.Info(fmt.Sprintf("storage class LvmBindingMode: %s", LvmBindingMode))

var LvmType string
switch request.GetParameters()[lvmType] {
case LLVTypeThin:
LvmType = LLVTypeThin
case LLVTypeThick:
LvmType = LLVTypeThick
}
LvmType := request.GetParameters()[lvmType]
d.log.Info(fmt.Sprintf("storage class LvmType: %s", LvmType))

lvmVG := make(map[string]string)
if len(request.GetParameters()[lvmVolumeGroup]) != 0 {
var lvmVolumeGroups LVMVolumeGroups
err := yaml.Unmarshal([]byte(request.GetParameters()[lvmVolumeGroup]), &lvmVolumeGroups)
if err != nil {
d.log.Error(err, "unmarshal yaml lvmVolumeGroup")
}
for _, v := range lvmVolumeGroups {
lvmVG[v.Name] = v.Thin.PoolName
}
if len(request.GetParameters()[lvmVolumeGroup]) == 0 {
err := errors.New("no LVMVolumeGroups specified in a storage class's parameters")
d.log.Error(err, fmt.Sprintf("no LVMVolumeGroups were found for the request: %+v", request))
return nil, status.Errorf(codes.InvalidArgument, err.Error())
}

var lvmVolumeGroups LVMVolumeGroups
err := yaml.Unmarshal([]byte(request.GetParameters()[lvmVolumeGroup]), &lvmVolumeGroups)
if err != nil {
d.log.Error(err, "unmarshal yaml lvmVolumeGroup")
}
d.log.Info(fmt.Sprintf("lvm-volume-groups: %+v", lvmVG))

lvmVG := make(map[string]string, len(lvmVolumeGroups))
for _, v := range lvmVolumeGroups {
lvmVG[v.Name] = v.Thin.PoolName
}

d.log.Info(fmt.Sprintf("StorageClass LVM volume groups names: %+v", lvmVG))

llvName := request.Name
d.log.Info(fmt.Sprintf("llv name: %s ", llvName))
Expand All @@ -81,37 +78,40 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ
d.log.Info(fmt.Sprintf("llv size: %s ", llvSize.String()))

var preferredNode string
if LvmBindingMode == BindingModeI {
prefNode, freeSpace, err := utils.GetNodeMaxVGSize(ctx, d.cl)
switch LvmBindingMode {
case BindingModeI:
d.log.Info(fmt.Sprintf("LvmBindingMode is %s. Start selecting node", BindingModeI))
selectedNodeName, freeSpace, err := utils.GetNodeMaxFreeVGSize(ctx, d.cl)
if err != nil {
d.log.Error(err, "error GetNodeMaxVGSize")
}
preferredNode = prefNode
d.log.Info(fmt.Sprintf("prefered node: %s, free space %s ", prefNode, freeSpace))
}

if LvmBindingMode == BindingModeWFFC {
preferredNode = selectedNodeName
if llvSize.Value() > freeSpace.Value() {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than free space: %s", llvSize.String(), freeSpace.String())
}
d.log.Info(fmt.Sprintf("Selected node: %s, free space %s ", selectedNodeName, freeSpace.String()))
case BindingModeWFFC:
d.log.Info(fmt.Sprintf("LvmBindingMode is %s. Get preferredNode", BindingModeWFFC))
if len(request.AccessibilityRequirements.Preferred) != 0 {
t := request.AccessibilityRequirements.Preferred[0].Segments
preferredNode = t[topologyKey]
}
}

lvmVolumeGroupName, vgName, err := utils.GetVGName(ctx, d.cl, lvmVG, preferredNode, LvmType)
lvmVolumeGroupName, vgName, err := utils.GetLVMVolumeGroupParams(ctx, d.cl, *d.log, lvmVG, preferredNode, LvmType)
if err != nil {
d.log.Error(err, "error GetVGName")
return nil, status.Errorf(codes.Internal, err.Error())
}

d.log.Info(fmt.Sprintf("LvmVolumeGroup: %s", lvmVolumeGroupName))
d.log.Info(fmt.Sprintf("VGName: %s", vgName))
d.log.Info(fmt.Sprintf("prefered node: %s", preferredNode))

d.log.Info("------------ CreateLVMLogicalVolume ------------")
llvThin := &v1alpha1.ThinLogicalVolumeSpec{}
if LvmType == LLVTypeThick {
llvThin = nil
}
var llvThin *v1alpha1.ThinLogicalVolumeSpec
if LvmType == LLVTypeThin {
llvThin = &v1alpha1.ThinLogicalVolumeSpec{}
llvThin.PoolName = lvmVG[lvmVolumeGroupName]
}

Expand All @@ -124,36 +124,43 @@ func (d *Driver) CreateVolume(ctx context.Context, request *csi.CreateVolumeRequ

d.log.Info(fmt.Sprintf("LvmLogicalVolumeSpec : %+v", spec))

err, llv := utils.CreateLVMLogicalVolume(ctx, d.cl, llvName, spec)
_, err = utils.CreateLVMLogicalVolume(ctx, d.cl, llvName, spec)
if err != nil {
d.log.Error(err, "error CreateLVMLogicalVolume")
// todo if llv exist?
//return nil, err
if kerrors.IsAlreadyExists(err) {
d.log.Info(fmt.Sprintf("LVMLogicalVolume %s already exists", llvName))
} else {
d.log.Error(err, "error CreateLVMLogicalVolume")
return nil, err
}
}
d.log.Info("------------ CreateLVMLogicalVolume ------------")

d.log.Info("start wait CreateLVMLogicalVolume ")
attemptCounter, err := utils.WaitForStatusUpdate(ctx, d.cl, request.Name, llv.Namespace)
resizeDelta, err := resource.ParseQuantity(ResizeDelta)
if err != nil {
d.log.Error(err, "error ParseQuantity for ResizeDelta")
return nil, err
}
attemptCounter, err := utils.WaitForStatusUpdate(ctx, d.cl, *d.log, request.Name, "", *llvSize, resizeDelta)
if err != nil {
d.log.Error(err, "error WaitForStatusUpdate")
return nil, err
}
d.log.Info(fmt.Sprintf("stop wait CreateLVMLogicalVolume, attempt сounter = %d ", attemptCounter))
d.log.Info(fmt.Sprintf("stop waiting CreateLVMLogicalVolume, attempt сounter = %d ", attemptCounter))

//Create context
volCtx := make(map[string]string)
volumeCtx := make(map[string]string, len(request.Parameters))
for k, v := range request.Parameters {
volCtx[k] = v
volumeCtx[k] = v
}

volCtx[subPath] = request.Name
volCtx[VGNameKey] = vgName
volumeCtx[subPath] = request.Name
volumeCtx[VGNameKey] = vgName

return &csi.CreateVolumeResponse{
Volume: &csi.Volume{
CapacityBytes: request.CapacityRange.GetRequiredBytes(),
VolumeId: request.Name,
VolumeContext: volCtx,
VolumeContext: volumeCtx,
ContentSource: request.VolumeContentSource,
AccessibleTopology: []*csi.Topology{
{Segments: map[string]string{
Expand Down Expand Up @@ -256,9 +263,76 @@ func (d *Driver) ListSnapshots(ctx context.Context, request *csi.ListSnapshotsRe

func (d *Driver) ControllerExpandVolume(ctx context.Context, request *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) {
d.log.Info(" call method ControllerExpandVolume")

d.log.Info("========== ControllerExpandVolume ============")
d.log.Info(request.String())
d.log.Info("========== ControllerExpandVolume ============")

volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Volume id cannot be empty")
}

llv, err := utils.GetLVMLogicalVolume(ctx, d.cl, volumeID, "")
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting LVMLogicalVolume: %s", err.Error())
}

resizeDelta, err := resource.ParseQuantity(ResizeDelta)
if err != nil {
d.log.Error(err, "error ParseQuantity for ResizeDelta")
return nil, err
}
d.log.Trace(fmt.Sprintf("resizeDelta: %s", resizeDelta.String()))
requestCapacity := resource.NewQuantity(request.CapacityRange.GetRequiredBytes(), resource.BinarySI)
d.log.Trace(fmt.Sprintf("requestCapacity: %s", requestCapacity.String()))

nodeExpansionRequired := true
if request.GetVolumeCapability().GetBlock() != nil {
nodeExpansionRequired = false
}
d.log.Info(fmt.Sprintf("NodeExpansionRequired: %t", nodeExpansionRequired))

if llv.Status.ActualSize.Value() > requestCapacity.Value()+resizeDelta.Value() || utils.AreSizesEqualWithinDelta(*requestCapacity, llv.Status.ActualSize, resizeDelta) {
d.log.Warning(fmt.Sprintf("requested size is less than or equal to the actual size of the volume include delta %s , no need to resize LVMLogicalVolume %s, requested size: %s, actual size: %s, return NodeExpansionRequired: %t and CapacityBytes: %d", resizeDelta.String(), volumeID, requestCapacity.String(), llv.Status.ActualSize.String(), nodeExpansionRequired, llv.Status.ActualSize.Value()))
return &csi.ControllerExpandVolumeResponse{
CapacityBytes: llv.Status.ActualSize.Value(),
NodeExpansionRequired: nodeExpansionRequired,
}, nil
}

lvg, err := utils.GetLVMVolumeGroup(ctx, d.cl, llv.Spec.LvmVolumeGroup, llv.Namespace)
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting LVMVolumeGroup: %v", err)
}

lvgFreeSpace, err := utils.GetLVMVolumeGroupFreeSpace(*lvg)
if err != nil {
return nil, status.Errorf(codes.Internal, "error getting LVMVolumeGroupCapacity: %v", err)
}

if lvgFreeSpace.Value() < (requestCapacity.Value() - llv.Status.ActualSize.Value()) {
return nil, status.Errorf(codes.Internal, "requested size: %s is greater than the capacity of the LVMVolumeGroup: %s", requestCapacity.String(), lvgFreeSpace.String())
}

d.log.Info("start resize LVMLogicalVolume")
d.log.Info(fmt.Sprintf("requested size: %s, actual size: %s", requestCapacity.String(), llv.Status.ActualSize.String()))
llv.Spec.Size = *requestCapacity
err = utils.UpdateLVMLogicalVolume(ctx, d.cl, llv)
if err != nil {
return nil, status.Errorf(codes.Internal, "error updating LVMLogicalVolume: %v", err)
}

attemptCounter, err := utils.WaitForStatusUpdate(ctx, d.cl, *d.log, llv.Name, llv.Namespace, *requestCapacity, resizeDelta)
if err != nil {
d.log.Error(err, "error WaitForStatusUpdate")
return nil, err
}
d.log.Info(fmt.Sprintf("finish resize LVMLogicalVolume, attempt сounter = %d ", attemptCounter))

return &csi.ControllerExpandVolumeResponse{
CapacityBytes: request.CapacityRange.RequiredBytes,
NodeExpansionRequired: true,
NodeExpansionRequired: nodeExpansionRequired,
}, nil
}

Expand Down
19 changes: 10 additions & 9 deletions images/sds-lvm-csi/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,6 @@ import (
"context"
"errors"
"fmt"
"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"net"
"net/http"
"net/url"
Expand All @@ -31,9 +28,13 @@ import (
"path/filepath"
"sds-lvm-csi/pkg/logger"
"sds-lvm-csi/pkg/utils"
"sigs.k8s.io/controller-runtime/pkg/client"
"sync"
"time"

"github.com/container-storage-interface/spec/lib/go/csi"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
"sigs.k8s.io/controller-runtime/pkg/client"
)

const (
Expand Down Expand Up @@ -63,10 +64,10 @@ type Driver struct {
httpSrv http.Server
log *logger.Logger

readyMu sync.Mutex // protects ready
ready bool
cl client.Client
mounter utils.Store
readyMu sync.Mutex // protects ready
ready bool
cl client.Client
storeManager utils.NodeStoreManager
}

// NewDriver returns a CSI plugin that contains the necessary gRPC
Expand All @@ -88,7 +89,7 @@ func NewDriver(ep, driverName, address string, nodeName *string, log *logger.Log
log: log,
waitActionTimeout: defaultWaitActionTimeout,
cl: cl,
mounter: *st,
storeManager: st,
}, nil
}

Expand Down
Loading

0 comments on commit 86f9a94

Please sign in to comment.