From bec8c484bf4211dbd7fe0a1a25f46a34e5dcfc18 Mon Sep 17 00:00:00 2001 From: doraemon Date: Thu, 1 Feb 2024 19:18:45 +0800 Subject: [PATCH] Refactor StartBackup to Support Custom Backup Handlers --- replication/backup.go | 50 ++++++++++++++++++++++++++++++++----------- 1 file changed, 37 insertions(+), 13 deletions(-) diff --git a/replication/backup.go b/replication/backup.go index d75ba365f..f152764b9 100644 --- a/replication/backup.go +++ b/replication/backup.go @@ -13,7 +13,35 @@ import ( // StartBackup: Like mysqlbinlog remote raw backup // Backup remote binlog from position (filename, offset) and write in backupDir -func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Duration) error { +func (b *BinlogSyncer) StartBackupToFile(backupDir string, p Position, timeout time.Duration) error { + return b.StartBackup(p, timeout, func(filename string) (io.WriteCloser, error) { + return os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + }) +} + +// StartBackup initiates a backup process for binlog events starting from a specified position. +// It continuously fetches binlog events and writes them to files managed by a provided handler function. +// The backup process can be controlled with a timeout duration, after which the backup will stop if no new events are received. +// +// Parameters: +// - p Position: The starting position in the binlog from which to begin the backup. +// - timeout time.Duration: The maximum duration to wait for new binlog events before stopping the backup process. +// If set to 0, a default very long timeout (30 days) is used instead. +// - handler func(filename string) (io.WriteCloser, error): A function provided by the caller to handle file creation and writing. +// This function is expected to return an io.WriteCloser for the specified filename, which will be used to write binlog events. +// +// The function first checks if a timeout is specified, setting a default if not. It then enables raw mode parsing for binlog events +// to ensure that events are not parsed but passed as raw data for backup. It starts syncing binlog events from the specified position +// and enters a loop to continuously fetch and write events. +// +// For each event, it checks the event type. If it's a ROTATE_EVENT, it updates the filename to the next log file as indicated by the event. +// If it's a FORMAT_DESCRIPTION_EVENT, it signifies the start of a new binlog file, and the function closes the current file (if open) and opens +// a new one using the handler function. It also writes the binlog file header to the new file. +// +// The function writes the raw data of each event to the current file and handles errors such as context deadline exceeded (timeout), +// write errors, or short writes. +func (b *BinlogSyncer) StartBackup(p Position, timeout time.Duration, + handler func(filename string) (io.WriteCloser, error)) error { if timeout == 0 { // a very long timeout here timeout = 30 * 3600 * 24 * time.Second @@ -22,10 +50,6 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du // Force use raw mode b.parser.SetRawMode(true) - if err := os.MkdirAll(backupDir, 0755); err != nil { - return errors.Trace(err) - } - s, err := b.StartSync(p) if err != nil { return errors.Trace(err) @@ -34,10 +58,10 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du var filename string var offset uint32 - var f *os.File + var w io.WriteCloser defer func() { - if f != nil { - f.Close() + if w != nil { + w.Close() } }() @@ -67,26 +91,26 @@ func (b *BinlogSyncer) StartBackup(backupDir string, p Position, timeout time.Du } else if e.Header.EventType == FORMAT_DESCRIPTION_EVENT { // FormateDescriptionEvent is the first event in binlog, we will close old one and create a new - if f != nil { - f.Close() + if w != nil { + _ = w.Close() } if len(filename) == 0 { return errors.Errorf("empty binlog filename for FormateDescriptionEvent") } - f, err = os.OpenFile(path.Join(backupDir, filename), os.O_CREATE|os.O_WRONLY, 0644) + w, err = handler(filename) if err != nil { return errors.Trace(err) } // write binlog header fe'bin' - if _, err = f.Write(BinLogFileHeader); err != nil { + if _, err = w.Write(BinLogFileHeader); err != nil { return errors.Trace(err) } } - if n, err := f.Write(e.RawData); err != nil { + if n, err := w.Write(e.RawData); err != nil { return errors.Trace(err) } else if n != len(e.RawData) { return errors.Trace(io.ErrShortWrite)