Skip to content

Commit

Permalink
Refactor StartBackup to Support Custom Backup Handlers (#849)
Browse files Browse the repository at this point in the history
* Refactor StartBackup to Support Custom Backup Handlers

* fix: TestStartBackupEndInGivenTime test failure

* Refactor StartBackupToFile to create backup directory before initiating backup

* fix: Add error handling for w.Close()

* style:Rename filename to binlogFilename for clarity in StartBackup

* docs: Simplify the comments of the StartBackup method

* refactor: Rename the StartBackup method to StartBackupWithHandler so that the StartBackup method retains its original behavior
  • Loading branch information
doraemonkeys authored Feb 4, 2024
1 parent a057d23 commit 39976f7
Showing 1 changed file with 34 additions and 12 deletions.
46 changes: 34 additions & 12 deletions replication/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,25 @@ import (
// StartBackup: Like mysqlbinlog remote raw backup
// Backup remote binlog from position (filename, offset) and write in backupDir
func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error {
err := os.MkdirAll(backupDir, 0755)
if err != nil {
return errors.Trace(err)
}
return b.StartBackupWithHandler(p, timeout, func(filename string) (io.WriteCloser, error) {
return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
})
}

// StartBackupWithHandler starts the backup process for the binary log using the specified position and handler.
// The process will continue until the timeout is reached or an error occurs.
//
// Parameters:
// - p: The starting position in the binlog from which to begin the backup.
// - timeout: The maximum duration to wait for new binlog events before stopping the backup process.
// If set to 0, a default very long timeout (30 days) is used instead.
// - handler: A function that takes a binlog filename and returns an WriteCloser for writing raw events to.
func (b *BinlogSyncer) StartBackupWithHandler(p Position, timeout time.Duration,
handler func(binlogFilename string) (io.WriteCloser, error)) (retErr error) {
if timeout == 0 {
// a very long timeout here
timeout = 30 * 3600 * 24 * time.Second
Expand All @@ -22,10 +41,6 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
// Force use raw mode
b.parser.SetRawMode(true)

if err := os.MkdirAll(backupDir, 0755); err != nil {
return errors.Trace(err)
}

s, err := b.StartSync(p)
if err != nil {
return errors.Trace(err)
Expand All @@ -34,10 +49,14 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
var filename string
var offset uint32

var f *os.File
var w io.WriteCloser
defer func() {
if f != nil {
f.Close()
var closeErr error
if w != nil {
closeErr = w.Close()
}
if retErr == nil {
retErr = closeErr
}
}()

Expand Down Expand Up @@ -67,26 +86,29 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du
} else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT {
// FormateDescriptionEvent is the first event in binlog, we will close old one and create a new

if f != nil {
f.Close()
if w != nil {
if err = w.Close(); err != nil {
w = nil
return errors.Trace(err)
}
}

if len(filename) == 0 {
return errors.Errorf("empty binlog filename for FormateDescriptionEvent")
}

f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644)
w, err = handler(filename)
if err != nil {
return errors.Trace(err)
}

// write binlog header fe'bin'
if _, err = f.Write(BinLogFileHeader); err != nil {
if _, err = w.Write(BinLogFileHeader); err != nil {
return errors.Trace(err)
}
}

if n, err := f.Write(e.RawData); err != nil {
if n, err := w.Write(e.RawData); err != nil {
return errors.Trace(err)
} else if n != len(e.RawData) {
return errors.Trace(io.ErrShortWrite)
Expand Down

0 comments on commit 39976f7

Please sign in to comment.