Skip to content

Commit

Permalink
executor: support capturing to and replaying from shared storage (#58862)
Browse files Browse the repository at this point in the history

close #58861
  • Loading branch information
djshow832 authored Jan 14, 2025
1 parent 9c01dcd commit 6f4861d
Show file tree
Hide file tree
Showing 2 changed files with 303 additions and 37 deletions.
183 changes: 154 additions & 29 deletions pkg/executor/traffic.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"io"
"maps"
"net"
"net/http"
"net/url"
Expand All @@ -27,6 +28,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/parser/mysql"
Expand All @@ -40,8 +42,10 @@ import (

// The keys for the mocked data that is stored in the context. They are only used for tests.
type tiproxyAddrKeyType struct{}
type trafficPathKeyType struct{}

var tiproxyAddrKey tiproxyAddrKeyType
var trafficPathKey trafficPathKeyType

type trafficJob struct {
Instance string `json:"-"` // not passed from TiProxy
Expand All @@ -55,6 +59,16 @@ type trafficJob struct {

const (
startTimeKey = "start-time"
outputKey = "output"
inputKey = "input"

capturePath = "/api/traffic/capture"
replayPath = "/api/traffic/replay"
cancelPath = "/api/traffic/cancel"
showPath = "/api/traffic/show"

sharedStorageTimeout = 10 * time.Second
filePrefix = "tiproxy-"
)

// TrafficCaptureExec sends capture traffic requests to TiProxy.
Expand All @@ -66,8 +80,16 @@ type TrafficCaptureExec struct {
// Next implements the Executor Next interface.
func (e *TrafficCaptureExec) Next(ctx context.Context, _ *chunk.Chunk) error {
e.Args[startTimeKey] = time.Now().Format(time.RFC3339)
form := getForm(e.Args)
_, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/capture")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
// For shared storage, append a suffix to the output path for each TiProxy so that they won't write to the same path.
readers, err := formReader4Capture(e.Args, len(addrs))
if err != nil {
return err
}
_, err = request(ctx, addrs, readers, http.MethodPost, capturePath)
return err
}

Expand All @@ -80,8 +102,30 @@ type TrafficReplayExec struct {
// Next implements the Executor Next interface.
func (e *TrafficReplayExec) Next(ctx context.Context, _ *chunk.Chunk) error {
e.Args[startTimeKey] = time.Now().Format(time.RFC3339)
form := getForm(e.Args)
_, err := request(ctx, e.BaseExecutor, strings.NewReader(form), http.MethodPost, "api/traffic/replay")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
// For shared storage, read the sub-directories from the input path and assign each sub-directory to a TiProxy instance.
formCtx, cancel := context.WithTimeout(ctx, sharedStorageTimeout)
readers, err := formReader4Replay(formCtx, e.Args, len(addrs))
cancel()
if err != nil {
return err
}
readerNum, tiproxyNum := len(readers), len(addrs)
if readerNum > tiproxyNum {
logutil.Logger(ctx).Error("tiproxy instances number is less than input paths number", zap.Int("tiproxy number", tiproxyNum),
zap.Int("path number", readerNum))
return errors.Errorf("tiproxy instances number (%d) is less than input paths number (%d)", tiproxyNum, readerNum)
} else if readerNum < tiproxyNum {
addrs = addrs[:readerNum]
err = errors.Errorf("tiproxy instances number (%d) is greater than input paths number (%d), some instances won't replay", tiproxyNum, readerNum)
e.Ctx().GetSessionVars().StmtCtx.AppendWarning(err)
logutil.Logger(ctx).Warn("tiproxy instances number is greater than input paths number, some instances won't replay",
zap.Int("tiproxy number", tiproxyNum), zap.Int("path number", readerNum))
}
_, err = request(ctx, addrs, readers, http.MethodPost, replayPath)
return err
}

Expand All @@ -91,8 +135,12 @@ type TrafficCancelExec struct {
}

// Next implements the Executor Next interface.
func (e *TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error {
_, err := request(ctx, e.BaseExecutor, nil, http.MethodPost, "api/traffic/cancel")
func (*TrafficCancelExec) Next(ctx context.Context, _ *chunk.Chunk) error {
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
_, err = request(ctx, addrs, nil, http.MethodPost, cancelPath)
return err
}

Expand All @@ -108,7 +156,11 @@ func (e *TrafficShowExec) Open(ctx context.Context) error {
if err := e.BaseExecutor.Open(ctx); err != nil {
return err
}
resps, err := request(ctx, e.BaseExecutor, nil, http.MethodGet, "api/traffic/show")
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return errors.Wrapf(err, "get tiproxy addresses failed")
}
resps, err := request(ctx, addrs, nil, http.MethodGet, showPath)
if err != nil {
return err
}
Expand Down Expand Up @@ -154,30 +206,23 @@ func (e *TrafficShowExec) Next(ctx context.Context, req *chunk.Chunk) error {
return nil
}

func request(ctx context.Context, exec exec.BaseExecutor, reader io.Reader, method, path string) (map[string]string, error) {
addrs, err := getTiProxyAddrs(ctx)
if err != nil {
return nil, err
}
func request(ctx context.Context, addrs []string, readers []io.Reader, method, path string) (map[string]string, error) {
resps := make(map[string]string, len(addrs))
for _, addr := range addrs {
resp, requestErr := requestOne(method, addr, path, reader)
if requestErr != nil {
// Let's send requests to all the instances even if some fail.
exec.Ctx().GetSessionVars().StmtCtx.AppendError(requestErr)
logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("method", method),
zap.String("path", path), zap.String("addr", addr), zap.String("resp", resp), zap.Error(requestErr))
if err == nil {
err = requestErr
}
} else {
resps[addr] = resp
for i, addr := range addrs {
var reader io.Reader
if readers != nil && i < len(readers) {
reader = readers[i]
}
resp, err := requestOne(method, addr, path, reader)
if err != nil {
logutil.Logger(ctx).Error("traffic request to tiproxy failed", zap.String("path", path), zap.String("addr", addr),
zap.String("resp", resp), zap.Error(err))
return resps, errors.Wrapf(err, "request to tiproxy '%s' failed", addr)
}
resps[addr] = resp
}
if err == nil {
logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path))
}
return resps, err
logutil.Logger(ctx).Info("traffic request to tiproxy succeeds", zap.Strings("addrs", addrs), zap.String("path", path))
return resps, nil
}

func getTiProxyAddrs(ctx context.Context) ([]string, error) {
Expand All @@ -202,7 +247,7 @@ func getTiProxyAddrs(ctx context.Context) ([]string, error) {
}

func requestOne(method, addr, path string, rd io.Reader) (string, error) {
url := fmt.Sprintf("%s://%s/%s", util.InternalHTTPSchema(), addr, path)
url := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), addr, path)
req, err := http.NewRequest(method, url, rd)
if err != nil {
return "", errors.Trace(err)
Expand Down Expand Up @@ -242,3 +287,83 @@ func parseTime(ctx context.Context, exec exec.BaseExecutor, timeStr string) type
}
return types.NewTime(types.FromGoTime(t), mysql.TypeDatetime, types.MaxFsp)
}

// formReader4Capture builds one request body per TiProxy instance for a capture job.
//
// args holds the form fields of the capture statement and must contain a
// non-empty outputKey entry; tiproxyNum is the number of bodies to build.
//
// When storage.IsLocal reports the output path as local, every body carries the
// same form. Otherwise each body is built from its own copy of args whose output
// path has the element "<filePrefix><index>" appended, so that the instances
// don't write to the same path. args itself is never modified.
//
// An error is returned when the output path is missing, empty, or cannot be
// parsed as a URL.
func formReader4Capture(args map[string]string, tiproxyNum int) ([]io.Reader, error) {
	// A missing key yields the empty string, so one comparison covers both cases.
	output := args[outputKey]
	if output == "" {
		return nil, errors.New("the output path for capture must be specified")
	}
	u, err := url.Parse(output)
	if err != nil {
		return nil, errors.Wrapf(err, "parse output path failed")
	}

	isLocal := storage.IsLocal(u)
	// For a local path the form is identical for all instances: encode it once.
	var sharedForm string
	if isLocal {
		sharedForm = getForm(args)
	}

	readers := make([]io.Reader, 0, tiproxyNum)
	for idx := 0; idx < tiproxyNum; idx++ {
		form := sharedForm
		if !isLocal {
			// Give this instance a private copy with its own output sub-path.
			perInstance := maps.Clone(args)
			perInstance[outputKey] = u.JoinPath(fmt.Sprintf("%s%d", filePrefix, idx)).String()
			form = getForm(perInstance)
		}
		readers = append(readers, strings.NewReader(form))
	}
	return readers, nil
}

// formReader4Replay builds the request bodies for a replay job.
//
// args holds the form fields of the replay statement and must contain a
// non-empty inputKey entry. tiproxyNum is the number of TiProxy instances.
//
// When storage.IsLocal reports the input path as local, tiproxyNum bodies with
// the same form are returned. Otherwise the input path is listed for entries
// whose name starts with filePrefix, and one body is returned per sub-directory
// found, each built from a copy of args whose input path points at that
// sub-directory. The number of returned bodies may therefore differ from
// tiproxyNum; the caller is responsible for reconciling the two. args itself is
// never modified.
//
// An error is returned when the input path is missing, empty or unparsable,
// when the storage cannot be opened or listed, or when no sub-directory is found.
func formReader4Replay(ctx context.Context, args map[string]string, tiproxyNum int) ([]io.Reader, error) {
	input, ok := args[inputKey]
	if !ok || len(input) == 0 {
		return nil, errors.New("the input path for replay must be specified")
	}
	u, err := storage.ParseRawURL(input)
	if err != nil {
		return nil, errors.Wrapf(err, "parse input path failed")
	}
	if storage.IsLocal(u) {
		// Local path: every instance receives the same form.
		readers := make([]io.Reader, tiproxyNum)
		form := getForm(args)
		for i := 0; i < tiproxyNum; i++ {
			readers[i] = strings.NewReader(form)
		}
		return readers, nil
	}

	var names []string
	if mockNames := ctx.Value(trafficPathKey); mockNames != nil {
		// Tests inject the sub-directory names through the context.
		names = mockNames.([]string)
	} else {
		backend, err := storage.ParseBackendFromURL(u, nil)
		if err != nil {
			return nil, errors.Wrapf(err, "parse backend from the input path failed")
		}
		store, err := storage.NewWithDefaultOpt(ctx, backend)
		if err != nil {
			return nil, errors.Wrapf(err, "create storage for input failed")
		}
		defer store.Close()

		// Per its documented contract, WalkDir invokes the callback for every
		// regular file below the prefix (e.g. "tiproxy-0/meta"), not for
		// directories. Reduce each file path to its first path element and keep
		// each sub-directory once, in the order it is first seen, so that one
		// directory maps to exactly one TiProxy instance.
		seen := make(map[string]struct{}, tiproxyNum)
		names = make([]string, 0, tiproxyNum)
		err = store.WalkDir(ctx, &storage.WalkOption{
			ObjPrefix: filePrefix,
		}, func(name string, _ int64) error {
			dir, _, nested := strings.Cut(name, "/")
			if !nested {
				// A file directly under the input path is not a capture sub-directory.
				return nil
			}
			if _, dup := seen[dir]; dup {
				return nil
			}
			seen[dir] = struct{}{}
			names = append(names, dir)
			return nil
		})
		if err != nil {
			return nil, errors.Wrapf(err, "walk input path failed")
		}
	}
	if len(names) == 0 {
		return nil, errors.New("no replay files found in the input path")
	}

	readers := make([]io.Reader, 0, len(names))
	for _, name := range names {
		// Each body gets a private copy pointing at one sub-directory.
		m := maps.Clone(args)
		m[inputKey] = u.JoinPath(name).String()
		readers = append(readers, strings.NewReader(getForm(m)))
	}
	return readers, nil
}
Loading

0 comments on commit 6f4861d

Please sign in to comment.