From 31e7cbc96b289816892bdade4c6db07754676f27 Mon Sep 17 00:00:00 2001 From: liyao Date: Mon, 27 Nov 2023 16:36:27 +0800 Subject: [PATCH] refactor: refactor to disk space error while backing up (#474) * fix: detect disk space error Signed-off-by: mlycore * feat: add data node ip and address to error message Signed-off-by: mlycore * fix: remove useless log Signed-off-by: mlycore * feat: add disk space error check to cmd output Signed-off-by: mlycore * fix: fix return error Signed-off-by: mlycore * feat: hide delete backupfiles output Signed-off-by: mlycore * fix: remove comments Signed-off-by: mlycore * chore: add golint comment Signed-off-by: mlycore * fix: fix test Signed-off-by: mlycore --------- Signed-off-by: mlycore --- pitr/agent/internal/pkg/opengauss.go | 23 ++++++++++-------- pitr/agent/pkg/cmds/cmd.go | 36 +++++++++++++++++++++------- pitr/cli/internal/cmd/backup.go | 15 ++++++------ pitr/cli/internal/cmd/backup_test.go | 4 ++-- 4 files changed, 49 insertions(+), 29 deletions(-) diff --git a/pitr/agent/internal/pkg/opengauss.go b/pitr/agent/internal/pkg/opengauss.go index 21872c69..84866d3c 100644 --- a/pitr/agent/internal/pkg/opengauss.go +++ b/pitr/agent/internal/pkg/opengauss.go @@ -98,6 +98,10 @@ const ( ) func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, threadsNum uint8, dbPort uint16) (string, error) { + var ( + bid string + err error + ) cmd := fmt.Sprintf(_backupFmt, backupPath, instanceName, backupMode, og.pgData, threadsNum, dbPort) outputs, err := cmds.AsyncExec(og.shell, cmd) if err != nil { @@ -117,17 +121,15 @@ func (og *openGauss) AsyncBackup(backupPath, instanceName, backupMode string, th return "", output.Error } - // get the backup id from the first line - bid, err := og.getBackupID(output.Message) - if err != nil { - og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err)) - return "", err + if strings.Contains(output.Message, "INFO: Backup start") { + bid, err = og.getBackupID(output.Message) + if err != nil { + og.log.Error(fmt.Sprintf("og.getBackupID[source=%s] return err wrap: %s", output.Message, err)) + return "", err + } } - // ignore other output - go og.ignore(outputs) - return bid, nil //nolint } - return "", fmt.Errorf("unknow err") + return bid, nil //nolint } //nolint:dupl @@ -192,7 +194,7 @@ func (og *openGauss) AddInstance(backupPath, instance string) error { if errors.Is(err, cons.CmdOperateFailed) { og.log.Error(fmt.Sprintf("add instance failure[output=%s], err: %s, wrap: %s", output, err, cons.InstanceAlreadyExist)) - return err + return fmt.Errorf("add instance failure[output=%s], err: %s, wrap: %w", output, err, cons.InstanceAlreadyExist) } if err != nil { og.log.Error(fmt.Sprintf(_CmdErrorFmt, og.shell, cmd, cons.CmdAddInstanceFailed)) @@ -347,6 +349,7 @@ func (og *openGauss) ShowBackupList(backupPath, instanceName string) ([]*model.B return og.showbackup(cmd, instanceName) } +//nolint:unused func (og *openGauss) ignore(outputs chan *cmds.Output) { defer func() { _ = recover() diff --git a/pitr/agent/pkg/cmds/cmd.go b/pitr/agent/pkg/cmds/cmd.go index 486148b1..77a98f53 100644 --- a/pitr/agent/pkg/cmds/cmd.go +++ b/pitr/agent/pkg/cmds/cmd.go @@ -49,6 +49,7 @@ func AsyncExec(name string, args ...string) (chan *Output, error) { if err != nil { return nil, fmt.Errorf("can not obtain stdout pipe for command[args=%+v]:%s", args, err) } + if err = cmd.Start(); err != nil { return nil, fmt.Errorf("the command is err[args=%+v]:%s", args, err) } @@ -61,11 +62,16 @@ func AsyncExec(name string, args ...string) (chan *Output, error) { go func() { if err = syncutils.NewRecoverFuncWithErrRet("", func() error { for scanner.Scan() { - output <- &Output{ + op := &Output{ LineNo: index, Message: scanner.Text(), Error: err, } + if strings.Contains(scanner.Text(), "No space left on device") { + op.Error = fmt.Errorf("%s", "No space left on device") + } + + output <- op index++ } @@ -78,13 +84,14 @@ func AsyncExec(name string, args ...string) (chan *Output, error) { if err = cmd.Wait(); err != nil { if ee, ok := err.(*exec.ExitError); ok { - logging.Error(fmt.Sprintf("exec failure[ee=%s], wrap=%s", ee, cons.CmdOperateFailed)) + output <- &Output{ + Error: fmt.Errorf("exec failure[ee=%s], wrap=%w", ee, cons.CmdOperateFailed), + } + } else { + output <- &Output{ + Error: fmt.Errorf("%s err: %s", cmd.String(), err), + } } - - output <- &Output{ - Error: fmt.Errorf("%s err: %s", cmd.String(), err), - } - } return nil })(); err != nil { @@ -113,6 +120,12 @@ func Exec(name string, args ...string) (string, error) { if err != nil { return "", fmt.Errorf("can not obtain stdout pipe for command[args=%+v]:%s", args, err) } + + stderr, err := cmd.StderrPipe() + if err != nil { + return "", fmt.Errorf("can not obtain stderr pipe for cmand[args=%+v]:%s", args, err) + } + if err = cmd.Start(); err != nil { return "", fmt.Errorf("the command is err[args=%+v]:%s", args, err) } @@ -122,11 +135,16 @@ func Exec(name string, args ...string) (string, error) { return "", fmt.Errorf("io.ReadAll return err=%w", err) } + ereader, err := io.ReadAll(stderr) + if err != nil { + return "", fmt.Errorf("io.ReadAll return err=%w", err) + } + if err = cmd.Wait(); err != nil { if ee, ok := err.(*exec.ExitError); ok { - logging.Error(fmt.Sprintf("exec failure[ee=%s,stdout=%s]", ee, string(reader))) + return "", fmt.Errorf("exec failure[ee=%s,stdout=%s], wrap:%w", ee, string(reader), cons.CmdOperateFailed) } - return "", fmt.Errorf("%s err: %s", cmd.String(), err) + return "", fmt.Errorf("%s err: %s", cmd.String(), string(ereader)) } return string(reader), nil } diff --git a/pitr/cli/internal/cmd/backup.go b/pitr/cli/internal/cmd/backup.go index 1b59e3c6..77f19a52 100644 --- a/pitr/cli/internal/cmd/backup.go +++ b/pitr/cli/internal/cmd/backup.go @@ -134,12 +134,7 @@ func backup() error { } if lsBackup != nil { - if cancel { - deleteBackupFiles(ls, lsBackup, deleteModeQuiet) - } else { - logging.Warn("Try to delete backup data ...") - deleteBackupFiles(ls, lsBackup, deleteModeNormal) - } + deleteBackupFiles(ls, lsBackup, deleteModeQuiet) } } }() @@ -310,20 +305,24 @@ func _execBackup(as pkg.IAgentServer, node *model.StorageNode, dnCh chan *model. Instance: defaultInstance, } backupID, err := as.Backup(in) + status := model.SsBackupStatusRunning if err != nil { - return xerr.NewCliErr(err.Error()) + status = model.BackupStatus(err.Error()) } // update DnList of lsBackup dn := &model.DataNode{ IP: node.IP, Port: node.Port, - Status: model.SsBackupStatusRunning, + Status: status, BackupID: backupID, StartTime: timeutil.Now().String(), EndTime: timeutil.Init(), } dnCh <- dn + if err != nil { + return fmt.Errorf("data node %s:%d backup error: %s", node.IP, node.Port, err) + } return nil } diff --git a/pitr/cli/internal/cmd/backup_test.go b/pitr/cli/internal/cmd/backup_test.go index ee72697a..8c7387ca 100644 --- a/pitr/cli/internal/cmd/backup_test.go +++ b/pitr/cli/internal/cmd/backup_test.go @@ -180,9 +180,9 @@ var _ = Describe("Backup", func() { as.EXPECT().Backup(gomock.Any()).Return("", xerr.NewCliErr("backup failed")) - Expect(_execBackup(as, bak.SsBackup.StorageNodes[0], dnCh)).ToNot(BeNil()) + Expect(_execBackup(as, bak.SsBackup.StorageNodes[1], dnCh)).ToNot(BeNil()) close(dnCh) - Expect(len(dnCh)).To(Equal(1)) + Expect(len(dnCh)).To(Equal(2)) }) })