Skip to content

Commit

Permalink
cephfs: remove snapshot protect/unprotect
Browse files Browse the repository at this point in the history
This commit eliminates the code for protecting and unprotecting
snapshots, as the functionality to protect and unprotect snapshots
is being deprecated.

Signed-off-by: Praveen M <[email protected]>
  • Loading branch information
iPraveenParihar committed Oct 27, 2023
1 parent 0bfb324 commit 61478a2
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 185 deletions.
53 changes: 4 additions & 49 deletions internal/cephfs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,7 @@ func (cs *ControllerServer) ControllerExpandVolume(
// CreateSnapshot creates the snapshot in backend and stores metadata
// in store
//
//nolint:gocognit,gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
//nolint:gocyclo,cyclop // golangci-lint did not catch this earlier, needs to get fixed late
func (cs *ControllerServer) CreateSnapshot(
ctx context.Context,
req *csi.CreateSnapshotRequest,
Expand Down Expand Up @@ -830,54 +830,19 @@ func (cs *ControllerServer) CreateSnapshot(
}
defer cs.VolumeLocks.Release(sourceVolID)
snapName := req.GetName()
sid, snapInfo, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
sid, _, err := store.CheckSnapExists(ctx, parentVolOptions, cephfsSnap, cs.ClusterName, cs.SetMetadata, cr)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}

// check are we able to retrieve the size of parent
// ceph fs subvolume info command got added in 14.2.10 and 15.+
// as we are not able to retrieve the parent size we are rejecting the
// request to create snapshot.
// TODO: For this purpose we could make use of cached clusterAdditionalInfo
// too.
volClient := core.NewSubVolume(parentVolOptions.GetConnection(), &parentVolOptions.SubVolume,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata)
info, err := volClient.GetSubVolumeInfo(ctx)
if err != nil {
// Check error code value against ErrInvalidCommand to understand the cluster
// support it or not, It's safe to evaluate as the filtering
// is already done from GetSubVolumeInfo() and send out the error here.
if errors.Is(err, cerrors.ErrInvalidCommand) {
return nil, status.Error(
codes.FailedPrecondition,
"subvolume info command not supported in current ceph cluster")
}
if sid != nil {
errDefer := store.UndoSnapReservation(ctx, parentVolOptions, *sid, snapName, cr)
if errDefer != nil {
log.WarningLog(ctx, "failed undoing reservation of snapshot: %s (%s)",
requestName, errDefer)
}
}

return nil, status.Error(codes.Internal, err.Error())
}

metadata := k8s.GetSnapshotMetadata(req.GetParameters())
if sid != nil {
// check snapshot is protected
protected := true
snapClient := core.NewSnapshot(parentVolOptions.GetConnection(), sid.FsSnapshotName,
parentVolOptions.ClusterID, cs.ClusterName, cs.SetMetadata, &parentVolOptions.SubVolume)
if !(snapInfo.Protected == core.SnapshotIsProtected) {
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
protected = false
log.WarningLog(ctx, "failed to protect snapshot of snapshot: %s (%s)",
sid.FsSnapshotName, err)
}
}

// Update snapshot-name/snapshot-namespace/snapshotcontent-name details on
// subvolume snapshot as metadata in case snapshot already exist
Expand All @@ -894,7 +859,7 @@ func (cs *ControllerServer) CreateSnapshot(
SnapshotId: sid.SnapshotID,
SourceVolumeId: req.GetSourceVolumeId(),
CreationTime: sid.CreationTime,
ReadyToUse: protected,
ReadyToUse: true,
},
}, nil
}
Expand Down Expand Up @@ -968,10 +933,6 @@ func (cs *ControllerServer) doSnapshot(
return snap, fmt.Errorf("failed to get snapshot info for snapshot:%s", snapID)
}
snap.CreationTime = timestamppb.New(snap.CreatedAt)
err = snapClient.ProtectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapID, err)
}

// Set snapshot-name/snapshot-namespace/snapshotcontent-name details
// on subvolume snapshot as metadata on create
Expand Down Expand Up @@ -1007,7 +968,7 @@ func (cs *ControllerServer) validateSnapshotReq(ctx context.Context, req *csi.Cr
// DeleteSnapshot deletes the snapshot in backend and removes the
// snapshot metadata from store.
//
//nolint:gocyclo,cyclop // TODO: reduce complexity
//nolint:cyclop // TODO: reduce complexity
func (cs *ControllerServer) DeleteSnapshot(
ctx context.Context,
req *csi.DeleteSnapshotRequest,
Expand Down Expand Up @@ -1102,12 +1063,6 @@ func (cs *ControllerServer) DeleteSnapshot(
}
snapClient := core.NewSnapshot(volOpt.GetConnection(), sid.FsSnapshotName,
volOpt.ClusterID, cs.ClusterName, cs.SetMetadata, &volOpt.SubVolume)
if snapInfo.Protected == core.SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
return nil, status.Error(codes.Internal, err.Error())
}
}

needsDelete, err := store.UnrefSelfInSnapshotBackedVolumes(ctx, volOpt, sid.SnapshotID)
if err != nil {
Expand Down
60 changes: 5 additions & 55 deletions internal/cephfs/core/clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package core

import (
"context"
"errors"
"fmt"

cerrors "github.com/ceph/ceph-csi/internal/cephfs/errors"
Expand Down Expand Up @@ -73,43 +72,20 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(

return err
}
var (
// if protectErr is not nil we will delete the snapshot as the protect fails
protectErr error
// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
cloneErr error
)
defer func() {
if protectErr != nil {
err = snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}

// if cloneErr is not nil we will unprotect the snapshot and delete the snapshot
var cloneErr error

defer func() {
if cloneErr != nil {
if err = s.PurgeVolume(ctx, true); err != nil {
log.ErrorLog(ctx, "failed to delete volume %s: %v", s.VolID, err)
}
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)
}
}
}()
protectErr = snapClient.ProtectSnapshot(ctx)
if protectErr != nil {
log.ErrorLog(ctx, "failed to protect snapshot %s %v", snapshotID, protectErr)

return protectErr
}
cloneErr = snapClient.CloneSnapshot(ctx, s.SubVolume)
if cloneErr != nil {
log.ErrorLog(ctx, "failed to clone snapshot %s %s to %s %v", parentvolOpt.VolID, snapshotID, s.VolID, cloneErr)
Expand Down Expand Up @@ -139,16 +115,6 @@ func (s *subVolumeClient) CreateCloneFromSubvolume(
}

// As we completed clone, remove the intermediate snap
if err = snapClient.UnprotectSnapshot(ctx); err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error and we are good to go
// ahead with deletion
if !errors.Is(err, cerrors.ErrSnapProtectionExist) {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapshotID, err)

return err
}
}
if err = snapClient.DeleteSnapshot(ctx); err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapshotID, err)

Expand All @@ -166,24 +132,8 @@ func (s *subVolumeClient) CleanupSnapshotFromSubvolume(
// identified during PVC-PVC cloning.
snapShotID := s.VolID
snapClient := NewSnapshot(s.conn, snapShotID, s.clusterID, s.clusterName, s.enableMetadata, parentVol)
snapInfo, err := snapClient.GetSnapshotInfo(ctx)
if err != nil {
if errors.Is(err, cerrors.ErrSnapNotFound) {
return nil
}

return err
}

if snapInfo.Protected == SnapshotIsProtected {
err = snapClient.UnprotectSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to unprotect snapshot %s %v", snapShotID, err)

return err
}
}
err = snapClient.DeleteSnapshot(ctx)
err := snapClient.DeleteSnapshot(ctx)
if err != nil {
log.ErrorLog(ctx, "failed to delete snapshot %s %v", snapShotID, err)

Expand Down
81 changes: 0 additions & 81 deletions internal/cephfs/core/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ import (
"github.com/golang/protobuf/ptypes/timestamp"
)

// autoProtect points to the snapshot auto-protect feature of
// the subvolume.
const (
autoProtect = "snapshot-autoprotect"
)

// SnapshotClient is the interface that holds the signature of snapshot methods
// that interacts with CephFS snapshot API's.
type SnapshotClient interface {
Expand All @@ -45,10 +39,6 @@ type SnapshotClient interface {
DeleteSnapshot(ctx context.Context) error
// GetSnapshotInfo returns the snapshot info of the subvolume.
GetSnapshotInfo(ctx context.Context) (SnapshotInfo, error)
// ProtectSnapshot protects the snapshot of the subvolume.
ProtectSnapshot(ctx context.Context) error
// UnprotectSnapshot unprotects the snapshot of the subvolume.
UnprotectSnapshot(ctx context.Context) error
// CloneSnapshot clones the snapshot of the subvolume.
CloneSnapshot(ctx context.Context, cloneVolOptions *SubVolume) error
// SetAllSnapshotMetadata set all the metadata from arg parameters on
Expand Down Expand Up @@ -139,7 +129,6 @@ type SnapshotInfo struct {
CreatedAt time.Time
CreationTime *timestamp.Timestamp
HasPendingClones string
Protected string
}

// GetSnapshotInfo returns the snapshot info of the subvolume.
Expand Down Expand Up @@ -169,80 +158,10 @@ func (s *snapshotClient) GetSnapshotInfo(ctx context.Context) (SnapshotInfo, err
}
snap.CreatedAt = info.CreatedAt.Time
snap.HasPendingClones = info.HasPendingClones
snap.Protected = info.Protected

return snap, nil
}

// ProtectSnapshot protects the snapshot of the subvolume.
func (s *snapshotClient) ProtectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The ProtectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.ProtectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID, s.SnapshotID)
if err != nil {
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to protect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// UnprotectSnapshot unprotects the snapshot of the subvolume.
func (s *snapshotClient) UnprotectSnapshot(ctx context.Context) error {
// If "snapshot-autoprotect" feature is present, The UnprotectSnapshot
// call should be treated as a no-op.
if checkSubvolumeHasFeature(autoProtect, s.Features) {
return nil
}
fsa, err := s.conn.GetFSAdmin()
if err != nil {
log.ErrorLog(ctx, "could not get FSAdmin: %s", err)

return err
}

err = fsa.UnprotectSubVolumeSnapshot(s.FsName, s.SubvolumeGroup, s.VolID,
s.SnapshotID)
if err != nil {
// In case the snap is already unprotected we get ErrSnapProtectionExist error code
// in that case we are safe and we could discard this error.
if errors.Is(err, rados.ErrObjectExists) {
return nil
}
log.ErrorLog(
ctx,
"failed to unprotect subvolume snapshot %s %s in fs %s with error: %s",
s.VolID,
s.SnapshotID,
s.FsName,
err)

return err
}

return nil
}

// CloneSnapshot clones the snapshot of the subvolume.
func (s *snapshotClient) CloneSnapshot(
ctx context.Context,
Expand Down

0 comments on commit 61478a2

Please sign in to comment.