Skip to content

Commit

Permalink
rbd: implement volume group using go-ceph
Browse files Browse the repository at this point in the history
This adds the required functionality to
call the go-ceph API's for the rbd
volume group.

Signed-off-by: Madhu Rajanna <[email protected]>
  • Loading branch information
Madhu-1 committed Aug 1, 2024
1 parent d6bf1d8 commit add2d0b
Show file tree
Hide file tree
Showing 2 changed files with 308 additions and 8 deletions.
16 changes: 9 additions & 7 deletions internal/csi-addons/rbd/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
csicommon "github.com/ceph/ceph-csi/internal/csi-common"
"github.com/ceph/ceph-csi/internal/rbd"
corerbd "github.com/ceph/ceph-csi/internal/rbd"
rbd_group "github.com/ceph/ceph-csi/internal/rbd/group"
"github.com/ceph/ceph-csi/internal/rbd/types"
"github.com/ceph/ceph-csi/internal/util"
"github.com/ceph/ceph-csi/internal/util/log"
Expand Down Expand Up @@ -785,13 +786,14 @@ func getGRPCError(err error) error {
}

errorStatusMap := map[error]codes.Code{
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
corerbd.ErrImageNotFound: codes.NotFound,
util.ErrPoolNotFound: codes.NotFound,
corerbd.ErrInvalidArgument: codes.InvalidArgument,
corerbd.ErrFlattenInProgress: codes.Aborted,
corerbd.ErrAborted: codes.Aborted,
corerbd.ErrFailedPrecondition: codes.FailedPrecondition,
corerbd.ErrUnavailable: codes.Unavailable,
rbd_group.ErrRBDGroupUnAvailable: codes.Unavailable,
}

for e, code := range errorStatusMap {
Expand Down
300 changes: 299 additions & 1 deletion internal/rbd/group/volume_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"errors"
"fmt"
"strings"
"time"

"github.com/ceph/go-ceph/rados"
librbd "github.com/ceph/go-ceph/rbd"
"github.com/ceph/go-ceph/rbd/admin"
"github.com/container-storage-interface/spec/lib/go/csi"
"github.com/csi-addons/spec/lib/go/volumegroup"

Expand All @@ -33,7 +35,10 @@ import (
"github.com/ceph/ceph-csi/internal/util/log"
)

var ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
var (
ErrRBDGroupNotConnected = errors.New("RBD group is not connected")
ErrRBDGroupUnAvailable = errors.New("RBD group is unavailable")
)

// volumeGroup handles all requests for 'rbd group' operations.
type volumeGroup struct {
Expand Down Expand Up @@ -469,3 +474,296 @@ func (vg *volumeGroup) ListVolumes(ctx context.Context) ([]types.Volume, error)
func (vg *volumeGroup) ToMirror() (types.Mirror, error) {
return vg, nil
}

func (vg *volumeGroup) EnableMirroring(ctx context.Context, mode librbd.ImageMirrorMode) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}

err = librbd.MirrorGroupEnable(ioctx, name, mode)
if err != nil {
return fmt.Errorf("failed to enable mirroring on volume group %q: %w", vg, err)
}

log.DebugLog(ctx, "mirroring is enabled on the volume group %q", vg)

return nil
}

func (vg *volumeGroup) DisableMirroring(ctx context.Context, force bool) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}

err = librbd.MirrorGroupDisable(ioctx, name, force)
if err != nil && !errors.Is(rados.ErrNotFound, err) {
return fmt.Errorf("failed to disable mirroring on volume group %q: %w", vg, err)
}

log.DebugLog(ctx, "mirroring is disabled on the volume group %q", vg)

return nil
}

func (vg *volumeGroup) Promote(ctx context.Context, force bool) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}

err = librbd.MirrorGroupPromote(ioctx, name, force)
if err != nil {
return fmt.Errorf("failed to promote volume group %q: %w", vg, err)
}

log.DebugLog(ctx, "volume group %q has been promoted", vg)

return nil
}

func (vg *volumeGroup) ForcePromote(ctx context.Context, cr *util.Credentials) error {
promoteArgs := []string{
"mirror", "group", "promote",
vg.String(),
"--force",
"--id", cr.ID,
"-m", vg.monitors,
"--keyfile=" + cr.KeyFile,
}
_, stderr, err := util.ExecCommandWithTimeout(
ctx,
// 2 minutes timeout as the Replication RPC timeout is 2.5 minutes.
2*time.Minute,
"rbd",
promoteArgs...,
)
if err != nil {
return fmt.Errorf("failed to promote group %q with error: %w", vg, err)
}

if stderr != "" {
return fmt.Errorf("failed to promote group %q with stderror: %s", vg, stderr)
}

log.DebugLog(ctx, "volume group %q has been force promoted", vg)

return nil
}

func (vg *volumeGroup) Demote(ctx context.Context) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}

err = librbd.MirrorGroupDemote(ioctx, name)
if err != nil {
return fmt.Errorf("failed to demote volume group %q: %w", vg, err)
}

log.DebugLog(ctx, "volume group %q has been demoted", vg)

return nil
}

func (vg *volumeGroup) Resync(ctx context.Context) error {
name, err := vg.GetName(ctx)
if err != nil {
return err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return err
}

err = librbd.MirrorGroupResync(ioctx, name)
if err != nil {
return fmt.Errorf("failed to resync volume group %q: %w", vg, err)
}

log.DebugLog(ctx, "issued resync on volume group %q", vg)
// If we issued a resync, return a non-final error as image needs to be recreated
// locally. Caller retries till RBD syncs an initial version of the image to
// report its status in the resync request.
return fmt.Errorf("%w: awaiting initial resync due to split brain", ErrRBDGroupUnAvailable)
}

func (vg *volumeGroup) GetMirroringInfo(ctx context.Context) (types.MirrorInfo, error) {
name, err := vg.GetName(ctx)
if err != nil {
return nil, err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return nil, err
}

info, err := librbd.GetMirrorGroupInfo(ioctx, name)
if err != nil {
return nil, fmt.Errorf("failed to get volume group mirroring info %q: %w", vg, err)
}

return GroupInfo{MirrorGroupInfo: info}, nil
}

func (vg *volumeGroup) GetGlobalMirroringStatus(ctx context.Context) (types.GlobalStatus, error) {
name, err := vg.GetName(ctx)
if err != nil {
return nil, err
}

ioctx, err := vg.GetIOContext(ctx)
if err != nil {
return nil, err
}
statusInfo, err := librbd.GetGlobalMirrorGroupStatus(ioctx, name)
if err != nil {
return nil, fmt.Errorf("failed to get volume group mirroring status %q: %w", vg, err)
}

return GlobalMirrorGroupStatus{GlobalMirrorGroupStatus: &statusInfo}, nil
}

func (vg *volumeGroup) AddSnapshotScheduling(interval admin.Interval, startTime admin.StartTime) error {
ls := admin.NewLevelSpec(vg.pool, vg.namespace, "")
ra, err := vg.conn.GetRBDAdmin()
if err != nil {
return err
}
adminConn := ra.MirrorSnashotSchedule()
err = adminConn.Add(ls, interval, startTime)
if err != nil {
return err
}

return nil
}

// GroupInfo is a wrapper around librbd.MirrorGroupInfo that contains the
// group mirror info.
type GroupInfo struct {
*librbd.MirrorGroupInfo
}

func (info GroupInfo) GetState() string {
return info.State.String()
}

func (info GroupInfo) IsPrimary() bool {
return info.Primary
}

// GlobalGroupMirrorStatus is a wrapper around librbd.GlobalGroupMirrorImageStatus that contains the
// global mirror group status.
type GlobalMirrorGroupStatus struct {
*librbd.GlobalMirrorGroupStatus
}

func (status GlobalMirrorGroupStatus) GetState() string {
return status.GlobalMirrorGroupStatus.Info.State.String()
}

func (status GlobalMirrorGroupStatus) IsPrimary() bool {
return status.GlobalMirrorGroupStatus.Info.Primary
}

func (status GlobalMirrorGroupStatus) GetLocalSiteStatus() (types.SiteStatus, error) {
s, err := status.GlobalMirrorGroupStatus.LocalStatus()
if err != nil {
err = fmt.Errorf("failed to get local site status: %w", err)
}

return SiteMirrorGroupStatus{
SiteMirrorGroupStatus: &s,
}, err
}

func (status GlobalMirrorGroupStatus) GetAllSitesStatus() []types.SiteStatus {
var siteStatuses []types.SiteStatus
for i := range status.SiteStatuses {
siteStatuses = append(siteStatuses, SiteMirrorGroupStatus{SiteMirrorGroupStatus: &status.SiteStatuses[i]})
}

return siteStatuses
}

// RemoteStatus returns one SiteMirrorGroupStatus item from the SiteStatuses
// slice that corresponds to the remote site's status. If the remote status
// is not found than the error ErrNotExist will be returned.
func (status GlobalMirrorGroupStatus) GetRemoteSiteStatus(ctx context.Context) (types.SiteStatus, error) {
var (
ss librbd.SiteMirrorGroupStatus
err error = librbd.ErrNotExist
)

for i := range status.SiteStatuses {
log.DebugLog(
ctx,
"Site status of MirrorUUID: %s, state: %s, description: %s, lastUpdate: %v, up: %t",
status.SiteStatuses[i].MirrorUUID,
status.SiteStatuses[i].State,
status.SiteStatuses[i].Description,
status.SiteStatuses[i].LastUpdate,
status.SiteStatuses[i].Up)

if status.SiteStatuses[i].MirrorUUID != "" {
ss = status.SiteStatuses[i]
err = nil

break
}
}

return SiteMirrorGroupStatus{SiteMirrorGroupStatus: &ss}, err
}

// SiteMirrorGroupStatus is a wrapper around librbd.SiteMirrorGroupStatus that contains the
// site mirror group status.
type SiteMirrorGroupStatus struct {
*librbd.SiteMirrorGroupStatus
}

func (status SiteMirrorGroupStatus) GetMirrorUUID() string {
return status.MirrorUUID
}

func (status SiteMirrorGroupStatus) GetState() string {
return status.State.String()
}

func (status SiteMirrorGroupStatus) GetDescription() string {
return status.Description
}

func (status SiteMirrorGroupStatus) IsUP() bool {
return status.Up
}

func (status SiteMirrorGroupStatus) GetLastUpdate() time.Time {
// convert the last update time to UTC
return time.Unix(status.LastUpdate, 0).UTC()
}

0 comments on commit add2d0b

Please sign in to comment.