Skip to content

Commit

Permalink
add stage/unstage
Browse files Browse the repository at this point in the history
Signed-off-by: Aleksandr Zimin <alexandr.zimin@flant.com>
  • Loading branch information
AleksZimin committed Apr 15, 2024
1 parent 97d21cb commit a16a43d
Show file tree
Hide file tree
Showing 6 changed files with 504 additions and 57 deletions.
3 changes: 3 additions & 0 deletions images/sds-local-volume-csi/driver/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"os"
"path"
"path/filepath"
"sds-local-volume-csi/internal"
"sds-local-volume-csi/pkg/logger"
"sds-local-volume-csi/pkg/utils"
"sync"
Expand Down Expand Up @@ -68,6 +69,7 @@ type Driver struct {
ready bool
cl client.Client
storeManager utils.NodeStoreManager
inFlight *internal.InFlight
}

// NewDriver returns a CSI plugin that contains the necessary gRPC
Expand All @@ -90,6 +92,7 @@ func NewDriver(ep, driverName, address string, nodeName *string, log *logger.Log
waitActionTimeout: defaultWaitActionTimeout,
cl: cl,
storeManager: st,
inFlight: internal.NewInFlight(),
}, nil
}

Expand Down
309 changes: 269 additions & 40 deletions images/sds-local-volume-csi/driver/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,19 +20,160 @@ import (
"context"
"fmt"
"sds-local-volume-csi/internal"
"strings"

"github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

const (
// default file system type to be used when it is not provided
defaultFsType = internal.FSTypeExt4

// VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID
VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress"
)

var (
// nodeCaps represents the capability of node service.
nodeCaps = []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
}

ValidFSTypes = map[string]struct{}{
internal.FSTypeExt4: {},
}
)

func (d *Driver) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) {
d.log.Info("method NodeStageVolume")
d.log.Debug("[NodeStageVolume] method called with request: %v", request)

volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume id cannot be empty")
}

target := request.GetStagingTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Staging target path cannot be empty")
}

volCap := request.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume capability cannot be empty")
}

vgName, ok := request.GetVolumeContext()[internal.VGNameKey]
if !ok {
return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume group name cannot be empty")
}

if volCap.GetBlock() != nil {
d.log.Info("[NodeStageVolume] Block volume detected. Skipping staging.")
return &csi.NodeStageVolumeResponse{}, nil
}

isBlock := false

mountVolume := volCap.GetMount()
if mountVolume == nil {
return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume capability mount cannot be empty")
}

fsType := mountVolume.GetFsType()
if fsType == "" {
fsType = defaultFsType
}

_, ok = ValidFSTypes[strings.ToLower(fsType)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("[NodeStageVolume] Invalid fsType: %s. Supported values: %v", fsType, ValidFSTypes))
}

mountOptions := collectMountOptions(fsType, mountVolume.GetMountFlags())

d.log.Debug("[NodeStageVolume] Volume %s operation started", volumeID)
ok = d.inFlight.Insert(volumeID)
if !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
d.log.Debug("[NodeStageVolume] Volume %s operation completed", volumeID)
d.inFlight.Delete(volumeID)
}()

devPath := fmt.Sprintf("/dev/%s/%s", vgName, request.VolumeId)
d.log.Debug("[NodeStageVolume] Checking if device exists: %s", devPath)
exists, err := d.storeManager.PathExists(devPath)
if err != nil {
return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error checking if device exists: %v", err)
}
if !exists {
return nil, status.Errorf(codes.NotFound, "[NodeStageVolume] Device %s not found", devPath)
}

lvmType := request.GetVolumeContext()[internal.LvmTypeKey]
lvmThinPoolName := request.GetVolumeContext()[internal.ThinPoolNameKey]

d.log.Trace(fmt.Sprintf("mountOptions = %s", mountOptions))
d.log.Trace(fmt.Sprintf("lvmType = %s", lvmType))
d.log.Trace(fmt.Sprintf("lvmThinPoolName = %s", lvmThinPoolName))
d.log.Trace(fmt.Sprintf("fsType = %s", fsType))

err = d.storeManager.Mount(devPath, target, isBlock, fsType, false, mountOptions, lvmType, lvmThinPoolName)
if err != nil {
d.log.Error(err, "[NodeStageVolume] Error mounting volume")
return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error format device %q and mounting volume at %q: %v", devPath, target, err)
}

needResize, err := d.storeManager.NeedResize(devPath, target)
if err != nil {
d.log.Error(err, "[NodeStageVolume] Error checking if volume needs resize")
return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error checking if the volume %q (%q) mounted at %q needs resizing: %v", volumeID, devPath, target, err)
}

if needResize {
d.log.Info("[NodeStageVolume] Resizing volume %q (%q) mounted at %q", volumeID, devPath, target)
err = d.storeManager.ResizeFS(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error resizing volume %q (%q) mounted at %q: %v", volumeID, devPath, target, err)
}
}

d.log.Info("[NodeStageVolume] Volume %q (%q) successfully staged at %s. FsType: %s", volumeID, devPath, target, fsType)

return &csi.NodeStageVolumeResponse{}, nil
}

func (d *Driver) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) {
d.log.Info("method NodeUnstageVolume")
d.log.Debug("[NodeUnstageVolume] method called with request: %v", request)
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeUnstageVolume] Volume id cannot be empty")
}

target := request.GetStagingTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeUnstageVolume] Staging target path cannot be empty")
}

d.log.Debug("[NodeUnstageVolume] Volume %s operation started", volumeID)
ok := d.inFlight.Insert(volumeID)
if !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
d.log.Debug("[NodeUnstageVolume] Volume %s operation completed", volumeID)
d.inFlight.Delete(volumeID)
}()
err := d.storeManager.Unstage(target)
if err != nil {
return nil, status.Errorf(codes.Internal, "[NodeUnstageVolume] Error unmounting volume %q mounted at %q: %v", volumeID, target, err)
}

return &csi.NodeUnstageVolumeResponse{}, nil
}

Expand All @@ -42,55 +183,115 @@ func (d *Driver) NodePublishVolume(ctx context.Context, request *csi.NodePublish
d.log.Trace(request.String())
d.log.Trace("------------- NodePublishVolume --------------")

dev := fmt.Sprintf("/dev/%s/%s", request.GetVolumeContext()[internal.VGNameKey], request.VolumeId)
lvmType := request.GetVolumeContext()[internal.LvmTypeKey]
lvmThinPoolName := request.GetVolumeContext()[internal.ThinPoolNameKey]
d.log.Info(fmt.Sprintf("dev = %s", dev))
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume id cannot be empty")
}

var mountOptions []string
if request.GetReadonly() {
mountOptions = append(mountOptions, "ro")
source := request.GetStagingTargetPath()
if len(source) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Staging target path cannot be empty")
}
var fsType string
var IsBlock bool

if request.GetVolumeCapability().GetBlock() != nil {
mountOptions = []string{"bind"}
IsBlock = true
target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Target path cannot be empty")
}

if mnt := request.GetVolumeCapability().GetMount(); mnt != nil {
fsType = request.VolumeCapability.GetMount().FsType
mountOptions = append(mountOptions, mnt.GetMountFlags()...)
volCap := request.GetVolumeCapability()
if volCap == nil {
return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume capability cannot be empty")
}

d.log.Info(fmt.Sprintf("mountOptions = %s", mountOptions))
d.log.Info(fmt.Sprintf("lvmType = %s", lvmType))
d.log.Info(fmt.Sprintf("lvmThinPoolName = %s", lvmThinPoolName))
d.log.Info(fmt.Sprintf("fsType = %s", fsType))
d.log.Info(fmt.Sprintf("IsBlock = %t", IsBlock))
// vgName, ok := request.GetVolumeContext()[internal.VGNameKey]
// if !ok {
// return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume group name cannot be empty")
// }

err := d.storeManager.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions, lvmType, lvmThinPoolName)
if err != nil {
d.log.Error(err, "d.mounter.Mount :")
return nil, err
mountOptions := []string{"bind"}
if request.GetReadonly() {
mountOptions = append(mountOptions, "ro")
}

d.log.Info("Success method NodePublishVolume")
d.log.Debug("[NodePublishVolume] Volume %s operation started", volumeID)
ok := d.inFlight.Insert(volumeID)
if !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
d.log.Debug("[NodePublishVolume] Volume %s operation completed", volumeID)
d.inFlight.Delete(volumeID)
}()

switch volCap.GetAccessType().(type) {
case *csi.VolumeCapability_Block:
d.log.Trace("NodePublishVolume[] Block volume detected.")
err := d.storeManager.Mount(source, target, true, "", request.GetReadonly(), mountOptions, "", "")
if err != nil {
return nil, status.Errorf(codes.Internal, "[NodePublishVolume] Error bind mounting block volume %q. Source: %q. Target: %q: %v", volumeID, source, target, err)
}
case *csi.VolumeCapability_Mount:
mountVolume := volCap.GetMount()
if mountVolume == nil {
return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume capability mount cannot be empty")
}
fsType := mountVolume.GetFsType()
if fsType == "" {
fsType = defaultFsType
}

_, ok = ValidFSTypes[strings.ToLower(fsType)]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("[NodeStageVolume] Invalid fsType: %s. Supported values: %v", fsType, ValidFSTypes))
}

for _, mnt := range mountVolume.GetMountFlags() {
if !hasMountOption(mountOptions, mnt) {
mountOptions = append(mountOptions, mnt)
}
}

err := d.storeManager.Mount(source, target, false, fsType, request.GetReadonly(), mountOptions, "", "")
if err != nil {
return nil, status.Errorf(codes.Internal, "[NodePublishVolume] Error bind mounting volume %q. Source: %q. Target: %q: %v", volumeID, source, target, err)
}

}

return &csi.NodePublishVolumeResponse{}, nil
}

func (d *Driver) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) {
d.log.Info("method NodeUnpublishVolume")
d.log.Debug("[NodeUnpublishVolume] method called with request: %v", request)
d.log.Trace("------------- NodeUnpublishVolume --------------")
d.log.Trace(request.String())
d.log.Trace("------------- NodeUnpublishVolume --------------")

err := d.storeManager.Unmount(request.GetTargetPath())
volumeID := request.GetVolumeId()
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeUnpublishVolume] Volume id cannot be empty")
}

target := request.GetTargetPath()
if len(target) == 0 {
return nil, status.Error(codes.InvalidArgument, "[NodeUnpublishVolume] Staging target path cannot be empty")
}

d.log.Debug("[NodeUnpublishVolume] Volume %s operation started", volumeID)
ok := d.inFlight.Insert(volumeID)
if !ok {
return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID)
}
defer func() {
d.log.Debug("[NodeUnpublishVolume] Volume %s operation completed", volumeID)
d.inFlight.Delete(volumeID)
}()

err := d.storeManager.Unpublish(target)
if err != nil {
d.log.Error(err, "NodeUnpublishVolume err ")
return nil, status.Errorf(codes.Internal, "[NodeUnpublishVolume] Error unmounting volume %q mounted at %q: %v", volumeID, target, err)
}

return &csi.NodeUnpublishVolumeResponse{}, nil
}

Expand Down Expand Up @@ -125,16 +326,11 @@ func (d *Driver) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVo
}

func (d *Driver) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) {
d.log.Info("method NodeGetCapabilities")
d.log.Debug("[NodeGetCapabilities] method called with request: %v", request)

capabilities := []csi.NodeServiceCapability_RPC_Type{
csi.NodeServiceCapability_RPC_GET_VOLUME_STATS,
csi.NodeServiceCapability_RPC_EXPAND_VOLUME,
}

csiCaps := make([]*csi.NodeServiceCapability, len(capabilities))
for i, capability := range capabilities {
csiCaps[i] = &csi.NodeServiceCapability{
caps := make([]*csi.NodeServiceCapability, len(nodeCaps))
for i, capability := range nodeCaps {
caps[i] = &csi.NodeServiceCapability{
Type: &csi.NodeServiceCapability_Rpc{
Rpc: &csi.NodeServiceCapability_RPC{
Type: capability,
Expand All @@ -144,7 +340,7 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCa
}

return &csi.NodeGetCapabilitiesResponse{
Capabilities: csiCaps,
Capabilities: caps,
}, nil
}

Expand All @@ -162,3 +358,36 @@ func (d *Driver) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoReques
},
}, nil
}

// hasMountOption returns a boolean indicating whether the given
// slice already contains a mount option. This is used to prevent
// passing duplicate option to the mount command.
func hasMountOption(options []string, opt string) bool {
for _, o := range options {
if o == opt {
return true
}
}
return false
}

// collectMountOptions returns array of mount options from
// VolumeCapability_MountVolume and special mount options for
// given filesystem.
func collectMountOptions(fsType string, mntFlags []string) []string {
var options []string
for _, opt := range mntFlags {
if !hasMountOption(options, opt) {
options = append(options, opt)
}
}

// // By default, xfs does not allow mounting of two volumes with the same filesystem uuid.
// // Force ignore this uuid to be able to mount volume + its clone / restored snapshot on the same node.
// if fsType == FSTypeXfs {
// if !hasMountOption(options, "nouuid") {
// options = append(options, "nouuid")
// }
// }
return options
}
Loading

0 comments on commit a16a43d

Please sign in to comment.