Skip to content

Commit

Permalink
Fix(migrate): support migrate chunkserver successed
Browse files Browse the repository at this point in the history
Signed-off-by: caoxianfei1 <[email protected]>
  • Loading branch information
caoxianfei1 committed Dec 10, 2023
1 parent 9d83a9c commit 503f6ff
Show file tree
Hide file tree
Showing 14 changed files with 625 additions and 57 deletions.
4 changes: 1 addition & 3 deletions cli/command/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,6 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
Name: options.poolset,
Type: options.poolsetDiskType,
}
diskType := options.poolsetDiskType

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -255,8 +254,7 @@ func genDeployPlaybook(curveadm *cli.CurveAdm,
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
} else if step == CREATE_LOGICAL_POOL {
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = diskType
options[comm.KEY_POOLSET] = poolset
options[comm.KEY_NUMBER_OF_CHUNKSERVER] = calcNumOfChunkserver(curveadm, dcs)
}

Expand Down
124 changes: 93 additions & 31 deletions cli/command/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ import (
"github.com/opencurve/curveadm/internal/configure/topology"
"github.com/opencurve/curveadm/internal/errno"
"github.com/opencurve/curveadm/internal/playbook"
tui "github.com/opencurve/curveadm/internal/tui/common"
"github.com/opencurve/curveadm/internal/task/task/common"
tui "github.com/opencurve/curveadm/internal/tui"
tuicomm "github.com/opencurve/curveadm/internal/tui/common"

cliutil "github.com/opencurve/curveadm/internal/utils"
"github.com/spf13/cobra"
)
Expand Down Expand Up @@ -71,14 +74,12 @@ var (
// chunkserevr (curvebs)
MIGRATE_CHUNKSERVER_STEPS = []int{
playbook.BACKUP_ETCD_DATA,
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // add machine that migrate to
playbook.PULL_IMAGE,
playbook.CREATE_CONTAINER,
playbook.SYNC_CONFIG,
playbook.CREATE_PHYSICAL_POOL,
playbook.START_CHUNKSERVER,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING,
}

// metaserver (curvefs)
Expand All @@ -100,12 +101,25 @@ var (
topology.ROLE_SNAPSHOTCLONE: MIGRATE_SNAPSHOTCLONE_STEPS,
topology.ROLE_METASERVER: MIGRATE_METASERVER_STEPS,
}

MIGRATE_POST_CLEAN_STEPS = []int{
playbook.STOP_SERVICE,
playbook.CLEAN_SERVICE, // only container
playbook.CREATE_PHYSICAL_POOL, // remove machine that migrate from, only for chunkserver or metaserver
playbook.UPDATE_TOPOLOGY,
}

GET_MIGRATE_STATUS = []int{
playbook.GET_MIGRATE_STATUS,
}
)

type migrateOptions struct {
filename string
poolset string
poolsetDiskType string
showStatus bool
clean bool
}

func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
Expand All @@ -125,7 +139,8 @@ func NewMigrateCommand(curveadm *cli.CurveAdm) *cobra.Command {
flags := cmd.Flags()
flags.StringVar(&options.poolset, "poolset", "default", "Specify the poolset")
flags.StringVar(&options.poolsetDiskType, "poolset-disktype", "ssd", "Specify the disk type of physical pool")

flags.BoolVar(&options.showStatus, "status", false, "Show copyset transferring status")
flags.BoolVar(&options.clean, "clean", false, "Clean migrated environment for chunkserver or metaserver")
return cmd
}

Expand Down Expand Up @@ -191,8 +206,21 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
migrates := getMigrates(curveadm, data)
role := migrates[0].From.GetRole()
steps := MIGRATE_ROLE_STEPS[role]
poolset := options.poolset
poolsetDiskType := options.poolsetDiskType

// show status
if options.showStatus {
steps = GET_MIGRATE_STATUS
}

// post clean
if options.clean {
steps = MIGRATE_POST_CLEAN_STEPS
}

poolset := configure.Poolset{
Name: options.poolset,
Type: options.poolsetDiskType,
}

pb := playbook.NewPlaybook(curveadm)
for _, step := range steps {
Expand All @@ -204,36 +232,40 @@ func genMigratePlaybook(curveadm *cli.CurveAdm,
config = dcs2del
case playbook.BACKUP_ETCD_DATA:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD)
case CREATE_PHYSICAL_POOL,
CREATE_LOGICAL_POOL:
case
playbook.CREATE_PHYSICAL_POOL,
playbook.CREATE_LOGICAL_POOL,
playbook.MARK_SERVER_PENGDDING,
playbook.GET_MIGRATE_STATUS:
config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS)[:1]
}

// options
options := map[string]interface{}{}
optionsKV := map[string]interface{}{}
switch step {
case playbook.CLEAN_SERVICE:
options[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
options[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_CLEAN_ITEMS] = []string{comm.CLEAN_ITEM_CONTAINER}
optionsKV[comm.KEY_CLEAN_BY_RECYCLE] = true
optionsKV[comm.KEY_REMOVE_MIGRATED_SERVER] = true
case playbook.CREATE_PHYSICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_PHYSICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.CREATE_LOGICAL_POOL:
options[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
options[comm.KEY_MIGRATE_SERVERS] = migrates
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
options[comm.POOLSET] = poolset
options[comm.POOLSET_DISK_TYPE] = poolsetDiskType
optionsKV[comm.KEY_CREATE_POOL_TYPE] = comm.POOL_TYPE_LOGICAL
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_POOLSET] = poolset
case playbook.UPDATE_TOPOLOGY:
options[comm.KEY_NEW_TOPOLOGY_DATA] = data
optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data
case playbook.GET_MIGRATE_STATUS:
optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates
}

pb.AddStep(&playbook.PlaybookStep{
Type: step,
Configs: config,
Options: options,
Options: optionsKV,
ExecOptions: playbook.ExecOptions{
SilentSubBar: step == playbook.UPDATE_TOPOLOGY,
},
Expand All @@ -252,6 +284,23 @@ func displayMigrateTitle(curveadm *cli.CurveAdm, data string) {
curveadm.WriteOutln(color.YellowString(" - Migrate host: from %s to %s", from.GetHost(), to.GetHost()))
}

func displayMigrateStatus(curveadm *cli.CurveAdm) {
var output string
statuses := []common.MigrateStatus{}
v := curveadm.MemStorage().Get(comm.KEY_MIGRATE_STATUS)
if v != nil {
m := v.(map[string]common.MigrateStatus)
for _, status := range m {
statuses = append(statuses, status)
}
}

output = tui.FormatMigrateStatus(statuses)

curveadm.WriteOutln("")
curveadm.WriteOut("%s", output)
}

func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
// TODO(P0): added prechek for target host
// 1) parse cluster topology
Expand All @@ -261,7 +310,11 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm,
options.filename,
options.showStatus,
options.clean,
)
if err != nil {
return err
}
Expand All @@ -272,13 +325,15 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
return err
}

// 4) display title
displayMigrateTitle(curveadm, data)
if !options.showStatus && !options.clean {
// 4) display title
displayMigrateTitle(curveadm, data)

// 5) confirm by user
if pass := tui.ConfirmYes(tui.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tui.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
// 5) confirm by user
if pass := tuicomm.ConfirmYes(tuicomm.DEFAULT_CONFIRM_PROMPT); !pass {
curveadm.WriteOutln(tuicomm.PromptCancelOpetation("migrate service"))
return errno.ERR_CANCEL_OPERATION
}
}

// 6) generate migrate playbook
Expand All @@ -294,6 +349,13 @@ func runMigrate(curveadm *cli.CurveAdm, options migrateOptions) error {
}

// 9) print success prompt
if options.showStatus {
displayMigrateStatus(curveadm)
return nil
}
if options.clean {
return nil
}
curveadm.WriteOutln("")
curveadm.WriteOutln(color.GreenString("Services successfully migrateed ^_^."))
// TODO(P1): warning iff there is changed configs
Expand Down
8 changes: 5 additions & 3 deletions cli/command/scale_out.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func NewScaleOutCommand(curveadm *cli.CurveAdm) *cobra.Command {
return cmd
}

func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
func readTopology(curveadm *cli.CurveAdm, filename string, showStatus bool, clean bool) (string, error) {
if !utils.PathExist(filename) {
return "", errno.ERR_TOPOLOGY_FILE_NOT_FOUND.
F("%s: no such file", utils.AbsPath(filename))
Expand All @@ -156,7 +156,9 @@ func readTopology(curveadm *cli.CurveAdm, filename string) (string, error) {
}

oldData := curveadm.ClusterTopologyData()
curveadm.WriteOut("%s", utils.Diff(oldData, data))
if !showStatus && !clean {
curveadm.WriteOut("%s", utils.Diff(oldData, data))
}
return data, nil
}

Expand Down Expand Up @@ -384,7 +386,7 @@ func runScaleOut(curveadm *cli.CurveAdm, options scaleOutOptions) error {
}

// 2) read topology from file
data, err := readTopology(curveadm, options.filename)
data, err := readTopology(curveadm, options.filename, false, false)
if err != nil {
return err
}
Expand Down
17 changes: 11 additions & 6 deletions internal/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ const (
// format
KEY_ALL_FORMAT_STATUS = "ALL_FORMAT_STATUS"

// migrate
KEY_MIGRATE_STATUS = "MIGRATE_STATUS"
KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS"

// check
KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK"
KEY_CHECK_KERNEL_MODULE_NAME = "CHECK_KERNEL_MODULE_NAME"
Expand All @@ -71,12 +75,13 @@ const (
SERVICE_STATUS_UNKNOWN = "Unknown"

// clean
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_CLEAN_ITEMS = "CLEAN_ITEMS"
KEY_CLEAN_BY_RECYCLE = "CLEAN_BY_RECYCLE"
CLEAN_ITEM_LOG = "log"
CLEAN_ITEM_DATA = "data"
CLEAN_ITEM_CONTAINER = "container"
CLEANED_CONTAINER_ID = "-"
KEY_REMOVE_MIGRATED_SERVER = "REMOVE_MIGRATED_SERVER"

// client
KEY_CLIENT_HOST = "CLIENT_HOST"
Expand Down
37 changes: 25 additions & 12 deletions internal/configure/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,26 +263,39 @@ func ScaleOutClusterPool(old *CurveClusterTopo, dcs []*topology.DeployConfig, po
old.NPools = old.NPools + 1
}

func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer) {
func MigrateClusterServer(old *CurveClusterTopo, migrates []*MigrateServer, removeMigratedServer bool) {
m := map[string]*topology.DeployConfig{} // key: from.Name, value: to.DeployConfig
for _, migrate := range migrates {
m[formatName(migrate.From)] = migrate.To
}

for i, server := range old.Servers {
dc, ok := m[server.Name]
if !ok {
continue
// add server that will migrate to
for fromName, toDc := range m {
server := Server{}
server.InternalIp = toDc.GetListenIp()
server.ExternalIp = toDc.GetListenExternalIp()
server.InternalPort = toDc.GetListenPort()
server.ExternalPort = toDc.GetListenExternalPort()
server.Name = formatName(toDc)

for _, oldServer := range old.Servers {
if oldServer.Name == fromName {
server.PhysicalPool = oldServer.PhysicalPool
server.Poolset = oldServer.Poolset
server.Zone = oldServer.Zone
}
}
old.Servers = append(old.Servers, server)
}

server.InternalIp = dc.GetListenIp()
server.ExternalIp = dc.GetListenExternalIp()
server.Name = formatName(dc)
if server.InternalPort != 0 && server.ExternalPort != 0 {
server.InternalPort = dc.GetListenPort()
server.ExternalPort = dc.GetListenExternalPort()
// remove server that has migrated
if removeMigratedServer {
for i := 0; i < len(old.Servers); i++ {
_, ok := m[old.Servers[i].Name]
if ok {
old.Servers = append(old.Servers[:i], old.Servers[i+1:]...)
}
}
old.Servers[i] = server
}
}

Expand Down
6 changes: 5 additions & 1 deletion internal/errno/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,11 @@ var (
ERR_ENCRYPT_FILE_FAILED = EC(410021, "encrypt file failed")
ERR_CLIENT_ID_NOT_FOUND = EC(410022, "client id not found")
ERR_ENABLE_ETCD_AUTH_FAILED = EC(410023, "enable etcd auth failed")

ERR_MARK_CHUNKSERVER_PENDDING = EC(410024, "mark chunkserver pendding status failed when migrate")
RRR_GET_CLUSTER_MDSADDR = EC(410025, "failed to get cluster mds addr")
ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset")
ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2")
ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed")
// 420: common (curvebs client)
ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped")
ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed")
Expand Down
6 changes: 6 additions & 0 deletions internal/playbook/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ const (
GET_CLIENT_STATUS
INSTALL_CLIENT
UNINSTALL_CLIENT
GET_MIGRATE_STATUS

// bs
FORMAT_CHUNKFILE_POOL
Expand All @@ -93,6 +94,7 @@ const (
CREATE_VOLUME
MAP_IMAGE
UNMAP_IMAGE
MARK_SERVER_PENGDDING

// monitor
PULL_MONITOR_IMAGE
Expand Down Expand Up @@ -247,6 +249,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = comm.NewInstallClientTask(curveadm, config.GetCC(i))
case UNINSTALL_CLIENT:
t, err = comm.NewUninstallClientTask(curveadm, nil)
case GET_MIGRATE_STATUS:
t, err = comm.NewGetMigrateStatusTask(curveadm, config.GetDC(i))
// bs
case FORMAT_CHUNKFILE_POOL:
t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i))
Expand Down Expand Up @@ -275,6 +279,8 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) {
t, err = bs.NewDeleteTargetTask(curveadm, nil)
case LIST_TARGETS:
t, err = bs.NewListTargetsTask(curveadm, nil)
case MARK_SERVER_PENGDDING:
t, err = bs.NewMarkServerPendding(curveadm, config.GetDC(i))
// fs
case CHECK_CLIENT_S3:
t, err = checker.NewClientS3ConfigureTask(curveadm, config.GetCC(i))
Expand Down
Loading

0 comments on commit 503f6ff

Please sign in to comment.