From 9456cd64e96193b6f17b0869dd0675baee0e4647 Mon Sep 17 00:00:00 2001 From: payes Date: Tue, 27 Aug 2019 17:58:18 +0530 Subject: [PATCH] chore(publish/unpublish): refactor code for node publish and unpublish (#32) Signed-off-by: Payes --- .../openebs.io/core/v1alpha1/csivolume.go | 12 +- pkg/iscsi/v1alpha1/iscsi_util.go | 68 ++-- pkg/iscsi/v1alpha1/mount.go | 23 +- pkg/service/v1alpha1/node.go | 309 ++++++++---------- pkg/service/v1alpha1/service.go | 6 - pkg/utils/v1alpha1/kubernetes.go | 187 +++++------ pkg/utils/v1alpha1/utils.go | 158 ++++++--- 7 files changed, 396 insertions(+), 367 deletions(-) diff --git a/pkg/apis/openebs.io/core/v1alpha1/csivolume.go b/pkg/apis/openebs.io/core/v1alpha1/csivolume.go index 8c456d038..7943b77ab 100644 --- a/pkg/apis/openebs.io/core/v1alpha1/csivolume.go +++ b/pkg/apis/openebs.io/core/v1alpha1/csivolume.go @@ -133,7 +133,7 @@ const ( CSIVolumeStatusMounted CSIVolumeStatus = "Mounted" // CSIVolumeStatusUnMounted indicated that the volume has been successfuly // unmounted and logged out of the node - CSIVolumeStatusUnMounted CSIVolumeStatus = "UnMounted" + CSIVolumeStatusUnmounted CSIVolumeStatus = "Unmounted" // CSIVolumeStatusRaw indicates that the volume is being used in raw format // by the application, therefore CSI has only performed iSCSI login // operation on this volume and avoided filesystem creation and mount. @@ -142,6 +142,16 @@ const ( // the volume has bben started but failed kubernetes needs to retry sending // nodepublish CSIVolumeStatusMountFailed CSIVolumeStatus = "MountFailed" + // CSIVolumeStatusUnmountInProgress indicates that the volume is busy and + // unavailable for use by other goroutines, an unmount operation on volume + // is under progress + CSIVolumeStatusUnmountUnderProgress CSIVolumeStatus = "UnmountUnderProgress" + // CSIVolumeStatusWaitingForCVCBound indicates that the volume components + // are still being created + CSIVolumeStatusWaitingForCVCBound CSIVolumeStatus = "WaitingForCVCBound" + // CSIVolumeStatusWaitingForVolumeToBeReady indicates that the replicas are + // yet to connect to target + CSIVolumeStatusWaitingForVolumeToBeReady CSIVolumeStatus = "WaitingForVolumeToBeReady" ) // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/iscsi/v1alpha1/iscsi_util.go b/pkg/iscsi/v1alpha1/iscsi_util.go index 0a9fb708b..882e6259f 100644 --- a/pkg/iscsi/v1alpha1/iscsi_util.go +++ b/pkg/iscsi/v1alpha1/iscsi_util.go @@ -407,16 +407,17 @@ func (util *ISCSIUtil) AttachDisk(b iscsiDiskMounter) (string, error) { return "", nil } - if err := os.MkdirAll(mntPath, 0750); err != nil { - glog.Errorf("iscsi: failed to mkdir %s, error", mntPath) - return "", err - } - - // Persist iscsi disk config to json file for DetachDisk path - if err := util.persistISCSI(*(b.iscsiDisk), b.targetPath); err != nil { - glog.Errorf("iscsi: failed to save iscsi config with error: %v", err) - return "", err - } + /* + if err := os.MkdirAll(mntPath, 0750); err != nil { + glog.Errorf("iscsi: failed to mkdir %s, error", mntPath) + return "", err + } + // Persist iscsi disk config to json file for DetachDisk path + if err := util.persistISCSI(*(b.iscsiDisk), b.targetPath); err != nil { + glog.Errorf("iscsi: failed to save iscsi config with error: %v", err) + return "", err + } + */ for _, path := range devicePaths { // There shouldnt be any empty device paths. 
However adding this check @@ -492,18 +493,23 @@ func (util *ISCSIUtil) DetachDisk( var volName, iqn, iface, initiatorName string found := true - // load iscsi disk config from json file - if err := util.loadISCSI(c.iscsiDisk, targetPath); err == nil { - bkpPortal, iqn, iface, volName = c.iscsiDisk.Portals, c.iscsiDisk.Iqn, - c.iscsiDisk.Iface, c.iscsiDisk.VolName - initiatorName = c.iscsiDisk.InitiatorName - } else { - glog.Errorf( - "iscsi detach disk: failed to get iscsi config from path %s Error: %v", - targetPath, err, - ) - return err - } + /* + // load iscsi disk config from json file + if err := util.loadISCSI(c.iscsiDisk, targetPath); err == nil { + bkpPortal, iqn, iface, volName = c.iscsiDisk.Portals, c.iscsiDisk.Iqn, + c.iscsiDisk.Iface, c.iscsiDisk.VolName + initiatorName = c.iscsiDisk.InitiatorName + } else { + glog.Errorf( + "iscsi detach disk: failed to get iscsi config from path %s Error: %v", + targetPath, err, + ) + return err + } + */ + bkpPortal, iqn, iface, volName = c.iscsiDisk.Portals, c.iscsiDisk.Iqn, + c.iscsiDisk.Iface, c.iscsiDisk.VolName + initiatorName = c.iscsiDisk.InitiatorName portals := removeDuplicate(bkpPortal) if len(portals) == 0 { return fmt.Errorf( @@ -660,3 +666,21 @@ func cloneIface(b iscsiDiskMounter, newIface string) error { } return lastErr } + +// UnmountDisk logs out of the iSCSI volume and the corresponding path is removed +func (util *ISCSIUtil) UnmountDisk( + c iscsiDiskUnmounter, + targetPath string, +) error { + if pathExists, pathErr := mount.PathExists(targetPath); pathErr != nil { + return fmt.Errorf("Error checking if path exists: %v", pathErr) + } else if !pathExists { + glog.Warningf( + "Warning: Unmount skipped because path does not exist: %v", + targetPath, + ) + return nil + } + + return c.mounter.Unmount(targetPath) +} diff --git a/pkg/iscsi/v1alpha1/mount.go b/pkg/iscsi/v1alpha1/mount.go index db34e9432..dd032aff7 100644 --- a/pkg/iscsi/v1alpha1/mount.go +++ b/pkg/iscsi/v1alpha1/mount.go @@ -15,6 +15,7 @@ func UnmountAndDetachDisk(vol *apis.CSIVolume, path string) error { Portals: []string{vol.Spec.ISCSI.TargetPortal}, Iqn: vol.Spec.ISCSI.Iqn, lun: vol.Spec.ISCSI.Lun, + Iface: vol.Spec.ISCSI.IscsiInterface, } diskUnmounter := &iscsiDiskUnmounter{ @@ -23,11 +24,7 @@ func UnmountAndDetachDisk(vol *apis.CSIVolume, path string) error { exec: mount.NewOsExec(), } util := &ISCSIUtil{} - err := util.DetachDisk(*diskUnmounter, path) - if err != nil { - return status.Error(codes.Internal, err.Error()) - } - return nil + return util.DetachDisk(*diskUnmounter, path) } // AttachAndMountDisk logs in to the iSCSI Volume @@ -38,14 +35,20 @@ func AttachAndMountDisk(vol *apis.CSIVolume) (string, error) { } iscsiInfo, err := getISCSIInfo(vol) if err != nil { - return "", status.Error(codes.Internal, err.Error()) + return "", err } diskMounter := getISCSIDiskMounter(iscsiInfo, vol) util := &ISCSIUtil{} - devicePath, err := util.AttachDisk(*diskMounter) - if err != nil { - return "", status.Error(codes.Internal, err.Error()) + return util.AttachDisk(*diskMounter) +} + +// Unmount unmounts the path provided +func Unmount(path string) error { + diskUnmounter := &iscsiDiskUnmounter{ + mounter: &mount.SafeFormatAndMount{Interface: mount.New(""), Exec: mount.NewOsExec()}, + exec: mount.NewOsExec(), } - return devicePath, err + util := &ISCSIUtil{} + return util.UnmountDisk(*diskUnmounter, path) } diff --git a/pkg/service/v1alpha1/node.go b/pkg/service/v1alpha1/node.go index 60ad911cb..1469b0acd 100644 --- a/pkg/service/v1alpha1/node.go +++ 
b/pkg/service/v1alpha1/node.go @@ -17,6 +17,7 @@ limitations under the License. package v1alpha1 import ( + "errors" "fmt" "time" @@ -60,7 +61,6 @@ func prepareVolSpecAndWaitForVolumeReady( WithVolName(req.GetVolumeId()). WithMountPath(req.GetTargetPath()). WithFSType(req.GetVolumeCapability().GetMount().GetFsType()). - WithMountOptions(req.GetVolumeCapability().GetMount().GetMountFlags()). WithReadOnly(req.GetReadonly()).Build() if err != nil { return nil, err @@ -73,8 +73,9 @@ func prepareVolSpecAndWaitForVolumeReady( if isCVCBound, err := utils.IsCVCBound(volumeID); err != nil { return nil, status.Error(codes.Internal, err.Error()) } else if !isCVCBound { + utils.TransitionVolList[volumeID] = apis.CSIVolumeStatusWaitingForCVCBound time.Sleep(10 * time.Second) - return nil, fmt.Errorf("Waiting for CVC to be bound") + return nil, errors.New("Waiting for CVC to be bound") } if err = utils.FetchAndUpdateISCSIDetails(volumeID, vol); err != nil { @@ -98,122 +99,50 @@ func prepareVolSpecAndWaitForVolumeReady( return vol, nil } -func cleanup(vol *apis.CSIVolume, nodeID string) error { - utils.VolumesListLock.Lock() - vol.Status = apis.CSIVolumeStatusMountFailed - if err := utils.DeleteOldCSIVolumeCR( - vol, nodeID, - ); err != nil { - utils.VolumesListLock.Unlock() - return err - } - delete(utils.Volumes, vol.Spec.Volume.Name) - utils.VolumesListLock.Unlock() - return nil +func removeVolumeFromTransitionList(volumeID string) { + utils.TransitionVolListLock.Lock() + defer utils.TransitionVolListLock.Unlock() + delete(utils.TransitionVolList, volumeID) } -func updateCSIVolume( - vol *apis.CSIVolume, - volStatus apis.CSIVolumeStatus, - devicePath string, -) error { - // Setting the devicePath in the volume spec is an indication that the mount - // operation for the volume has been completed for the first time. 
This - // helps in 2 ways: - // 1) Duplicate nodePublish requests from kubernetes are responded with - // success response if this path is set - // 2) The volumeMonitoring thread doesn't attemp remount unless this path is - // set - utils.VolumesListLock.Lock() - vol.Status = volStatus - vol.Spec.Volume.DevicePath = devicePath - err := utils.UpdateCSIVolumeCR(vol) - if err != nil { - utils.VolumesListLock.Unlock() - return err +func addVolumeToTransitionList(volumeID string, status apis.CSIVolumeStatus) error { + utils.TransitionVolListLock.Lock() + defer utils.TransitionVolListLock.Unlock() + + if _, ok := utils.TransitionVolList[volumeID]; ok { + return fmt.Errorf("Volume Busy, status: %v", + utils.TransitionVolList[volumeID]) } - utils.VolumesListLock.Unlock() + utils.TransitionVolList[volumeID] = status return nil } -func wait() { - utils.VolumesListLock.Unlock() - time.Sleep( - utils.VolumeWaitRetryCount * utils.VolumeWaitTimeout * time.Second, - ) - utils.VolumesListLock.Lock() -} - -func verifyInprogressAndRecreateCSIVolumeCR(vol *apis.CSIVolume) (bool, error) { +// VerifyIfMountRequired returns true if volume is already mounted on targetPath +// and unmounts if it is mounted on a different path +func VerifyIfMountRequired(volumeID, targetPath string) (bool, error) { var ( - reVerified bool - err error + currentMounts []string + err error ) - mountPath := vol.Spec.Volume.MountPath - volumeID := vol.Spec.Volume.Name - nodeID := vol.Labels["nodeID"] - utils.VolumesListLock.Lock() - defer utils.VolumesListLock.Unlock() -verifyPublish: - // Check if the volume has already been published(mounted) or if the mount - // is in progress - if info, ok := utils.Volumes[volumeID]; ok { - // The volume appears to be present in the inmomory list of volumes - // which implies that either the mount operation is complete - // or under progress. - if info.Spec.Volume.MountPath != mountPath { - // The volume appears to be mounted on a different path, which - // implies it is being used by some other pod on the same node. - // Let's wait fo the volume to be unmounted from the other path and - // then retry checking - if !reVerified { - reVerified = true - wait() - goto verifyPublish - } - return false, fmt.Errorf( - "Volume Mounted by a different pod on same node") - // Lets verify if the mount is already completed - } else if info.Spec.Volume.DevicePath != "" { - // Once the devicePath is set implies the volume mount has been - // completed, a success response can be sent back - return true, nil - } else if info.Status == apis.CSIVolumeStatusMountUnderProgress { - // The mount appears to be under progress lets wait for 13 seconds - // and reverify. 13s was decided based on the kubernetes timeout - // values which is 15s. Lets reply to kubernetes before it reattempts - // a duplicate request - if !reVerified { - wait() - reVerified = true - goto verifyPublish - } - // It appears that the mount will still take some more time, - // lets convey the same to kubernetes. The message responded will be - // added to the app description which has requested this volume - return false, fmt.Errorf("Mount under progress") - } - } - // This helps in cases when the node on which the volume was originally - // mounted is down. When that node is down, kubelet would not have been able - // to trigger an unpublish event on that node due to which when it comes up - // it starts remounting that volume. If the node's CSIVolume CR is marked - // for deletion that node will not reattempt to mount this volume again. 
- if err = utils.DeleteOldCSIVolumeCR( - vol, nodeID, - ); err != nil { - return false, err - } - // This CR creation will help iSCSI target(istgt) identify - // the current owner node of the volume and accordingly the target will - // allow only that node to login to the volume - vol.Status = apis.CSIVolumeStatusMountUnderProgress - err = utils.CreateCSIVolumeCR(vol, nodeID, mountPath) + currentMounts, err = utils.GetMounts(volumeID) if err != nil { return false, err } - utils.Volumes[volumeID] = vol - return false, nil + if len(currentMounts) > 1 { + logrus.Fatalf( + "More than one mounts for volume:%s mounts: %v", + volumeID, currentMounts, + ) + } + if len(currentMounts) == 1 { + if currentMounts[0] == targetPath { + return false, nil + } + if err = iscsi.Unmount(currentMounts[0]); err != nil { + return false, err + } + } + return true, nil } // NodePublishVolume publishes (mounts) the volume @@ -226,59 +155,88 @@ func (ns *node) NodePublishVolume( ) (*csi.NodePublishVolumeResponse, error) { var ( - err error - devicePath string - isMounted bool + err error + devicePath string + vol *apis.CSIVolume + isMountRequired bool ) if err = ns.validateNodePublishReq(req); err != nil { return nil, err } + volumeID := req.GetVolumeId() + targetPath := req.GetTargetPath() nodeID := ns.driver.config.NodeID - vol, err := prepareVolSpecAndWaitForVolumeReady(req, nodeID) + err = addVolumeToTransitionList(volumeID, apis.CSIVolumeStatusUninitialized) if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - if isMounted, err = verifyInprogressAndRecreateCSIVolumeCR(vol); err != nil { + defer removeVolumeFromTransitionList(volumeID) + + if vol, err = prepareVolSpecAndWaitForVolumeReady(req, nodeID); err != nil { return nil, status.Error(codes.Internal, err.Error()) - } else if isMounted { - goto CreateVolumeResponseSuccess } - // Permission is changed for the local directory before the volume is - // mounted on the node. This helps to resolve cases when the CSI driver - // Unmounts the volume to remount again in required mount mode(ro/rw), - // the app starts writing directly in the local directory. - // As soon as the volume is mounted the permissions of this directory are - // automatically changed to allow Reads and writes. - // And as soon as it is unmounted permissions change - // back to what we are setting over here. - if err = utils.ChmodMountPath(vol.Spec.Volume.MountPath); err != nil { - if errCleanup := cleanup(vol, nodeID); errCleanup != nil { - return nil, status.Error(codes.Internal, errCleanup.Error()) - } + isMountRequired, err = VerifyIfMountRequired(volumeID, targetPath) + if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - // Login to the volume and attempt mount operation on the requested path - if devicePath, err = iscsi.AttachAndMountDisk(vol); err != nil { - if errCleanup := cleanup(vol, nodeID); errCleanup != nil { - return nil, status.Error(codes.Internal, errCleanup.Error()) + if isMountRequired { + // Permission is changed for the local directory before the volume is + // mounted on the node. This helps to resolve cases when the CSI driver + // Unmounts the volume to remount again in required mount mode(ro/rw), + // the app starts writing directly in the local directory. + // As soon as the volume is mounted the permissions of this directory are + // automatically changed to allow Reads and writes. + // And as soon as it is unmounted permissions change + // back to what we are setting over here. 
+ if err = utils.ChmodMountPath(vol.Spec.Volume.MountPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } - return nil, status.Error(codes.Internal, err.Error()) + utils.TransitionVolList[volumeID] = apis.CSIVolumeStatusMountUnderProgress + // Login to the volume and attempt mount operation on the requested path + if devicePath, err = iscsi.AttachAndMountDisk(vol); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + // TODO update status also in below function + vol.Spec.Volume.DevicePath = devicePath + vol.Status = apis.CSIVolumeStatusMounted + utils.TransitionVolList[volumeID] = apis.CSIVolumeStatusMounted } - if err := updateCSIVolume( - vol, apis.CSIVolumeStatusMounted, - devicePath, - ); err != nil { + err = utils.CreateOrUpdateCSIVolumeCR(vol) + if err != nil { return nil, status.Error(codes.Internal, err.Error()) } - -CreateVolumeResponseSuccess: return &csi.NodePublishVolumeResponse{}, nil } +// IsUnmountRequired returns true if the volume needs to be unmounted +func IsUnmountRequired(volumeID, targetPath string) (bool, error) { + var ( + currentMounts []string + err error + ) + currentMounts, err = utils.GetMounts(volumeID) + if err != nil { + return false, err + } + if len(currentMounts) > 1 { + logrus.Fatalf( + "More than one mounts for volume:%s mounts: %v", + volumeID, currentMounts, + ) + } + if len(currentMounts) == 0 { + return false, nil + } + if currentMounts[0] != targetPath { + return false, err + } + return true, nil +} + // NodeUnpublishVolume unpublishes (unmounts) the volume // from the corresponding node from the given path // @@ -288,7 +246,11 @@ func (ns *node) NodeUnpublishVolume( req *csi.NodeUnpublishVolumeRequest, ) (*csi.NodeUnpublishVolumeResponse, error) { - var err error + var ( + err error + vol *apis.CSIVolume + unmountRequired bool + ) if err = ns.validateNodeUnpublishReq(req); err != nil { return nil, err @@ -296,48 +258,49 @@ func (ns *node) NodeUnpublishVolume( targetPath := req.GetTargetPath() volumeID := req.GetVolumeId() - utils.VolumesListLock.Lock() - vol, ok := utils.Volumes[volumeID] - if !ok { - utils.VolumesListLock.Unlock() - return &csi.NodeUnpublishVolumeResponse{}, nil + + err = addVolumeToTransitionList(volumeID, apis.CSIVolumeStatusUninitialized) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) } + defer removeVolumeFromTransitionList(volumeID) - utils.VolumesListLock.Unlock() - - // if node driver restarts before this step Kubelet will trigger the - // NodeUnpublish command again so there is no need to worry that when this - // driver restarts it will pick up the CSIVolume CR and start monitoring - // mount point again. - // If the node is down for some time, other node driver will first delete - // this node's CSIVolume CR and then only will start its mount process. - // If there is a case that this node comes up and CSIVolume CR is picked and - // this node starts monitoring the mount point while the other node is also - // trying to mount which appears to be a race condition but is not since - // first of all this CR will be marked for deletion when the other node - // starts mounting. 
But lets say this node started monitoring and - // immediately other node deleted this node's CR, in that case iSCSI - // target(istgt) will pick up the new one and allow only that node to login, - // so all the cases are handled - if err = iscsi.UnmountAndDetachDisk(vol, req.GetTargetPath()); err != nil { - return nil, status.Error(codes.Internal, - err.Error()) + unmountRequired, err = IsUnmountRequired(volumeID, targetPath) + if err != nil { + return nil, status.Error(codes.Internal, err.Error()) } + if unmountRequired { + if vol, err = utils.GetCSIVolume(volumeID); (err != nil) || (vol == nil) { + return nil, status.Error(codes.Internal, err.Error()) + } + // if node driver restarts before this step Kubelet will trigger the + // NodeUnpublish command again so there is no need to worry that when this + // driver restarts it will pick up the CSIVolume CR and start monitoring + // mount point again. + // If the node is down for some time, other node driver will first delete + // this node's CSIVolume CR and then only will start its mount process. + // If there is a case that this node comes up and CSIVolume CR is picked and + // this node starts monitoring the mount point while the other node is also + // trying to mount which appears to be a race condition but is not since + // first of all this CR will be marked for deletion when the other node + // starts mounting. But lets say this node started monitoring and + // immediately other node deleted this node's CR, in that case iSCSI + // target(istgt) will pick up the new one and allow only that node to login, + // so all the cases are handled + utils.TransitionVolList[volumeID] = apis.CSIVolumeStatusUnmountUnderProgress + if err = iscsi.UnmountAndDetachDisk(vol, req.GetTargetPath()); err != nil { + return nil, status.Error(codes.Internal, err.Error()) + } + utils.TransitionVolList[volumeID] = apis.CSIVolumeStatusUnmounted + } // It is safe to delete the CSIVolume CR now since the volume has already // been unmounted and logged out - utils.VolumesListLock.Lock() - err = utils.DeleteCSIVolumeCR(vol) - if err != nil { - utils.VolumesListLock.Unlock() - return nil, status.Error(codes.Internal, - err.Error()) + if err = utils.DeleteCSIVolumeCRForPath(volumeID, targetPath); err != nil { + return nil, status.Error(codes.Internal, err.Error()) } - delete(utils.Volumes, volumeID) - utils.VolumesListLock.Unlock() - - logrus.Infof("hostpath: volume %s/%s has been unmounted.", - targetPath, volumeID) + logrus.Infof("hostpath: volume %s path: %s has been unmounted.", + volumeID, targetPath) return &csi.NodeUnpublishVolumeResponse{}, nil } diff --git a/pkg/service/v1alpha1/service.go b/pkg/service/v1alpha1/service.go index 85ccf33d2..6ddce3cd3 100644 --- a/pkg/service/v1alpha1/service.go +++ b/pkg/service/v1alpha1/service.go @@ -17,8 +17,6 @@ limitations under the License. package v1alpha1 import ( - "log" - "github.com/Sirupsen/logrus" "github.com/container-storage-interface/spec/lib/go/csi" config "github.com/openebs/csi/pkg/config/v1alpha1" @@ -87,10 +85,6 @@ func New(config *config.Config) *CSIDriver { driver.cs = NewController(driver) case "node": - if err := utils.FetchAndUpdateVolInfos(config.NodeID); err != nil { - log.Fatalln(err) - } - // Start monitor goroutine to monitor the // mounted paths. 
If a path goes down or // becomes read only (in case of RW mount diff --git a/pkg/utils/v1alpha1/kubernetes.go b/pkg/utils/v1alpha1/kubernetes.go index e2edf9006..75f170728 100644 --- a/pkg/utils/v1alpha1/kubernetes.go +++ b/pkg/utils/v1alpha1/kubernetes.go @@ -22,10 +22,22 @@ import ( pv "github.com/openebs/csi/pkg/maya/kubernetes/persistentvolume/v1alpha1" csivolume "github.com/openebs/csi/pkg/volume/v1alpha1" corev1 "k8s.io/api/core/v1" + k8serror "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) +const ( + // TODO csi.openebs.io/nodeid + + // NODEID is the node name on which this pod is currently scheduled + NODEID = "nodeID" + // TODO csi.openebs.io/nodeid + + // VOLNAME is the name of the provisioned volume + VOLNAME = "Volname" +) + // getNodeDetails fetches the nodeInfo for the current node func getNodeDetails(name string) (*corev1.Node, error) { return node.NewKubeClient().Get(name, metav1.GetOptions{}) @@ -43,7 +55,8 @@ func getVolStatus(volumeID string) (string, error) { LabelSelector: "openebs.io/persistent-volume=" + volumeID, } - volumeList, err := csv.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions) + volumeList, err := csv.NewKubeclient(). + WithNamespace(OpenEBSNamespace).List(listOptions) if err != nil { return "", err } @@ -59,17 +72,52 @@ func getVolStatus(volumeID string) (string, error) { return string(volumeList.Items[0].Status.Phase), nil } -// CreateCSIVolumeCR creates a new CSIVolume CR with this nodeID -func CreateCSIVolumeCR(csivol *apis.CSIVolume, nodeID, mountPath string) (err error) { +// GetVolList fetches the current Published Volume list +func GetVolList(volumeID string) (*apis.CSIVolumeList, error) { + listOptions := v1.ListOptions{ + LabelSelector: NODEID + "=" + NodeIDENV, + } + + return csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace).List(listOptions) + +} + +// GetCSIVolume fetches the current Published csi Volume +func GetCSIVolume(volumeID string) (*apis.CSIVolume, error) { + csivolname := volumeID + "-" + NodeIDENV + return csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace).Get(csivolname, v1.GetOptions{}) +} - csivol.Name = csivol.Spec.Volume.Name + "-" + nodeID +// CreateOrUpdateCSIVolumeCR creates a new CSIVolume CR with this nodeID +func CreateOrUpdateCSIVolumeCR(csivol *apis.CSIVolume) error { + var ( + err error + vol *apis.CSIVolume + ) + + vol, err = GetCSIVolume(csivol.Spec.Volume.Name) + + if err != nil && !k8serror.IsNotFound(err) { + return err + } + + if err == nil && vol != nil && vol.DeletionTimestamp.IsZero() { + vol.Spec.Volume.MountPath = csivol.Spec.Volume.MountPath + _, err = csivolume.NewKubeclient(). 
+ WithNamespace(OpenEBSNamespace).Update(vol) + return err + } + + csivol.Name = csivol.Spec.Volume.Name + "-" + NodeIDENV csivol.Labels = make(map[string]string) - csivol.Spec.Volume.OwnerNodeID = nodeID - csivol.Labels["Volname"] = csivol.Spec.Volume.Name - csivol.Labels["nodeID"] = nodeID - nodeInfo, err := getNodeDetails(nodeID) + csivol.Spec.Volume.OwnerNodeID = NodeIDENV + csivol.Labels[VOLNAME] = csivol.Spec.Volume.Name + csivol.Labels[NODEID] = NodeIDENV + nodeInfo, err := getNodeDetails(NodeIDENV) if err != nil { - return + return err } csivol.OwnerReferences = []v1.OwnerReference{ @@ -80,125 +128,50 @@ func CreateCSIVolumeCR(csivol *apis.CSIVolume, nodeID, mountPath string) (err er UID: nodeInfo.UID, }, } - csivol.Finalizers = []string{nodeID} + csivol.Finalizers = []string{NodeIDENV} - _, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Create(csivol) - return + _, err = csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace). + Create(csivol) + return err } // UpdateCSIVolumeCR updates CSIVolume CR related to current nodeID func UpdateCSIVolumeCR(csivol *apis.CSIVolume) error { - oldcsivol, err := csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Get(csivol.Name, v1.GetOptions{}) + oldcsivol, err := csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace). + Get(csivol.Name, v1.GetOptions{}) if err != nil { return err } oldcsivol.Spec.Volume.DevicePath = csivol.Spec.Volume.DevicePath oldcsivol.Status = csivol.Status - _, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(oldcsivol) + _, err = csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace).Update(oldcsivol) return err } -// DeleteOldCSIVolumeCR deletes all CSIVolumes -// related to this volume so that a new one -// can be created with node as current nodeID -func DeleteOldCSIVolumeCR(vol *apis.CSIVolume, nodeID string) (err error) { - listOptions := v1.ListOptions{ - // TODO Update this label selector name as per naming standards - LabelSelector: "Volname=" + vol.Name, - } - - csivols, err := csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions) - if err != nil { - return - } - - // If a node goes down and kubernetes is unable to send an Unpublish request - // to this node, the CR is marked for deletion but finalizer is not removed - // and a new CR is created for current node. When the degraded node comes up - // it removes the finalizer and the CR is deleted. 
- for _, csivol := range csivols.Items { - if csivol.Labels["nodeID"] == nodeID { - csivol.Finalizers = nil - _, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(&csivol) - if err != nil { - return - } - } - - err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(csivol.Name) - if err != nil { - return - } - } - return -} - // TODO Explain when a create of csi volume happens & when it // gets deleted or replaced or updated -// -// DeleteCSIVolumeCR removes the CSIVolume with this nodeID as -// labelSelector from the list -func DeleteCSIVolumeCR(vol *apis.CSIVolume) (err error) { - var csivols *apis.CSIVolumeList - listOptions := v1.ListOptions{ - // TODO use label as per standards - LabelSelector: "Volname=" + vol.Spec.Volume.Name, - } - csivols, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions) - if err != nil { - return +// DeleteCSIVolumeCRForPath removes the CSIVolumeCR for the specified path +func DeleteCSIVolumeCRForPath(volumeID, targetPath string) error { + csivol, err := GetCSIVolume(volumeID) + if k8serror.IsNotFound(err) { + return nil } - - for _, csivol := range csivols.Items { - if csivol.Spec.Volume.OwnerNodeID == vol.Spec.Volume.OwnerNodeID { - csivol.Finalizers = nil - _, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(&csivol) - if err != nil { - return - } - - err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Delete(csivol.Name) - if err != nil { - return - } - } - } - return -} - -// FetchAndUpdateVolInfos gets the list of CSIVolInfos -// that are supposed to be mounted on this node and -// stores the info in memory. This is required when the -// CSI driver gets restarted & hence start monitoring all -// the existing volumes and at the same time use this -// logic to reject duplicate volume creation requests -func FetchAndUpdateVolInfos(nodeID string) (err error) { - var listOptions v1.ListOptions - - if nodeID != "" { - listOptions = v1.ListOptions{ - LabelSelector: "nodeID=" + nodeID, - } + if csivol.Spec.Volume.MountPath != targetPath { + return nil } - - csivols, err := csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).List(listOptions) + csivol.Finalizers = nil + _, err = csivolume.NewKubeclient(). + WithNamespace(OpenEBSNamespace).Update(csivol) if err != nil { - return - } - - for _, csivol := range csivols.Items { - if !csivol.DeletionTimestamp.IsZero() { - continue - } - vol := csivol - if vol.Status == apis.CSIVolumeStatusMountUnderProgress { - vol.Status = apis.CSIVolumeStatusUninitialized - } - Volumes[csivol.Spec.Volume.Name] = &vol + return err } - return + return csivolume.NewKubeclient(). 
+ WithNamespace(OpenEBSNamespace).Delete(csivol.Name) } diff --git a/pkg/utils/v1alpha1/utils.go b/pkg/utils/v1alpha1/utils.go index e22526981..873a22e3d 100644 --- a/pkg/utils/v1alpha1/utils.go +++ b/pkg/utils/v1alpha1/utils.go @@ -20,12 +20,21 @@ import ( const ( // TODO make VolumeWaitTimeout as env + + // VolumeWaitTimeout indicates the timegap between two consecutive volume + // status check attempts VolumeWaitTimeout = 2 // TODO make VolumeWaitRetryCount as env + + // VolumeWaitRetryCount indicates the number of retries made to check the + // status of volume before erroring out VolumeWaitRetryCount = 6 // TODO make MonitorMountRetryTimeout as env + + // MonitorMountRetryTimeout indicates the time gap between two consecutive + //monitoring attempts MonitorMountRetryTimeout = 5 ) @@ -33,20 +42,19 @@ var ( // OpenEBSNamespace is openebs system namespace OpenEBSNamespace string - // Volumes contains the list of volumes created in case of controller plugin - // and list of volumes attached to this node in node plugin - // This list is protected by VolumesListLock - Volumes map[string]*apis.CSIVolume + // NodeIDENV is the NodeID of the node on which the pod is present + NodeIDENV string - // VolumesListLock is required to protect the above Volumes list - VolumesListLock sync.RWMutex + // TransitionVolList contains the list of volumes under transition + // This list is protected by TransitionVolListLock + TransitionVolList map[string]apis.CSIVolumeStatus + + // TransitionVolListLock is required to protect the above Volumes list + TransitionVolListLock sync.RWMutex // ReqMountList contains the list of volumes which are required // to be remounted. This list is secured by ReqMountListLock - ReqMountList map[string]bool - - // ReqMountListLock is required to protect the above ReqMount list - ReqMountListLock sync.RWMutex + ReqMountList map[string]apis.CSIVolumeStatus ) const ( @@ -60,15 +68,21 @@ func init() { if OpenEBSNamespace == "" { logrus.Fatalf("OPENEBS_NAMESPACE environment variable not set") } + NodeIDENV = os.Getenv("OPENEBS_NODE_ID") + if NodeIDENV == "" && os.Getenv("OPENEBS_NODE_DRIVER") != "" { + logrus.Fatalf("OPENEBS_NODE_ID not set") + } - Volumes = map[string]*apis.CSIVolume{} - ReqMountList = make(map[string]bool) + TransitionVolList = make(map[string]apis.CSIVolumeStatus) + ReqMountList = make(map[string]apis.CSIVolumeStatus) } -// parseEndpoint should have a valid prefix(unix/tcp) to return a valid endpoint parts +// parseEndpoint should have a valid prefix(unix/tcp) +// to return a valid endpoint parts func parseEndpoint(ep string) (string, string, error) { - if strings.HasPrefix(strings.ToLower(ep), "unix://") || strings.HasPrefix(strings.ToLower(ep), "tcp://") { + if strings.HasPrefix(strings.ToLower(ep), "unix://") || + strings.HasPrefix(strings.ToLower(ep), "tcp://") { s := strings.SplitN(ep, "://", 2) if s[1] != "" { return s[0], s[1], nil @@ -79,7 +93,10 @@ func parseEndpoint(ep string) (string, string, error) { // logGRPC logs all the grpc related errors, i.e the final errors // which are returned to the grpc clients -func logGRPC(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { +func logGRPC( + ctx context.Context, req interface{}, + info *grpc.UnaryServerInfo, handler grpc.UnaryHandler, +) (interface{}, error) { logrus.Infof("GRPC call: %s", info.FullMethod) logrus.Infof("GRPC request: %s", protosanitizer.StripSecrets(req)) resp, err := handler(ctx, req) @@ -119,11 +136,12 @@ func 
WaitForVolumeToBeReachable(targetPortal string) error { time.Sleep(VolumeWaitTimeout * time.Second) retries++ if retries >= VolumeWaitRetryCount { - // Let the caller function decide further if the volume is not reachable - // even after 12 seconds ( This number was arrived at + // Let the caller function decide further if the volume is + // not reachable even after 12 seconds ( This number was arrived at // based on the kubelets retrying logic. Kubelet retries to publish // volume after every 14s ) - return fmt.Errorf("iSCSI Target not reachable, TargetPortal %v, err:%v", + return fmt.Errorf( + "iSCSI Target not reachable, TargetPortal %v, err:%v", targetPortal, err) } } @@ -146,8 +164,11 @@ checkVolumeStatus: // ready to accdept IOs after 12 seconds ( This number was arrived at // based on the kubelets retrying logic. Kubelet retries to publish // volume after every 14s ) - return fmt.Errorf("Volume is not ready: Replicas yet to connect to controller") + return fmt.Errorf( + "Volume is not ready: Replicas yet to connect to controller", + ) } else { + TransitionVolList[volumeID] = apis.CSIVolumeStatusWaitingForVolumeToBeReady time.Sleep(VolumeWaitTimeout * time.Second) retries++ goto checkVolumeStatus @@ -155,6 +176,7 @@ checkVolumeStatus: return nil } +/* // GetVolumeByName fetches the volume from Volumes list based on th input name func GetVolumeByName(volName string) (*apis.CSIVolume, error) { for _, Vol := range Volumes { @@ -165,8 +187,10 @@ func GetVolumeByName(volName string) (*apis.CSIVolume, error) { return nil, fmt.Errorf("volume name %s does not exit in the volumes list", volName) } - -func listContains(mountPath string, list []mount.MountPoint) (*mount.MountPoint, bool) { +*/ +func listContains( + mountPath string, list []mount.MountPoint, +) (*mount.MountPoint, bool) { for _, info := range list { if info.Path == mountPath { mntInfo := info @@ -187,41 +211,53 @@ func listContains(mountPath string, list []mount.MountPoint) (*mount.MountPoint, // For each remount operation a new goroutine is created, so that if multiple // volumes have lost their original state they can all be remounted in parallel func MonitorMounts() { + var ( + err error + csivolList *apis.CSIVolumeList + mountList []mount.MountPoint + ) mounter := mount.New("") ticker := time.NewTicker(MonitorMountRetryTimeout * time.Second) for { select { case <-ticker.C: // Get list of mounted paths present with the node - list, _ := mounter.List() - VolumesListLock.RLock() - for _, vol := range Volumes { - if vol.Spec.Volume.DevicePath == "" { - // If device path is not set implies the node publish - // operation is not completed yet - continue - } + TransitionVolListLock.Lock() + if mountList, err = mounter.List(); err != nil { + break + } + if csivolList, err = GetVolList(NodeIDENV); err != nil { + break + } + for _, vol := range csivolList.Items { // Search the volume in the list of mounted volumes at the node // retrieved above - mountPoint, exists := listContains(vol.Spec.Volume.MountPath, list) + mountPoint, exists := listContains( + vol.Spec.Volume.MountPath, mountList, + ) // If the volume is present in the list verify its state if exists && verifyMountOpts(mountPoint.Opts, "rw") { // Continue with remaining volumes since this volume looks // to be in good shape continue } - // Skip remount if the volume is already being remounted - if _, isRemounting := ReqMountList[vol.Spec.Volume.Name]; isRemounting { - continue + if _, ok := TransitionVolList[vol.Spec.Volume.Name]; !ok { + 
TransitionVolList[vol.Spec.Volume.Name] = vol.Status + ReqMountList[vol.Spec.Volume.Name] = vol.Status + csivol := vol + go func() { + devicePath, err := RemountVolume( + exists, &csivol, mountPoint, + vol.Spec.Volume.MountPath, + ) + logrus.Errorf( + "Remount failed: DevPath: %v %v", + devicePath, err, + ) + }() } - // Add volume to the reqMountList and start a goroutine to - // remount it - ReqMountListLock.Lock() - ReqMountList[vol.Spec.Volume.Name] = true - ReqMountListLock.Unlock() - go RemountVolume(exists, vol, mountPoint, vol.Spec.Volume.MountPath) } - VolumesListLock.RUnlock() + TransitionVolListLock.Unlock() } } } @@ -231,19 +267,20 @@ func MonitorMounts() { // are met. This function stops the driver from overloading the OS with iSCSI // login commands. func WaitForVolumeReadyAndReachable(vol *apis.CSIVolume) { + var err error for { // This function return after 12s in case the volume is not ready - if err := WaitForVolumeToBeReady(vol.Spec.Volume.Name); err != nil { + if err = WaitForVolumeToBeReady(vol.Spec.Volume.Name); err != nil { logrus.Error(err) // Keep retrying until the volume is ready continue } // This function return after 12s in case the volume is not reachable - if err := WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal); err == nil { + err = WaitForVolumeToBeReachable(vol.Spec.ISCSI.TargetPortal) + if err == nil { return - } else { - logrus.Error(err) } + logrus.Error(err) } } @@ -259,7 +296,11 @@ func verifyMountOpts(opts []string, desiredOpt string) bool { // RemountVolume unmounts the volume if it is already mounted in an undesired // state and then tries to mount again. If it is not mounted the volume, first // the disk will be attached via iSCSI login and then it will be mounted -func RemountVolume(exists bool, vol *apis.CSIVolume, mountPoint *mount.MountPoint, desiredMountOpt string) (devicePath string, err error) { +func RemountVolume( + exists bool, vol *apis.CSIVolume, + mountPoint *mount.MountPoint, + desiredMountOpt string, +) (devicePath string, err error) { mounter := mount.New("") options := []string{"rw"} // Wait until it is possible to chhange the state of mountpoint or when @@ -276,12 +317,33 @@ func RemountVolume(exists bool, vol *apis.CSIVolume, mountPoint *mount.MountPoin // A complete attach and mount is performed if for some reason disk is // not present in the mounted list with the OS. devicePath, err = iscsi.AttachAndMountDisk(vol) - //TODO Updadate devicePath in inmemory list and CR } - ReqMountListLock.Lock() + TransitionVolListLock.Lock() // Remove the volume from ReqMountList once the remount operation is // complete + delete(TransitionVolList, vol.Spec.Volume.Name) delete(ReqMountList, vol.Spec.Volume.Name) - ReqMountListLock.Unlock() + TransitionVolListLock.Unlock() return } + +// GetMounts gets mountpoints for the specified volume +func GetMounts(volumeID string) ([]string, error) { + + var ( + currentMounts []string + err error + mountList []mount.MountPoint + ) + mounter := mount.New("") + // Get list of mounted paths present with the node + if mountList, err = mounter.List(); err != nil { + return nil, err + } + for _, mntInfo := range mountList { + if strings.Contains(mntInfo.Path, volumeID) { + currentMounts = append(currentMounts, mntInfo.Path) + } + } + return currentMounts, nil +}
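
Note on the concurrency model introduced by this patch: the old in-memory Volumes map (guarded by VolumesListLock) is replaced by TransitionVolList, which only tracks volumes that have a publish or unpublish in flight. NodePublishVolume and NodeUnpublishVolume claim the volume before doing any work and release it with a defer, and MonitorMounts skips any volume already present in the list. Below is a minimal, standalone sketch of that claim/release pattern; the map, lock and helper names mirror the patch (TransitionVolList, addVolumeToTransitionList, removeVolumeFromTransitionList), while the simplified VolumeStatus type, the stubbed publish function and main are illustrative assumptions, not driver code.

// Standalone sketch (assumptions noted above); not the driver sources.
package main

import (
	"fmt"
	"sync"
)

// VolumeStatus is a simplified stand-in for apis.CSIVolumeStatus.
type VolumeStatus string

const (
	StatusMountUnderProgress   VolumeStatus = "MountUnderProgress"
	StatusUnmountUnderProgress VolumeStatus = "UnmountUnderProgress"
)

var (
	// TransitionVolList tracks volumes that have a publish/unpublish in flight.
	TransitionVolList = map[string]VolumeStatus{}
	// TransitionVolListLock protects TransitionVolList.
	TransitionVolListLock sync.RWMutex
)

// addVolumeToTransitionList claims volumeID for the current operation, or
// reports the in-flight status if another operation already owns it.
func addVolumeToTransitionList(volumeID string, status VolumeStatus) error {
	TransitionVolListLock.Lock()
	defer TransitionVolListLock.Unlock()
	if cur, ok := TransitionVolList[volumeID]; ok {
		return fmt.Errorf("Volume Busy, status: %v", cur)
	}
	TransitionVolList[volumeID] = status
	return nil
}

// removeVolumeFromTransitionList releases the volume once the operation ends.
func removeVolumeFromTransitionList(volumeID string) {
	TransitionVolListLock.Lock()
	defer TransitionVolListLock.Unlock()
	delete(TransitionVolList, volumeID)
}

// publish mimics the call pattern of NodePublishVolume after this patch:
// claim the volume, defer the release, then do the (stubbed) mount work.
func publish(volumeID string) error {
	if err := addVolumeToTransitionList(volumeID, StatusMountUnderProgress); err != nil {
		return err
	}
	defer removeVolumeFromTransitionList(volumeID)
	// The real driver performs the iSCSI login, the mount and the CSIVolume
	// CR create/update here before marking the volume Mounted.
	return nil
}

func main() {
	// Simulate an unpublish that is still in flight for pvc-123.
	_ = addVolumeToTransitionList("pvc-123", StatusUnmountUnderProgress)
	fmt.Println(publish("pvc-123")) // Volume Busy, status: UnmountUnderProgress
	removeVolumeFromTransitionList("pvc-123")
	fmt.Println(publish("pvc-123")) // <nil>
}

Because the volume entry is released via defer, a failed mount no longer needs the old cleanup() path to reset in-memory state: the call simply returns an error and the deferred removal frees the volume for the next NodePublish retry from kubelet.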