From add2d0b6dd7487b6f0fddc393221896d29a8c2f1 Mon Sep 17 00:00:00 2001 From: Madhu Rajanna Date: Wed, 31 Jul 2024 15:56:40 +0200 Subject: [PATCH] rbd: implement volume group using go-ceph This adds the required functionality to call the go-ceph API's for the rbd volume group. Signed-off-by: Madhu Rajanna --- internal/csi-addons/rbd/replication.go | 16 +- internal/rbd/group/volume_group.go | 300 ++++++++++++++++++++++++- 2 files changed, 308 insertions(+), 8 deletions(-) diff --git a/internal/csi-addons/rbd/replication.go b/internal/csi-addons/rbd/replication.go index 93cb028746f5..38bcc87188e1 100644 --- a/internal/csi-addons/rbd/replication.go +++ b/internal/csi-addons/rbd/replication.go @@ -29,6 +29,7 @@ import ( csicommon "github.com/ceph/ceph-csi/internal/csi-common" "github.com/ceph/ceph-csi/internal/rbd" corerbd "github.com/ceph/ceph-csi/internal/rbd" + rbd_group "github.com/ceph/ceph-csi/internal/rbd/group" "github.com/ceph/ceph-csi/internal/rbd/types" "github.com/ceph/ceph-csi/internal/util" "github.com/ceph/ceph-csi/internal/util/log" @@ -785,13 +786,14 @@ func getGRPCError(err error) error { } errorStatusMap := map[error]codes.Code{ - corerbd.ErrImageNotFound: codes.NotFound, - util.ErrPoolNotFound: codes.NotFound, - corerbd.ErrInvalidArgument: codes.InvalidArgument, - corerbd.ErrFlattenInProgress: codes.Aborted, - corerbd.ErrAborted: codes.Aborted, - corerbd.ErrFailedPrecondition: codes.FailedPrecondition, - corerbd.ErrUnavailable: codes.Unavailable, + corerbd.ErrImageNotFound: codes.NotFound, + util.ErrPoolNotFound: codes.NotFound, + corerbd.ErrInvalidArgument: codes.InvalidArgument, + corerbd.ErrFlattenInProgress: codes.Aborted, + corerbd.ErrAborted: codes.Aborted, + corerbd.ErrFailedPrecondition: codes.FailedPrecondition, + corerbd.ErrUnavailable: codes.Unavailable, + rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable, } for e, code := range errorStatusMap { diff --git a/internal/rbd/group/volume_group.go b/internal/rbd/group/volume_group.go index 9057a0079926..d4f8d869f452 100644 --- a/internal/rbd/group/volume_group.go +++ b/internal/rbd/group/volume_group.go @@ -21,9 +21,11 @@ import ( "errors" "fmt" "strings" + "time" "github.com/ceph/go-ceph/rados" librbd "github.com/ceph/go-ceph/rbd" + "github.com/ceph/go-ceph/rbd/admin" "github.com/container-storage-interface/spec/lib/go/csi" "github.com/csi-addons/spec/lib/go/volumegroup" @@ -33,7 +35,10 @@ import ( "github.com/ceph/ceph-csi/internal/util/log" ) -var ErrRBDGroupNotConnected = errors.New("RBD group is not connected") +var ( + ErrRBDGroupNotConnected = errors.New("RBD group is not connected") + ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable") +) // volumeGroup handles all requests for 'rbd group' operations. type volumeGroup struct { @@ -469,3 +474,296 @@ func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error) func (vg *volumeGroup) ToMirror() (types.Mirror, error) { return vg, nil } + +func (vg *volumeGroup) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupEnable(ioctx, name, mode) + if err != nil { + return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg) + + return nil +} + +func (vg *volumeGroup) DisableMirroring(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDisable(ioctx, name, force) + if err != nil && !errors.Is(rados.ErrNotFound, err) { + return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg) + + return nil +} + +func (vg *volumeGroup) Promote(ctx context.Context, force bool) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupPromote(ioctx, name, force) + if err != nil { + return fmt.Errorf("failed to promote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been promoted", vg) + + return nil +} + +func (vg *volumeGroup) ForcePromote(ctx context.Context, cr *util.Credentials) error { + promoteArgs := []string{ + "mirror", "group", "promote", + vg.String(), + "--force", + "--id", cr.ID, + "-m", vg.monitors, + "--keyfile=" + cr.KeyFile, + } + _, stderr, err := util.ExecCommandWithTimeout( + ctx, + // 2 minutes timeout as the Replication RPC timeout is 2.5 minutes. + 2*time.Minute, + "rbd", + promoteArgs..., + ) + if err != nil { + return fmt.Errorf("failed to promote group %q with error: %w", vg, err) + } + + if stderr != "" { + return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr) + } + + log.DebugLog(ctx, "volume group %q has been force promoted", vg) + + return nil +} + +func (vg *volumeGroup) Demote(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupDemote(ioctx, name) + if err != nil { + return fmt.Errorf("failed to demote volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "volume group %q has been demoted", vg) + + return nil +} + +func (vg *volumeGroup) Resync(ctx context.Context) error { + name, err := vg.GetName(ctx) + if err != nil { + return err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return err + } + + err = librbd.MirrorGroupResync(ioctx, name) + if err != nil { + return fmt.Errorf("failed to resync volume group %q: %w", vg, err) + } + + log.DebugLog(ctx, "issued resync on volume group %q", vg) + // If we issued a resync, return a non-final error as image needs to be recreated + // locally. Caller retries till RBD syncs an initial version of the image to + // report its status in the resync request. + return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable) +} + +func (vg *volumeGroup) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + + info, err := librbd.GetMirrorGroupInfo(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring info %q: %w", vg, err) + } + + return GroupInfo{MirrorGroupInfo: info}, nil +} + +func (vg *volumeGroup) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) { + name, err := vg.GetName(ctx) + if err != nil { + return nil, err + } + + ioctx, err := vg.GetIOContext(ctx) + if err != nil { + return nil, err + } + statusInfo, err := librbd.GetGlobalMirrorGroupStatus(ioctx, name) + if err != nil { + return nil, fmt.Errorf("failed to get volume group mirroring status %q: %w", vg, err) + } + + return GlobalMirrorGroupStatus{GlobalMirrorGroupStatus: &statusInfo}, nil +} + +func (vg *volumeGroup) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error { + ls := admin.NewLevelSpec(vg.pool, vg.namespace, "") + ra, err := vg.conn.GetRBDAdmin() + if err != nil { + return err + } + adminConn := ra.MirrorSnashotSchedule() + err = adminConn.Add(ls, interval, startTime) + if err != nil { + return err + } + + return nil +} + +// GroupInfo is a wrapper around librbd.MirrorGroupInfo that contains the +// group mirror info. +type GroupInfo struct { + *librbd.MirrorGroupInfo +} + +func (info GroupInfo) GetState() string { + return info.State.String() +} + +func (info GroupInfo) IsPrimary() bool { + return info.Primary +} + +// GlobalGroupMirrorStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the +// global mirror group status. +type GlobalMirrorGroupStatus struct { + *librbd.GlobalMirrorGroupStatus +} + +func (status GlobalMirrorGroupStatus) GetState() string { + return status.GlobalMirrorGroupStatus.Info.State.String() +} + +func (status GlobalMirrorGroupStatus) IsPrimary() bool { + return status.GlobalMirrorGroupStatus.Info.Primary +} + +func (status GlobalMirrorGroupStatus) GetLocalSiteStatus() (types.SiteStatus, error) { + s, err := status.GlobalMirrorGroupStatus.LocalStatus() + if err != nil { + err = fmt.Errorf("failed to get local site status: %w", err) + } + + return SiteMirrorGroupStatus{ + SiteMirrorGroupStatus: &s, + }, err +} + +func (status GlobalMirrorGroupStatus) GetAllSitesStatus() []types.SiteStatus { + var siteStatuses []types.SiteStatus + for i := range status.SiteStatuses { + siteStatuses = append(siteStatuses, SiteMirrorGroupStatus{SiteMirrorGroupStatus: &status.SiteStatuses[i]}) + } + + return siteStatuses +} + +// RemoteStatus returns one SiteMirrorGroupStatus item from the SiteStatuses +// slice that corresponds to the remote site's status. If the remote status +// is not found than the error ErrNotExist will be returned. +func (status GlobalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) { + var ( + ss librbd.SiteMirrorGroupStatus + err error = librbd.ErrNotExist + ) + + for i := range status.SiteStatuses { + log.DebugLog( + ctx, + "Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t", + status.SiteStatuses[i].MirrorUUID, + status.SiteStatuses[i].State, + status.SiteStatuses[i].Description, + status.SiteStatuses[i].LastUpdate, + status.SiteStatuses[i].Up) + + if status.SiteStatuses[i].MirrorUUID != "" { + ss = status.SiteStatuses[i] + err = nil + + break + } + } + + return SiteMirrorGroupStatus{SiteMirrorGroupStatus: &ss}, err +} + +// SiteMirrorGroupStatus is a wrapper around librbd.SiteMirrorGroupStatus that contains the +// site mirror group status. +type SiteMirrorGroupStatus struct { + *librbd.SiteMirrorGroupStatus +} + +func (status SiteMirrorGroupStatus) GetMirrorUUID() string { + return status.MirrorUUID +} + +func (status SiteMirrorGroupStatus) GetState() string { + return status.State.String() +} + +func (status SiteMirrorGroupStatus) GetDescription() string { + return status.Description +} + +func (status SiteMirrorGroupStatus) IsUP() bool { + return status.Up +} + +func (status SiteMirrorGroupStatus) GetLastUpdate() time.Time { + // convert the last update time to UTC + return time.Unix(status.LastUpdate, 0).UTC() +}