Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DMN] playground: add dm support for playground #2464

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions components/playground/instance/dm_master.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"

"github.com/pingcap/tiup/pkg/utils"
)

type DMMaster struct {
instance
Process
initEndpoints []*DMMaster
}

var _ Instance = &DMMaster{}

func NewDMMaster(binPath string, dir, host, configPath string, portOffset int, id int, port int) *DMMaster {
if port <= 0 {
port = 8261
}
return &DMMaster{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, 8291, portOffset),
// Similar like PD's client port, here use StatusPort for Master Port.
StatusPort: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
}
}

func (m *DMMaster) Name() string {
return fmt.Sprintf("dm-master-%d", m.ID)
}

func (m *DMMaster) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", m.Name()),
fmt.Sprintf("--master-addr=http://%s", utils.JoinHostPort(m.Host, m.StatusPort)),
fmt.Sprintf("--advertise-addr=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.StatusPort)),
fmt.Sprintf("--peer-urls=http://%s", utils.JoinHostPort(m.Host, m.Port)),
fmt.Sprintf("--advertise-peer-urls=http://%s", utils.JoinHostPort(AdvertiseHost(m.Host), m.Port)),
fmt.Sprintf("--log-file=%s", m.LogFile()),
}

endpoints := make([]string, 0)
for _, master := range m.initEndpoints {
endpoints = append(endpoints, fmt.Sprintf("%s=http://%s", master.Name(), utils.JoinHostPort(master.Host, master.Port)))
}
args = append(args, fmt.Sprintf("--initial-cluster=%s", strings.Join(endpoints, ",")))

if m.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", m.ConfigPath))
}

m.Process = &process{cmd: PrepareCommand(ctx, m.BinPath, args, nil, m.Dir)}

logIfErr(m.Process.SetOutputFile(m.LogFile()))
return m.Process.Start()
}

func (m *DMMaster) SetInitEndpoints(endpoints []*DMMaster) {
m.initEndpoints = endpoints
}

func (m *DMMaster) Component() string {
return "dm-master"
}

func (m *DMMaster) LogFile() string {
return filepath.Join(m.Dir, "dm-master.log")
}
83 changes: 83 additions & 0 deletions components/playground/instance/dm_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package instance

import (
"context"
"fmt"
"path/filepath"
"strings"
"time"

"github.com/pingcap/tiup/pkg/utils"
)

type DMWorker struct {
instance
Process

masters []*DMMaster
}

var _ Instance = &DMWorker{}

// NewDMWorker create a DMWorker instance.
func NewDMWorker(binPath string, dir, host, configPath string, portOffset int, id int, port int, masters []*DMMaster) *DMWorker {
if port <= 0 {
port = 8262
}
return &DMWorker{
instance: instance{
BinPath: binPath,
ID: id,
Dir: dir,
Host: host,
Port: utils.MustGetFreePort(host, port, portOffset),
ConfigPath: configPath,
},
masters: masters,
}
}

func (w *DMWorker) MasterAddrs() []string {
var addrs []string
for _, master := range w.masters {
addrs = append(addrs, utils.JoinHostPort(AdvertiseHost(master.Host), master.StatusPort))
}
return addrs
}

func (w *DMWorker) Name() string {
return fmt.Sprintf("dm-worker-%d", w.ID)
}

func (w *DMWorker) Start(ctx context.Context) error {
args := []string{
fmt.Sprintf("--name=%s", w.Name()),
fmt.Sprintf("--worker-addr=%s", utils.JoinHostPort(w.Host, w.Port)),
fmt.Sprintf("--advertise-addr=%s", utils.JoinHostPort(AdvertiseHost(w.Host), w.Port)),
fmt.Sprintf("--join=%s", strings.Join(w.MasterAddrs(), ",")),
fmt.Sprintf("--log-file=%s", w.LogFile()),
}

if w.ConfigPath != "" {
args = append(args, fmt.Sprintf("--config=%s", w.ConfigPath))
}

w.Process = &process{cmd: PrepareCommand(ctx, w.BinPath, args, nil, w.Dir)}

logIfErr(w.Process.SetOutputFile(w.LogFile()))

// try to wait for the master to be ready
// this is a very ugly implementation, but it may mostly works
// TODO: find a better way to do this,
// e.g, let master support a HTTP API to check if it's ready
time.Sleep(time.Second * 3)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Benjamin2037 seems that for DM worker, we should support Retry when the worker starts and tries to connect the leader of the Master.

In the case of Playground, when the master starts, and the worker will also immediately start, and the worker will fail because at that time, there is no leader in DM master.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After discussing with the TiUP owner, we decided to use TiUP first to start the DM master and wait for all DM master starting processes to finish, and then TiUP starts the DM worker.

return w.Process.Start()
}

func (w *DMWorker) Component() string {
return "dm-worker"
}

func (w *DMWorker) LogFile() string {
return filepath.Join(w.Dir, "dm-worker.log")
}
15 changes: 15 additions & 0 deletions components/playground/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type BootOptions struct {
CSEOpts instance.CSEOptions `yaml:"cse"` // Only available when mode == tidb-cse
GrafanaPort int `yaml:"grafana_port"`
PortOffset int `yaml:"port_offset"`
DMMaster instance.Config `yaml:"dm_master"`
DMWorker instance.Config `yaml:"dm_worker"`
}

var (
Expand Down Expand Up @@ -351,6 +353,19 @@ Note: Version constraint [bold]%s[reset] is resolved to [green][bold]%s[reset].
rootCmd.Flags().StringVar(&options.CSEOpts.AccessKey, "cse.access_key", "minioadmin", "Object store access key for --mode=tidb-cse")
rootCmd.Flags().StringVar(&options.CSEOpts.SecretKey, "cse.secret_key", "minioadmin", "Object store secret key for --mode=tidb-cse")

rootCmd.Flags().IntVar(&options.DMMaster.Num, "dm-master", 0, "DM-master instance number")
rootCmd.Flags().IntVar(&options.DMWorker.Num, "dm-worker", 0, "DM-worker instance number")

rootCmd.Flags().StringVar(&options.DMMaster.Host, "dm-master.host", "", "DM-master instance host")
rootCmd.Flags().IntVar(&options.DMMaster.Port, "dm-master.port", 8261, "DM-master instance port")
rootCmd.Flags().StringVar(&options.DMMaster.ConfigPath, "dm-master.config", "", "DM-master instance configuration file")
rootCmd.Flags().StringVar(&options.DMMaster.BinPath, "dm-master.binpath", "", "DM-master instance binary path")

rootCmd.Flags().StringVar(&options.DMWorker.Host, "dm-worker.host", "", "DM-worker instance host")
rootCmd.Flags().IntVar(&options.DMWorker.Port, "dm-worker.port", 8262, "DM-worker instance port")
rootCmd.Flags().StringVar(&options.DMWorker.ConfigPath, "dm-worker.config", "", "DM-worker instance configuration file")
rootCmd.Flags().StringVar(&options.DMWorker.BinPath, "dm-worker.binpath", "", "DM-worker instance binary path")

rootCmd.AddCommand(newDisplay())
rootCmd.AddCommand(newScaleOut())
rootCmd.AddCommand(newScaleIn())
Expand Down
44 changes: 44 additions & 0 deletions components/playground/playground.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ type Playground struct {
tikvCdcs []*instance.TiKVCDC
pumps []*instance.Pump
drainers []*instance.Drainer
dmMasters []*instance.DMMaster
dmWorkers []*instance.DMWorker
startedInstances []instance.Instance

idAlloc map[string]int
Expand Down Expand Up @@ -682,6 +684,20 @@ func (p *Playground) WalkInstances(fn func(componentID string, ins instance.Inst
}
}

for _, ins := range p.dmMasters {
err := fn(spec.ComponentDMMaster, ins)
if err != nil {
return err
}
}

for _, ins := range p.dmWorkers {
err := fn(spec.ComponentDMWorker, ins)
if err != nil {
return err
}
}

return nil
}

Expand Down Expand Up @@ -783,6 +799,17 @@ func (p *Playground) addInstance(componentID string, pdRole instance.PDRole, tif
inst := instance.NewDrainer(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, p.pds)
ins = inst
p.drainers = append(p.drainers, inst)
case spec.ComponentDMMaster:
inst := instance.NewDMMaster(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port)
ins = inst
p.dmMasters = append(p.dmMasters, inst)
for _, master := range p.dmMasters {
master.SetInitEndpoints(p.dmMasters)
}
case spec.ComponentDMWorker:
inst := instance.NewDMWorker(cfg.BinPath, dir, host, cfg.ConfigPath, options.PortOffset, id, cfg.Port, p.dmMasters)
ins = inst
p.dmWorkers = append(p.dmWorkers, inst)
default:
return nil, errors.Errorf("unknown component: %s", componentID)
}
Expand Down Expand Up @@ -928,6 +955,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
&options.Pump,
&options.Drainer,
&options.TiKVCDC,
&options.DMMaster,
&options.DMWorker,
} {
path, err := getAbsolutePath(cfg.ConfigPath)
if err != nil {
Expand Down Expand Up @@ -969,6 +998,8 @@ func (p *Playground) bootCluster(ctx context.Context, env *environment.Environme
{spec.ComponentCDC, "", "", options.TiCDC},
{spec.ComponentTiKVCDC, "", "", options.TiKVCDC},
{spec.ComponentDrainer, "", "", options.Drainer},
{spec.ComponentDMMaster, "", "", options.DMMaster},
{spec.ComponentDMWorker, "", "", options.DMWorker},
}

if options.Mode == "tidb" {
Expand Down Expand Up @@ -1254,6 +1285,19 @@ func (p *Playground) terminate(sig syscall.Signal) {
if p.grafana != nil && p.grafana.cmd != nil && p.grafana.cmd.Process != nil {
go kill("grafana", p.grafana.cmd.Process.Pid, p.grafana.wait)
}

for _, inst := range p.dmMasters {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}

for _, inst := range p.dmWorkers {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
}
}

for _, inst := range p.tiflashs {
if inst.Process != nil && inst.Process.Cmd() != nil && inst.Process.Cmd().Process != nil {
kill(inst.Component(), inst.Pid(), inst.Wait)
Expand Down
Loading