diff --git a/pkg/apis/openebs.io/core/v1alpha1/csivolume.go b/pkg/apis/openebs.io/core/v1alpha1/csivolume.go
index 004b74f0a..8c456d038 100644
--- a/pkg/apis/openebs.io/core/v1alpha1/csivolume.go
+++ b/pkg/apis/openebs.io/core/v1alpha1/csivolume.go
@@ -29,7 +29,8 @@ type CSIVolume struct {
     metav1.TypeMeta   `json:",inline"`
     metav1.ObjectMeta `json:"metadata,omitempty"`
 
-    Spec CSIVolumeSpec `json:"spec"`
+    Spec   CSIVolumeSpec   `json:"spec"`
+    Status CSIVolumeStatus `json:"status"`
 }
 
 // CSIVolumeSpec is the spec for a CStorVolume resource
@@ -40,7 +41,7 @@ type CSIVolumeSpec struct {
     // ISCSIInfo specific to ISCSI protocol,
     // this is filled only if the volume type
     // is iSCSI
-    ISCSI ISCSIInfo `json: "iscsi"`
+    ISCSI ISCSIInfo `json:"iscsi"`
 }
 
 // VolumeInfo contains the volume related info
@@ -114,6 +115,35 @@ type ISCSIInfo struct {
     Lun string `json:"lun"`
 }
 
+// CSIVolumeStatus represents the current mount status of the volume
+type CSIVolumeStatus string
+
+// These constants enumerate the mount states a volume
+// can be in on a given node
+const (
+    // CSIVolumeStatusUninitialized indicates that no operation has been
+    // performed on the volume yet on this node
+    CSIVolumeStatusUninitialized CSIVolumeStatus = ""
+    // CSIVolumeStatusMountUnderProgress indicates that the volume is busy and
+    // unavailable for use by other goroutines; an iSCSI login followed by a
+    // mount is in progress on this volume
+    CSIVolumeStatusMountUnderProgress CSIVolumeStatus = "MountUnderProgress"
+    // CSIVolumeStatusMounted indicates that the volume has been successfully
+    // mounted on the node
+    CSIVolumeStatusMounted CSIVolumeStatus = "Mounted"
+    // CSIVolumeStatusUnMounted indicates that the volume has been successfully
+    // unmounted and logged out of the node
+    CSIVolumeStatusUnMounted CSIVolumeStatus = "UnMounted"
+    // CSIVolumeStatusRaw indicates that the volume is being used in raw format
+    // by the application, therefore CSI has only performed the iSCSI login
+    // operation on this volume and skipped filesystem creation and mount.
+    CSIVolumeStatusRaw CSIVolumeStatus = "Raw"
+    // CSIVolumeStatusMountFailed indicates that the login and mount process on
+    // the volume was started but failed; kubernetes needs to retry sending
+    // NodePublish
+    CSIVolumeStatusMountFailed CSIVolumeStatus = "MountFailed"
+)
+
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 // +resource:path=csivolumes
diff --git a/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/csivolume.go b/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/csivolume.go
index 222817bf1..ac7f50513 100644
--- a/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/csivolume.go
+++ b/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/csivolume.go
@@ -39,6 +39,7 @@ type CSIVolumesGetter interface {
 type CSIVolumeInterface interface {
     Create(*v1alpha1.CSIVolume) (*v1alpha1.CSIVolume, error)
     Update(*v1alpha1.CSIVolume) (*v1alpha1.CSIVolume, error)
+    UpdateStatus(*v1alpha1.CSIVolume) (*v1alpha1.CSIVolume, error)
     Delete(name string, options *v1.DeleteOptions) error
    DeleteCollection(options *v1.DeleteOptions, listOptions v1.ListOptions) error
     Get(name string, options v1.GetOptions) (*v1alpha1.CSIVolume, error)
@@ -132,6 +133,22 @@ func (c *cSIVolumes) Update(cSIVolume *v1alpha1.CSIVolume) (result *v1alpha1.CSI
     return
 }
 
+// UpdateStatus was generated because the type contains a Status member.
+// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
+
+func (c *cSIVolumes) UpdateStatus(cSIVolume *v1alpha1.CSIVolume) (result *v1alpha1.CSIVolume, err error) {
+    result = &v1alpha1.CSIVolume{}
+    err = c.client.Put().
+        Namespace(c.ns).
+        Resource("csivolumes").
+        Name(cSIVolume.Name).
+        SubResource("status").
+        Body(cSIVolume).
+        Do().
+        Into(result)
+    return
+}
+
 // Delete takes name of the cSIVolume and deletes it. Returns an error if one occurs.
 func (c *cSIVolumes) Delete(name string, options *v1.DeleteOptions) error {
     return c.client.Delete().
diff --git a/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/fake/fake_csivolume.go b/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/fake/fake_csivolume.go
index 51b545f30..6109ce865 100644
--- a/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/fake/fake_csivolume.go
+++ b/pkg/generated/clientset/core/internalclientset/typed/core/v1alpha1/fake/fake_csivolume.go
@@ -100,6 +100,18 @@ func (c *FakeCSIVolumes) Update(cSIVolume *v1alpha1.CSIVolume) (result *v1alpha1
     return obj.(*v1alpha1.CSIVolume), err
 }
 
+// UpdateStatus was generated because the type contains a Status member.
+// Add a +genclient:noStatus comment above the type to avoid generating UpdateStatus().
+func (c *FakeCSIVolumes) UpdateStatus(cSIVolume *v1alpha1.CSIVolume) (*v1alpha1.CSIVolume, error) {
+    obj, err := c.Fake.
+        Invokes(testing.NewUpdateSubresourceAction(csivolumesResource, "status", c.ns, cSIVolume), &v1alpha1.CSIVolume{})
+
+    if obj == nil {
+        return nil, err
+    }
+    return obj.(*v1alpha1.CSIVolume), err
+}
+
 // Delete takes name of the cSIVolume and deletes it. Returns an error if one occurs.
 func (c *FakeCSIVolumes) Delete(name string, options *v1.DeleteOptions) error {
     _, err := c.Fake.
diff --git a/pkg/service/v1alpha1/node.go b/pkg/service/v1alpha1/node.go
index b92a648cc..60ad911cb 100644
--- a/pkg/service/v1alpha1/node.go
+++ b/pkg/service/v1alpha1/node.go
@@ -17,10 +17,12 @@ limitations under the License.
 package v1alpha1
 
 import (
+    "fmt"
     "time"
 
     "github.com/Sirupsen/logrus"
     "github.com/container-storage-interface/spec/lib/go/csi"
+    apis "github.com/openebs/csi/pkg/apis/openebs.io/core/v1alpha1"
     iscsi "github.com/openebs/csi/pkg/iscsi/v1alpha1"
     "github.com/openebs/csi/pkg/utils/v1alpha1"
     csivol "github.com/openebs/csi/pkg/volume/v1alpha1"
@@ -43,61 +45,46 @@ func NewNode(d *CSIDriver) csi.NodeServer {
     }
 }
 
-// NodePublishVolume publishes (mounts) the volume
-// at the corresponding node at a given path
-//
-// This implements csi.NodeServer
-func (ns *node) NodePublishVolume(
-    ctx context.Context,
+func prepareVolSpecAndWaitForVolumeReady(
     req *csi.NodePublishVolumeRequest,
-) (*csi.NodePublishVolumeResponse, error) {
-
-    var (
-        err        error
-        reVerified bool
-        devicePath string
-    )
-
-    if err = ns.validateNodePublishReq(req); err != nil {
-        return nil, err
-    }
-
-    mountPath := req.GetTargetPath()
+    nodeID string,
+) (*apis.CSIVolume, error) {
     volumeID := req.GetVolumeId()
+    labels := map[string]string{
+        "nodeID": nodeID,
+    }
 
     vol, err := csivol.NewBuilder().
         WithName(req.GetVolumeId()).
+        WithLabels(labels).
         WithVolName(req.GetVolumeId()).
         WithMountPath(req.GetTargetPath()).
         WithFSType(req.GetVolumeCapability().GetMount().GetFsType()).
         WithMountOptions(req.GetVolumeCapability().GetMount().GetMountFlags()).
         WithReadOnly(req.GetReadonly()).Build()
     if err != nil {
-        return nil, status.Error(codes.InvalidArgument, err.Error())
+        return nil, err
     }
 
-    if err = utils.PatchCVCNodeID(volumeID, ns.driver.config.NodeID); err != nil {
-        return nil,
-            status.Error(codes.Internal, err.Error())
+    if err = utils.PatchCVCNodeID(volumeID, nodeID); err != nil {
+        return nil, err
     }
 
     if isCVCBound, err := utils.IsCVCBound(volumeID); err != nil {
         return nil, status.Error(codes.Internal, err.Error())
     } else if !isCVCBound {
         time.Sleep(10 * time.Second)
-        return nil, status.Error(codes.Internal, "Waiting for CVC to be bound")
+        return nil, fmt.Errorf("Waiting for CVC to be bound")
     }
 
     if err = utils.FetchAndUpdateISCSIDetails(volumeID, vol); err != nil {
-        return nil,
-            status.Error(codes.Internal, err.Error())
+        return nil, err
     }
 
     //Check if volume is ready to serve IOs,
     //info is fetched from the cstorvolume CR
     if err := utils.WaitForVolumeToBeReady(volumeID); err != nil {
-        return nil,
-            status.Error(codes.Internal, err.Error())
+        return nil, err
     }
 
     // A temporary TCP connection is made to the volume to check if its
@@ -108,64 +95,157 @@ func (ns *node) NodePublishVolume(
         return nil, status.Error(codes.Internal, err.Error())
     }
+    return vol, nil
+}
 
-    // TODO put this tag in a function and defer to unlock in this function
-verifyPublish:
+func cleanup(vol *apis.CSIVolume, nodeID string) error {
     utils.VolumesListLock.Lock()
+    vol.Status = apis.CSIVolumeStatusMountFailed
+    if err := utils.DeleteOldCSIVolumeCR(
+        vol, nodeID,
+    ); err != nil {
+        utils.VolumesListLock.Unlock()
+        return err
+    }
+    delete(utils.Volumes, vol.Spec.Volume.Name)
+    utils.VolumesListLock.Unlock()
+    return nil
+}
+
+func updateCSIVolume(
+    vol *apis.CSIVolume,
+    volStatus apis.CSIVolumeStatus,
+    devicePath string,
+) error {
+    // Setting the devicePath in the volume spec is an indication that the mount
+    // operation for the volume has been completed for the first time. This
+    // helps in 2 ways:
+    // 1) Duplicate nodePublish requests from kubernetes are responded to with
+    // a success response if this path is set
+    // 2) The volumeMonitoring thread doesn't attempt remount unless this path is
+    // set
+    utils.VolumesListLock.Lock()
+    vol.Status = volStatus
+    vol.Spec.Volume.DevicePath = devicePath
+    err := utils.UpdateCSIVolumeCR(vol)
+    if err != nil {
+        utils.VolumesListLock.Unlock()
+        return err
+    }
+    utils.VolumesListLock.Unlock()
+    return nil
+}
+
+func wait() {
+    utils.VolumesListLock.Unlock()
+    time.Sleep(
+        utils.VolumeWaitRetryCount * utils.VolumeWaitTimeout * time.Second,
+    )
+    utils.VolumesListLock.Lock()
+}
+
+func verifyInprogressAndRecreateCSIVolumeCR(vol *apis.CSIVolume) (bool, error) {
+    var (
+        reVerified bool
+        err        error
+    )
+    mountPath := vol.Spec.Volume.MountPath
+    volumeID := vol.Spec.Volume.Name
+    nodeID := vol.Labels["nodeID"]
+    utils.VolumesListLock.Lock()
+    defer utils.VolumesListLock.Unlock()
+verifyPublish:
     // Check if the volume has already been published(mounted) or if the mount
     // is in progress
     if info, ok := utils.Volumes[volumeID]; ok {
         // The volume appears to be present in the in-memory list of volumes
         // which implies that either the mount operation is complete
         // or in progress.
-        // Lets verify if the mount is already completed
-        if info.Spec.Volume.DevicePath != "" {
+        if info.Spec.Volume.MountPath != mountPath {
+            // The volume appears to be mounted on a different path, which
+            // implies it is being used by some other pod on the same node.
+            // Let's wait for the volume to be unmounted from the other path and
+            // then retry checking
+            if !reVerified {
+                reVerified = true
+                wait()
+                goto verifyPublish
+            }
+            return false, fmt.Errorf(
+                "Volume mounted by a different pod on the same node")
+            // Let's verify if the mount is already completed
+        } else if info.Spec.Volume.DevicePath != "" {
             // Once the devicePath is set implies the volume mount has been
             // completed, a success response can be sent back
-            utils.VolumesListLock.Unlock()
-            return &csi.NodePublishVolumeResponse{}, nil
+            return true, nil
+        } else if info.Status == apis.CSIVolumeStatusMountUnderProgress {
+            // The mount appears to be under progress, let's wait for 13 seconds
+            // and reverify. 13s was decided based on the kubernetes timeout
+            // value, which is 15s. Let's reply to kubernetes before it reattempts
+            // a duplicate request
+            if !reVerified {
+                wait()
+                reVerified = true
+                goto verifyPublish
+            }
+            // It appears that the mount will still take some more time;
+            // let's convey the same to kubernetes. The message responded will be
+            // added to the description of the app which has requested this volume
+            return false, fmt.Errorf("Mount under progress")
         }
-        // The mount appears to be under progress lets wait for 13 seconds and
-        // reverify. 13s was decided based on the kubernetes timeout values
-        // which is 15s. Lets reply to kubernetes before it reattempts a
-        // duplicate request
-        utils.VolumesListLock.Unlock()
-        if !reVerified {
-            time.Sleep(
-                utils.VolumeWaitRetryCount * utils.VolumeWaitTimeout * time.Second,
-            )
-            reVerified = true
-            goto verifyPublish
-        }
-        // It appears that the mount will still take some more time,
-        // lets convey the same to kubernetes. The message responded will be
-        // added to the app description which has requested this volume
-
-        return nil, status.Error(codes.Internal, "Mount under progress")
     }
-
     // This helps in cases when the node on which the volume was originally
     // mounted is down. When that node is down, kubelet would not have been able
     // to trigger an unpublish event on that node due to which when it comes up
     // it starts remounting that volume. If the node's CSIVolume CR is marked
     // for deletion that node will not reattempt to mount this volume again.
     if err = utils.DeleteOldCSIVolumeCR(
-        vol, ns.driver.config.NodeID,
+        vol, nodeID,
     ); err != nil {
-        utils.VolumesListLock.Unlock()
-        return nil, status.Error(codes.Internal, err.Error())
+        return false, err
     }
 
     // This CR creation will help iSCSI target(istgt) identify
     // the current owner node of the volume and accordingly the target will
     // allow only that node to login to the volume
-    err = utils.CreateCSIVolumeCR(vol, ns.driver.config.NodeID, mountPath)
+    vol.Status = apis.CSIVolumeStatusMountUnderProgress
+    err = utils.CreateCSIVolumeCR(vol, nodeID, mountPath)
     if err != nil {
-        utils.VolumesListLock.Unlock()
-        return nil, status.Error(codes.Internal, err.Error())
+        return false, err
     }
     utils.Volumes[volumeID] = vol
-    utils.VolumesListLock.Unlock()
+    return false, nil
+}
+
+// NodePublishVolume publishes (mounts) the volume
+// at the corresponding node at a given path
+//
+// This implements csi.NodeServer
+func (ns *node) NodePublishVolume(
+    ctx context.Context,
+    req *csi.NodePublishVolumeRequest,
+) (*csi.NodePublishVolumeResponse, error) {
+
+    var (
+        err        error
+        devicePath string
+        isMounted  bool
+    )
+
+    if err = ns.validateNodePublishReq(req); err != nil {
+        return nil, err
+    }
+    nodeID := ns.driver.config.NodeID
+
+    vol, err := prepareVolSpecAndWaitForVolumeReady(req, nodeID)
+    if err != nil {
+        return nil, status.Error(codes.Internal, err.Error())
+    }
+    if isMounted, err = verifyInprogressAndRecreateCSIVolumeCR(vol); err != nil {
+        return nil, status.Error(codes.Internal, err.Error())
+    } else if isMounted {
+        goto CreateVolumeResponseSuccess
+    }
     // Permission is changed for the local directory before the volume is
     // mounted on the node. This helps to resolve cases when the CSI driver
     // Unmounts the volume to remount again in required mount mode (ro/rw),
@@ -175,25 +255,27 @@ verifyPublish:
     // And as soon as it is unmounted permissions change
     // back to what we are setting over here.
     if err = utils.ChmodMountPath(vol.Spec.Volume.MountPath); err != nil {
+        if errCleanup := cleanup(vol, nodeID); errCleanup != nil {
+            return nil, status.Error(codes.Internal, errCleanup.Error())
+        }
         return nil, status.Error(codes.Internal, err.Error())
     }
 
     // Login to the volume and attempt mount operation on the requested path
     if devicePath, err = iscsi.AttachAndMountDisk(vol); err != nil {
+        if errCleanup := cleanup(vol, nodeID); errCleanup != nil {
+            return nil, status.Error(codes.Internal, errCleanup.Error())
+        }
         return nil, status.Error(codes.Internal, err.Error())
     }
 
-    // Setting the devicePath in the volume spec is an indication that the mount
-    // operation for the volume has been completed for the first time. This
-    // helps in 2 ways:
-    // 1) Duplicate nodePublish requests from kubernetes are responded with
-    // success response if this path is set
-    // 2) The volumeMonitoring thread doesn't attemp remount unless this path is
-    // set
-    utils.VolumesListLock.Lock()
-    // TODO set this state in etcd also
-    vol.Spec.Volume.DevicePath = devicePath
-    utils.VolumesListLock.Unlock()
+    if err := updateCSIVolume(
+        vol, apis.CSIVolumeStatusMounted,
+        devicePath,
+    ); err != nil {
+        return nil, status.Error(codes.Internal, err.Error())
+    }
 
+CreateVolumeResponseSuccess:
     return &csi.NodePublishVolumeResponse{}, nil
 }
 
@@ -221,7 +303,6 @@ func (ns *node) NodeUnpublishVolume(
         return &csi.NodeUnpublishVolumeResponse{}, nil
     }
 
-    delete(utils.Volumes, volumeID)
     utils.VolumesListLock.Unlock()
 
     // if node driver restarts before this step Kubelet will trigger the
@@ -239,18 +320,21 @@ func (ns *node) NodeUnpublishVolume(
     // target(istgt) will pick up the new one and allow only that node to login,
     // so all the cases are handled
     if err = iscsi.UnmountAndDetachDisk(vol, req.GetTargetPath()); err != nil {
-        // TODO If this error occurs then the stale entry will never get deleted
         return nil, status.Error(codes.Internal, err.Error())
     }
 
     // It is safe to delete the CSIVolume CR now since the volume has already
     // been unmounted and logged out
+    utils.VolumesListLock.Lock()
     err = utils.DeleteCSIVolumeCR(vol)
     if err != nil {
+        utils.VolumesListLock.Unlock()
         return nil, status.Error(codes.Internal, err.Error())
     }
+    delete(utils.Volumes, volumeID)
+    utils.VolumesListLock.Unlock()
 
     logrus.Infof("hostpath: volume %s/%s has been unmounted.",
         targetPath, volumeID)
@@ -351,7 +435,9 @@ func (ns *node) NodeGetVolumeStats(
     return nil, status.Error(codes.Unimplemented, "")
 }
 
-func (ns *node) validateNodePublishReq(req *csi.NodePublishVolumeRequest) error {
+func (ns *node) validateNodePublishReq(
+    req *csi.NodePublishVolumeRequest,
+) error {
     if req.GetVolumeCapability() == nil {
         return status.Error(codes.InvalidArgument,
             "Volume capability missing in request")
@@ -364,7 +450,9 @@ func (ns *node) validateNodePublishReq(req *csi.NodePublishVolumeRequest) error
     return nil
 }
 
-func (ns *node) validateNodeUnpublishReq(req *csi.NodeUnpublishVolumeRequest) error {
+func (ns *node) validateNodeUnpublishReq(
+    req *csi.NodeUnpublishVolumeRequest,
+) error {
     if req.GetVolumeId() == "" {
         return status.Error(codes.InvalidArgument,
             "Volume ID missing in request")
diff --git a/pkg/service/v1alpha1/service.go b/pkg/service/v1alpha1/service.go
index 4088d85be..85ccf33d2 100644
--- a/pkg/service/v1alpha1/service.go
+++ b/pkg/service/v1alpha1/service.go
@@ -17,6 +17,8 @@ limitations under the License.
 package v1alpha1
 
 import (
+    "log"
+
     "github.com/Sirupsen/logrus"
     "github.com/container-storage-interface/spec/lib/go/csi"
     config "github.com/openebs/csi/pkg/config/v1alpha1"
@@ -85,7 +87,9 @@ func New(config *config.Config) *CSIDriver {
         driver.cs = NewController(driver)
 
     case "node":
-        // utils.FetchAndUpdateVolInfos(config.NodeID)
+        if err := utils.FetchAndUpdateVolInfos(config.NodeID); err != nil {
+            log.Fatalln(err)
+        }
 
         // Start monitor goroutine to monitor the
         // mounted paths. If a path goes down or
diff --git a/pkg/utils/v1alpha1/kubernetes.go b/pkg/utils/v1alpha1/kubernetes.go
index e1a3bd15e..e2edf9006 100644
--- a/pkg/utils/v1alpha1/kubernetes.go
+++ b/pkg/utils/v1alpha1/kubernetes.go
@@ -86,6 +86,20 @@ func CreateCSIVolumeCR(csivol *apis.CSIVolume, nodeID, mountPath string) (err er
     return
 }
 
+// UpdateCSIVolumeCR updates the CSIVolume CR related to the current nodeID
+func UpdateCSIVolumeCR(csivol *apis.CSIVolume) error {
+
+    oldcsivol, err := csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Get(csivol.Name, v1.GetOptions{})
+    if err != nil {
+        return err
+    }
+    oldcsivol.Spec.Volume.DevicePath = csivol.Spec.Volume.DevicePath
+    oldcsivol.Status = csivol.Status
+
+    _, err = csivolume.NewKubeclient().WithNamespace(OpenEBSNamespace).Update(oldcsivol)
+    return err
+}
+
 // DeleteOldCSIVolumeCR deletes all CSIVolumes
 // related to this volume so that a new one
 // can be created with node as current nodeID
@@ -100,7 +114,10 @@ func DeleteOldCSIVolumeCR(vol *apis.CSIVolume, nodeID string) (err error) {
         return
     }
 
-    // TODO Add description why multiple CRs can be there for one volume
+    // If a node goes down and kubernetes is unable to send an Unpublish request
+    // to this node, the CR is marked for deletion but the finalizer is not
+    // removed, and a new CR is created for the current node. When the degraded
+    // node comes up, it removes the finalizer and the CR is deleted.
     for _, csivol := range csivols.Items {
         if csivol.Labels["nodeID"] == nodeID {
             csivol.Finalizers = nil
@@ -173,7 +190,13 @@ func FetchAndUpdateVolInfos(nodeID string) (err error) {
     }
 
     for _, csivol := range csivols.Items {
+        if !csivol.DeletionTimestamp.IsZero() {
+            continue
+        }
         vol := csivol
+        if vol.Status == apis.CSIVolumeStatusMountUnderProgress {
+            vol.Status = apis.CSIVolumeStatusUninitialized
+        }
         Volumes[csivol.Spec.Volume.Name] = &vol
     }
 
diff --git a/pkg/volume/v1alpha1/build.go b/pkg/volume/v1alpha1/build.go
index 6d5d2e3ba..21d39efd0 100644
--- a/pkg/volume/v1alpha1/build.go
+++ b/pkg/volume/v1alpha1/build.go
@@ -282,6 +282,53 @@ func (b *Builder) WithReadOnly(readOnly bool) *Builder {
     return b
 }
 
+// WithLabels merges the existing labels of the csi volume, if any,
+// with the ones that are provided here
+func (b *Builder) WithLabels(labels map[string]string) *Builder {
+    if len(labels) == 0 {
+        b.errs = append(
+            b.errs,
+            errors.New(
+                "failed to build csi volume object: missing labels",
+            ),
+        )
+        return b
+    }
+
+    if b.volume.Object.Labels == nil {
+        return b.WithLabelsNew(labels)
+    }
+
+    for key, value := range labels {
+        b.volume.Object.Labels[key] = value
+    }
+    return b
+}
+
+// WithLabelsNew resets the existing labels of the csi volume, if any, with
+// the ones that are provided here
+func (b *Builder) WithLabelsNew(labels map[string]string) *Builder {
+    if len(labels) == 0 {
+        b.errs = append(
+            b.errs,
+            errors.New(
+                "failed to build csi volume object: no new labels",
+            ),
+        )
+        return b
+    }
+
+    // copy of original map
+    newlbls := map[string]string{}
+    for key, value := range labels {
+        newlbls[key] = value
+    }
+
+    // override
+    b.volume.Object.Labels = newlbls
+    return b
+}
+
 // Build returns csi volume API object
 func (b *Builder) Build() (*apis.CSIVolume, error) {
     if len(b.errs) > 0 {
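
The generated UpdateStatus issues a PUT against the status subresource, which for a CRD only succeeds when that subresource is enabled; UpdateCSIVolumeCR instead goes through a plain Update, which writes spec and status together. A minimal sketch of the difference, assuming `c` is a CSIVolumeInterface obtained from the generated clientset and "pvc-example" is a hypothetical CR name:

    // c is a v1alpha1.CSIVolumeInterface; "pvc-example" is illustrative.
    vol, err := c.Get("pvc-example", metav1.GetOptions{})
    if err != nil {
        return err
    }
    vol.Status = apis.CSIVolumeStatusMounted
    // Update writes the full object; this is the path UpdateCSIVolumeCR takes
    // and it works whether or not the status subresource is enabled.
    if vol, err = c.Update(vol); err != nil {
        return err
    }
    // UpdateStatus targets only the /status endpoint; it fails unless the CRD
    // enables the status subresource.
    _, err = c.UpdateStatus(vol)
    return err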
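
Taken together, the new constants imply a per-node lifecycle for a volume: Uninitialized until the first NodePublishVolume, MountUnderProgress while the iSCSI login and mount are running, then Mounted (or Raw for block usage), MountFailed when cleanup ran after a failed attempt, and UnMounted after NodeUnpublishVolume. Nothing in the diff enforces these transitions centrally; the table below is an illustrative summary only:

    // Hypothetical transition table, for illustration; not part of the diff.
    var validNext = map[apis.CSIVolumeStatus][]apis.CSIVolumeStatus{
        apis.CSIVolumeStatusUninitialized:      {apis.CSIVolumeStatusMountUnderProgress},
        apis.CSIVolumeStatusMountUnderProgress: {apis.CSIVolumeStatusMounted, apis.CSIVolumeStatusRaw, apis.CSIVolumeStatusMountFailed},
        apis.CSIVolumeStatusMountFailed:        {apis.CSIVolumeStatusMountUnderProgress}, // kubelet retries NodePublish
        apis.CSIVolumeStatusMounted:            {apis.CSIVolumeStatusUnMounted},
    }

On driver restart, FetchAndUpdateVolInfos deliberately degrades MountUnderProgress back to Uninitialized, since an in-flight mount cannot be trusted across a restart and kubelet's NodePublishVolume retry drives the state forward again.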
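
Usage of the new WithLabels option follows the existing builder chain; the names and values below are illustrative only:

    // Illustrative values; WithLabels merges into any labels already set on
    // the object, while WithLabelsNew replaces them wholesale.
    vol, err := csivol.NewBuilder().
        WithName("pvc-example").
        WithVolName("pvc-example").
        WithMountPath("/var/lib/kubelet/plugins/kubernetes.io/csi/mount").
        WithLabels(map[string]string{"nodeID": "worker-1"}).
        Build()

Passing an empty map records an error on the builder rather than failing fast, consistent with the other With* methods; the error only surfaces from Build().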