From a16a43d3d5a66670f70ede226ad03b65af64ec90 Mon Sep 17 00:00:00 2001 From: Aleksandr Zimin Date: Mon, 15 Apr 2024 10:06:27 +0300 Subject: [PATCH] add stage/unstage Signed-off-by: Aleksandr Zimin --- images/sds-local-volume-csi/driver/driver.go | 3 + images/sds-local-volume-csi/driver/node.go | 309 +++++++++++++++--- images/sds-local-volume-csi/internal/const.go | 2 + .../sds-local-volume-csi/internal/inflight.go | 75 +++++ .../internal/inflight_test.go | 112 +++++++ .../sds-local-volume-csi/pkg/utils/volume.go | 60 +++- 6 files changed, 504 insertions(+), 57 deletions(-) create mode 100644 images/sds-local-volume-csi/internal/inflight.go create mode 100644 images/sds-local-volume-csi/internal/inflight_test.go diff --git a/images/sds-local-volume-csi/driver/driver.go b/images/sds-local-volume-csi/driver/driver.go index 8a4a001f..6ab85c2d 100644 --- a/images/sds-local-volume-csi/driver/driver.go +++ b/images/sds-local-volume-csi/driver/driver.go @@ -26,6 +26,7 @@ import ( "os" "path" "path/filepath" + "sds-local-volume-csi/internal" "sds-local-volume-csi/pkg/logger" "sds-local-volume-csi/pkg/utils" "sync" @@ -68,6 +69,7 @@ type Driver struct { ready bool cl client.Client storeManager utils.NodeStoreManager + inFlight *internal.InFlight } // NewDriver returns a CSI plugin that contains the necessary gRPC @@ -90,6 +92,7 @@ func NewDriver(ep, driverName, address string, nodeName *string, log *logger.Log waitActionTimeout: defaultWaitActionTimeout, cl: cl, storeManager: st, + inFlight: internal.NewInFlight(), }, nil } diff --git a/images/sds-local-volume-csi/driver/node.go b/images/sds-local-volume-csi/driver/node.go index cb08c4b2..70cba44d 100644 --- a/images/sds-local-volume-csi/driver/node.go +++ b/images/sds-local-volume-csi/driver/node.go @@ -20,19 +20,160 @@ import ( "context" "fmt" "sds-local-volume-csi/internal" + "strings" "github.com/container-storage-interface/spec/lib/go/csi" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) +const ( + // default file system type to be used when it is not provided + defaultFsType = internal.FSTypeExt4 + + // VolumeOperationAlreadyExists is message fmt returned to CO when there is another in-flight call on the given volumeID + VolumeOperationAlreadyExists = "An operation with the given volume=%q is already in progress" +) + +var ( + // nodeCaps represents the capability of node service. + nodeCaps = []csi.NodeServiceCapability_RPC_Type{ + csi.NodeServiceCapability_RPC_STAGE_UNSTAGE_VOLUME, + csi.NodeServiceCapability_RPC_EXPAND_VOLUME, + csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, + } + + ValidFSTypes = map[string]struct{}{ + internal.FSTypeExt4: {}, + } +) + func (d *Driver) NodeStageVolume(ctx context.Context, request *csi.NodeStageVolumeRequest) (*csi.NodeStageVolumeResponse, error) { - d.log.Info("method NodeStageVolume") + d.log.Debug("[NodeStageVolume] method called with request: %v", request) + + volumeID := request.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume id cannot be empty") + } + + target := request.GetStagingTargetPath() + if len(target) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Staging target path cannot be empty") + } + + volCap := request.GetVolumeCapability() + if volCap == nil { + return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume capability cannot be empty") + } + + vgName, ok := request.GetVolumeContext()[internal.VGNameKey] + if !ok { + return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume group name cannot be empty") + } + + if volCap.GetBlock() != nil { + d.log.Info("[NodeStageVolume] Block volume detected. Skipping staging.") + return &csi.NodeStageVolumeResponse{}, nil + } + + isBlock := false + + mountVolume := volCap.GetMount() + if mountVolume == nil { + return nil, status.Error(codes.InvalidArgument, "[NodeStageVolume] Volume capability mount cannot be empty") + } + + fsType := mountVolume.GetFsType() + if fsType == "" { + fsType = defaultFsType + } + + _, ok = ValidFSTypes[strings.ToLower(fsType)] + if !ok { + return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("[NodeStageVolume] Invalid fsType: %s. Supported values: %v", fsType, ValidFSTypes)) + } + + mountOptions := collectMountOptions(fsType, mountVolume.GetMountFlags()) + + d.log.Debug("[NodeStageVolume] Volume %s operation started", volumeID) + ok = d.inFlight.Insert(volumeID) + if !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + d.log.Debug("[NodeStageVolume] Volume %s operation completed", volumeID) + d.inFlight.Delete(volumeID) + }() + + devPath := fmt.Sprintf("/dev/%s/%s", vgName, request.VolumeId) + d.log.Debug("[NodeStageVolume] Checking if device exists: %s", devPath) + exists, err := d.storeManager.PathExists(devPath) + if err != nil { + return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error checking if device exists: %v", err) + } + if !exists { + return nil, status.Errorf(codes.NotFound, "[NodeStageVolume] Device %s not found", devPath) + } + + lvmType := request.GetVolumeContext()[internal.LvmTypeKey] + lvmThinPoolName := request.GetVolumeContext()[internal.ThinPoolNameKey] + + d.log.Trace(fmt.Sprintf("mountOptions = %s", mountOptions)) + d.log.Trace(fmt.Sprintf("lvmType = %s", lvmType)) + d.log.Trace(fmt.Sprintf("lvmThinPoolName = %s", lvmThinPoolName)) + d.log.Trace(fmt.Sprintf("fsType = %s", fsType)) + + err = d.storeManager.Mount(devPath, target, isBlock, fsType, false, mountOptions, lvmType, lvmThinPoolName) + if err != nil { + d.log.Error(err, "[NodeStageVolume] Error mounting volume") + return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error format device %q and mounting volume at %q: %v", devPath, target, err) + } + + needResize, err := d.storeManager.NeedResize(devPath, target) + if err != nil { + d.log.Error(err, "[NodeStageVolume] Error checking if volume needs resize") + return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error checking if the volume %q (%q) mounted at %q needs resizing: %v", volumeID, devPath, target, err) + } + + if needResize { + d.log.Info("[NodeStageVolume] Resizing volume %q (%q) mounted at %q", volumeID, devPath, target) + err = d.storeManager.ResizeFS(target) + if err != nil { + return nil, status.Errorf(codes.Internal, "[NodeStageVolume] Error resizing volume %q (%q) mounted at %q: %v", volumeID, devPath, target, err) + } + } + + d.log.Info("[NodeStageVolume] Volume %q (%q) successfully staged at %s. FsType: %s", volumeID, devPath, target, fsType) + return &csi.NodeStageVolumeResponse{}, nil } func (d *Driver) NodeUnstageVolume(ctx context.Context, request *csi.NodeUnstageVolumeRequest) (*csi.NodeUnstageVolumeResponse, error) { - d.log.Info("method NodeUnstageVolume") + d.log.Debug("[NodeUnstageVolume] method called with request: %v", request) + volumeID := request.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeUnstageVolume] Volume id cannot be empty") + } + + target := request.GetStagingTargetPath() + if len(target) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeUnstageVolume] Staging target path cannot be empty") + } + + d.log.Debug("[NodeUnstageVolume] Volume %s operation started", volumeID) + ok := d.inFlight.Insert(volumeID) + if !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + d.log.Debug("[NodeUnstageVolume] Volume %s operation completed", volumeID) + d.inFlight.Delete(volumeID) + }() + err := d.storeManager.Unstage(target) + if err != nil { + return nil, status.Errorf(codes.Internal, "[NodeUnstageVolume] Error unmounting volume %q mounted at %q: %v", volumeID, target, err) + } + return &csi.NodeUnstageVolumeResponse{}, nil } @@ -42,55 +183,115 @@ func (d *Driver) NodePublishVolume(ctx context.Context, request *csi.NodePublish d.log.Trace(request.String()) d.log.Trace("------------- NodePublishVolume --------------") - dev := fmt.Sprintf("/dev/%s/%s", request.GetVolumeContext()[internal.VGNameKey], request.VolumeId) - lvmType := request.GetVolumeContext()[internal.LvmTypeKey] - lvmThinPoolName := request.GetVolumeContext()[internal.ThinPoolNameKey] - d.log.Info(fmt.Sprintf("dev = %s", dev)) + volumeID := request.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume id cannot be empty") + } - var mountOptions []string - if request.GetReadonly() { - mountOptions = append(mountOptions, "ro") + source := request.GetStagingTargetPath() + if len(source) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Staging target path cannot be empty") } - var fsType string - var IsBlock bool - if request.GetVolumeCapability().GetBlock() != nil { - mountOptions = []string{"bind"} - IsBlock = true + target := request.GetTargetPath() + if len(target) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Target path cannot be empty") } - if mnt := request.GetVolumeCapability().GetMount(); mnt != nil { - fsType = request.VolumeCapability.GetMount().FsType - mountOptions = append(mountOptions, mnt.GetMountFlags()...) + volCap := request.GetVolumeCapability() + if volCap == nil { + return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume capability cannot be empty") } - d.log.Info(fmt.Sprintf("mountOptions = %s", mountOptions)) - d.log.Info(fmt.Sprintf("lvmType = %s", lvmType)) - d.log.Info(fmt.Sprintf("lvmThinPoolName = %s", lvmThinPoolName)) - d.log.Info(fmt.Sprintf("fsType = %s", fsType)) - d.log.Info(fmt.Sprintf("IsBlock = %t", IsBlock)) + // vgName, ok := request.GetVolumeContext()[internal.VGNameKey] + // if !ok { + // return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume group name cannot be empty") + // } - err := d.storeManager.Mount(dev, request.GetTargetPath(), IsBlock, fsType, false, mountOptions, lvmType, lvmThinPoolName) - if err != nil { - d.log.Error(err, "d.mounter.Mount :") - return nil, err + mountOptions := []string{"bind"} + if request.GetReadonly() { + mountOptions = append(mountOptions, "ro") } - d.log.Info("Success method NodePublishVolume") + d.log.Debug("[NodePublishVolume] Volume %s operation started", volumeID) + ok := d.inFlight.Insert(volumeID) + if !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + d.log.Debug("[NodePublishVolume] Volume %s operation completed", volumeID) + d.inFlight.Delete(volumeID) + }() + + switch volCap.GetAccessType().(type) { + case *csi.VolumeCapability_Block: + d.log.Trace("NodePublishVolume[] Block volume detected.") + err := d.storeManager.Mount(source, target, true, "", request.GetReadonly(), mountOptions, "", "") + if err != nil { + return nil, status.Errorf(codes.Internal, "[NodePublishVolume] Error bind mounting block volume %q. Source: %q. Target: %q: %v", volumeID, source, target, err) + } + case *csi.VolumeCapability_Mount: + mountVolume := volCap.GetMount() + if mountVolume == nil { + return nil, status.Error(codes.InvalidArgument, "[NodePublishVolume] Volume capability mount cannot be empty") + } + fsType := mountVolume.GetFsType() + if fsType == "" { + fsType = defaultFsType + } + + _, ok = ValidFSTypes[strings.ToLower(fsType)] + if !ok { + return nil, status.Errorf(codes.InvalidArgument, fmt.Sprintf("[NodeStageVolume] Invalid fsType: %s. Supported values: %v", fsType, ValidFSTypes)) + } + + for _, mnt := range mountVolume.GetMountFlags() { + if !hasMountOption(mountOptions, mnt) { + mountOptions = append(mountOptions, mnt) + } + } + + err := d.storeManager.Mount(source, target, false, fsType, request.GetReadonly(), mountOptions, "", "") + if err != nil { + return nil, status.Errorf(codes.Internal, "[NodePublishVolume] Error bind mounting volume %q. Source: %q. Target: %q: %v", volumeID, source, target, err) + } + + } return &csi.NodePublishVolumeResponse{}, nil } func (d *Driver) NodeUnpublishVolume(ctx context.Context, request *csi.NodeUnpublishVolumeRequest) (*csi.NodeUnpublishVolumeResponse, error) { - d.log.Info("method NodeUnpublishVolume") + d.log.Debug("[NodeUnpublishVolume] method called with request: %v", request) d.log.Trace("------------- NodeUnpublishVolume --------------") d.log.Trace(request.String()) d.log.Trace("------------- NodeUnpublishVolume --------------") - err := d.storeManager.Unmount(request.GetTargetPath()) + volumeID := request.GetVolumeId() + if len(volumeID) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeUnpublishVolume] Volume id cannot be empty") + } + + target := request.GetTargetPath() + if len(target) == 0 { + return nil, status.Error(codes.InvalidArgument, "[NodeUnpublishVolume] Staging target path cannot be empty") + } + + d.log.Debug("[NodeUnpublishVolume] Volume %s operation started", volumeID) + ok := d.inFlight.Insert(volumeID) + if !ok { + return nil, status.Errorf(codes.Aborted, VolumeOperationAlreadyExists, volumeID) + } + defer func() { + d.log.Debug("[NodeUnpublishVolume] Volume %s operation completed", volumeID) + d.inFlight.Delete(volumeID) + }() + + err := d.storeManager.Unpublish(target) if err != nil { - d.log.Error(err, "NodeUnpublishVolume err ") + return nil, status.Errorf(codes.Internal, "[NodeUnpublishVolume] Error unmounting volume %q mounted at %q: %v", volumeID, target, err) } + return &csi.NodeUnpublishVolumeResponse{}, nil } @@ -125,16 +326,11 @@ func (d *Driver) NodeExpandVolume(ctx context.Context, request *csi.NodeExpandVo } func (d *Driver) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCapabilitiesRequest) (*csi.NodeGetCapabilitiesResponse, error) { - d.log.Info("method NodeGetCapabilities") + d.log.Debug("[NodeGetCapabilities] method called with request: %v", request) - capabilities := []csi.NodeServiceCapability_RPC_Type{ - csi.NodeServiceCapability_RPC_GET_VOLUME_STATS, - csi.NodeServiceCapability_RPC_EXPAND_VOLUME, - } - - csiCaps := make([]*csi.NodeServiceCapability, len(capabilities)) - for i, capability := range capabilities { - csiCaps[i] = &csi.NodeServiceCapability{ + caps := make([]*csi.NodeServiceCapability, len(nodeCaps)) + for i, capability := range nodeCaps { + caps[i] = &csi.NodeServiceCapability{ Type: &csi.NodeServiceCapability_Rpc{ Rpc: &csi.NodeServiceCapability_RPC{ Type: capability, @@ -144,7 +340,7 @@ func (d *Driver) NodeGetCapabilities(ctx context.Context, request *csi.NodeGetCa } return &csi.NodeGetCapabilitiesResponse{ - Capabilities: csiCaps, + Capabilities: caps, }, nil } @@ -162,3 +358,36 @@ func (d *Driver) NodeGetInfo(ctx context.Context, request *csi.NodeGetInfoReques }, }, nil } + +// hasMountOption returns a boolean indicating whether the given +// slice already contains a mount option. This is used to prevent +// passing duplicate option to the mount command. +func hasMountOption(options []string, opt string) bool { + for _, o := range options { + if o == opt { + return true + } + } + return false +} + +// collectMountOptions returns array of mount options from +// VolumeCapability_MountVolume and special mount options for +// given filesystem. +func collectMountOptions(fsType string, mntFlags []string) []string { + var options []string + for _, opt := range mntFlags { + if !hasMountOption(options, opt) { + options = append(options, opt) + } + } + + // // By default, xfs does not allow mounting of two volumes with the same filesystem uuid. + // // Force ignore this uuid to be able to mount volume + its clone / restored snapshot on the same node. + // if fsType == FSTypeXfs { + // if !hasMountOption(options, "nouuid") { + // options = append(options, "nouuid") + // } + // } + return options +} diff --git a/images/sds-local-volume-csi/internal/const.go b/images/sds-local-volume-csi/internal/const.go index 527c6094..7d617c68 100644 --- a/images/sds-local-volume-csi/internal/const.go +++ b/images/sds-local-volume-csi/internal/const.go @@ -32,4 +32,6 @@ const ( BindingModeWFFC = "WaitForFirstConsumer" BindingModeI = "Immediate" ResizeDelta = "32Mi" + // FSTypeExt4 represents the ext4 filesystem type + FSTypeExt4 = "ext4" ) diff --git a/images/sds-local-volume-csi/internal/inflight.go b/images/sds-local-volume-csi/internal/inflight.go new file mode 100644 index 00000000..c6eda006 --- /dev/null +++ b/images/sds-local-volume-csi/internal/inflight.go @@ -0,0 +1,75 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "sync" + + "k8s.io/klog/v2" +) + +// Idempotent is the interface required to manage in flight requests. +type Idempotent interface { + // The CSI data types are generated using a protobuf. + // The generated structures are guaranteed to implement the Stringer interface. + // Example: https://github.com/container-storage-interface/spec/blob/master/lib/go/csi/csi.pb.go#L3508 + // We can use the generated string as the key of our internal inflight database of requests. + String() string +} + +const ( + VolumeOperationAlreadyExistsErrorMsg = "An operation with the given Volume %s already exists" +) + +// InFlight is a struct used to manage in flight requests for a unique identifier. +type InFlight struct { + mux *sync.Mutex + inFlight map[string]bool +} + +// NewInFlight instanciates a InFlight structures. +func NewInFlight() *InFlight { + return &InFlight{ + mux: &sync.Mutex{}, + inFlight: make(map[string]bool), + } +} + +// Insert inserts the entry to the current list of inflight, request key is a unique identifier. +// Returns false when the key already exists. +func (db *InFlight) Insert(key string) bool { + db.mux.Lock() + defer db.mux.Unlock() + + _, ok := db.inFlight[key] + if ok { + return false + } + + db.inFlight[key] = true + return true +} + +// Delete removes the entry from the inFlight entries map. +// It doesn't return anything, and will do nothing if the specified key doesn't exist. +func (db *InFlight) Delete(key string) { + db.mux.Lock() + defer db.mux.Unlock() + + delete(db.inFlight, key) + klog.V(4).InfoS("Node Service: volume operation finished", "key", key) +} diff --git a/images/sds-local-volume-csi/internal/inflight_test.go b/images/sds-local-volume-csi/internal/inflight_test.go new file mode 100644 index 00000000..4373c59d --- /dev/null +++ b/images/sds-local-volume-csi/internal/inflight_test.go @@ -0,0 +1,112 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package internal + +import ( + "testing" +) + +type testRequest struct { + volumeId string + extra string + expResp bool + delete bool +} + +func TestInFlight(t *testing.T) { + testCases := []struct { + name string + requests []testRequest + }{ + { + name: "success normal", + requests: []testRequest{ + { + + volumeId: "random-vol-name", + expResp: true, + }, + }, + }, + { + name: "success adding request with different volumeId", + requests: []testRequest{ + { + volumeId: "random-vol-foobar", + expResp: true, + }, + { + volumeId: "random-vol-name-foobar", + expResp: true, + }, + }, + }, + { + name: "failed adding request with same volumeId", + requests: []testRequest{ + { + volumeId: "random-vol-name-foobar", + expResp: true, + }, + { + volumeId: "random-vol-name-foobar", + expResp: false, + }, + }, + }, + + { + name: "success add, delete, add copy", + requests: []testRequest{ + { + volumeId: "random-vol-name", + extra: "random-node-id", + expResp: true, + }, + { + volumeId: "random-vol-name", + extra: "random-node-id", + expResp: false, + delete: true, + }, + { + volumeId: "random-vol-name", + extra: "random-node-id", + expResp: true, + }, + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + db := NewInFlight() + for _, r := range tc.requests { + var resp bool + if r.delete { + db.Delete(r.volumeId) + } else { + resp = db.Insert(r.volumeId) + } + if r.expResp != resp { + t.Fatalf("expected insert to be %+v, got %+v", r.expResp, resp) + } + } + }) + + } +} diff --git a/images/sds-local-volume-csi/pkg/utils/volume.go b/images/sds-local-volume-csi/pkg/utils/volume.go index 3444e87a..d0407b94 100644 --- a/images/sds-local-volume-csi/pkg/utils/volume.go +++ b/images/sds-local-volume-csi/pkg/utils/volume.go @@ -23,27 +23,30 @@ import ( "sds-local-volume-csi/pkg/logger" "strings" - mu "k8s.io/mount-utils" + mountutils "k8s.io/mount-utils" utilexec "k8s.io/utils/exec" ) type NodeStoreManager interface { Mount(source, target string, isBlock bool, fsType string, readonly bool, mntOpts []string, lvmType, lvmThinPoolName string) error - Unmount(target string) error + Unstage(target string) error + Unpublish(target string) error IsNotMountPoint(target string) (bool, error) ResizeFS(target string) error + PathExists(path string) (bool, error) + NeedResize(devicePath string, deviceMountPath string) (bool, error) } type Store struct { Log *logger.Logger - NodeStorage mu.SafeFormatAndMount + NodeStorage mountutils.SafeFormatAndMount } func NewStore(logger *logger.Logger) *Store { return &Store{ Log: logger, - NodeStorage: mu.SafeFormatAndMount{ - Interface: mu.New("/bin/mount"), + NodeStorage: mountutils.SafeFormatAndMount{ + Interface: mountutils.New("/bin/mount"), Exec: utilexec.New(), }, } @@ -77,8 +80,15 @@ func (s *Store) Mount(devSourcePath, target string, isBlock bool, fsType string, s.Log.Trace("≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈ FS MOUNT ≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈≈") s.Log.Trace("-----------------== start MkdirAll ==-----------------") s.Log.Trace("mkdir create dir =" + target) - if err := os.MkdirAll(target, os.FileMode(0755)); err != nil { - return fmt.Errorf("[MkdirAll] could not create target directory %s: %w", target, err) + exists, err := s.PathExists(target) + if err != nil { + return fmt.Errorf("[PathExists] could not check if target directory %s exists: %w", target, err) + } + if !exists { + s.Log.Debug(fmt.Sprintf("Creating target directory %s", target)) + if err := os.MkdirAll(target, os.FileMode(0755)); err != nil { + return fmt.Errorf("[MkdirAll] could not create target directory %s: %w", target, err) + } } s.Log.Trace("-----------------== stop MkdirAll ==-----------------") @@ -95,7 +105,7 @@ func (s *Store) Mount(devSourcePath, target string, isBlock bool, fsType string, mapperSourcePath := toMapperPath(devSourcePath) s.Log.Trace(fmt.Sprintf("Target %s is a mount point. Checking if it is already mounted to source %s or %s", target, devSourcePath, mapperSourcePath)) - mountedDevicePath, _, err := mu.GetDeviceNameFromMount(s.NodeStorage.Interface, target) + mountedDevicePath, _, err := mountutils.GetDeviceNameFromMount(s.NodeStorage.Interface, target) if err != nil { return fmt.Errorf("failed to find the device mounted at %s: %w", target, err) } @@ -112,7 +122,7 @@ func (s *Store) Mount(devSourcePath, target string, isBlock bool, fsType string, s.Log.Trace("-----------------== start FormatAndMount ==---------------") if lvmType == internal.LVMTypeThin { - s.Log.Trace(fmt.Sprintf("LVM type is Thin. Ckecking free space in thin pool %s", lvmThinPoolName)) + s.Log.Trace(fmt.Sprintf("LVM type is Thin. Thin pool name: %s", lvmThinPoolName)) } err = s.NodeStorage.FormatAndMount(devSourcePath, target, fsType, mntOpts) if err != nil { @@ -148,15 +158,23 @@ func (s *Store) Mount(devSourcePath, target string, isBlock bool, fsType string, return nil } -func (s *Store) Unmount(target string) error { - s.Log.Info(fmt.Sprintf("[unmount volume] target=%s", target)) +func (s *Store) Unpublish(target string) error { + return s.Unstage(target) +} - err := s.NodeStorage.Unmount(target) - if err != nil { - s.Log.Error(err, "[s.NodeStorage.Unmount]: ") +func (s *Store) Unstage(target string) error { + s.Log.Info(fmt.Sprintf("[unmount volume] target=%s", target)) + err := mountutils.CleanupMountPoint(target, s.NodeStorage.Interface, false) + // Ignore the error when it contains "not mounted", because that indicates the + // world is already in the desired state + // + // mount-utils attempts to detect this on its own but fails when running on + // a read-only root filesystem + if err == nil || strings.Contains(fmt.Sprint(err), "not mounted") { + return nil + } else { return err } - return nil } func (s *Store) IsNotMountPoint(target string) (bool, error) { @@ -172,7 +190,7 @@ func (s *Store) IsNotMountPoint(target string) (bool, error) { func (s *Store) ResizeFS(mountTarget string) error { s.Log.Info(" ----== Resize FS ==---- ") - devicePath, _, err := mu.GetDeviceNameFromMount(s.NodeStorage.Interface, mountTarget) + devicePath, _, err := mountutils.GetDeviceNameFromMount(s.NodeStorage.Interface, mountTarget) if err != nil { s.Log.Error(err, "Failed to find the device mounted at mountTarget", "mountTarget", mountTarget) return fmt.Errorf("failed to find the device mounted at %s: %w", mountTarget, err) @@ -180,7 +198,7 @@ func (s *Store) ResizeFS(mountTarget string) error { s.Log.Info("Found device for resizing", "devicePath", devicePath, "mountTarget", mountTarget) - _, err = mu.NewResizeFs(s.NodeStorage.Exec).Resize(devicePath, mountTarget) + _, err = mountutils.NewResizeFs(s.NodeStorage.Exec).Resize(devicePath, mountTarget) if err != nil { s.Log.Error(err, "Failed to resize filesystem", "devicePath", devicePath, "mountTarget", mountTarget) return fmt.Errorf("failed to resize filesystem %s on device %s: %w", mountTarget, devicePath, err) @@ -190,6 +208,14 @@ func (s *Store) ResizeFS(mountTarget string) error { return nil } +func (s *Store) PathExists(path string) (bool, error) { + return mountutils.PathExists(path) +} + +func (s *Store) NeedResize(devicePath string, deviceMountPath string) (bool, error) { + return mountutils.NewResizeFs(s.NodeStorage.Exec).NeedResize(devicePath, deviceMountPath) +} + func toMapperPath(devPath string) string { if !strings.HasPrefix(devPath, "/dev/") { return ""