diff --git a/app/cmd/add_replica.go b/app/cmd/add_replica.go index 55e51dd43..b58b2ba3d 100644 --- a/app/cmd/add_replica.go +++ b/app/cmd/add_replica.go @@ -35,6 +35,11 @@ func AddReplicaCmd() cli.Command { Required: false, Usage: "Enable fast file synchronization using change time and checksum", }, + cli.BoolFlag{ + Name: "sync-local", + Required: false, + Usage: "sync local replica", + }, cli.IntFlag{ Name: "file-sync-http-client-timeout", Required: false, @@ -103,7 +108,7 @@ func addReplica(c *cli.Context) error { if c.Bool("restore") { return task.AddRestoreReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName) } - return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds) + return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, nil, grpcTimeoutSeconds) } func StartWithReplicasCmd() cli.Command { diff --git a/pkg/replica/client/client.go b/pkg/replica/client/client.go index 8e277a9c9..4fc36aac7 100644 --- a/pkg/replica/client/client.go +++ b/pkg/replica/client/client.go @@ -506,7 +506,7 @@ func (c *ReplicaClient) LaunchReceiver(toFilePath string) (string, int32, error) return c.host, reply.Port, nil } -func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error { +func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64, localSync *types.FileLocalSync) error { syncAgentServiceClient, err := c.getSyncServiceClient() if err != nil { return err @@ -518,14 +518,23 @@ func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout) defer cancel() - if _, err := syncAgentServiceClient.FilesSync(ctx, &enginerpc.FilesSyncRequest{ + fileSyncRequest := &enginerpc.FilesSyncRequest{ FromAddress: fromAddress, ToHost: c.host, SyncFileInfoList: syncFileInfoListToSyncAgentGRPCFormat(list), FastSync: fastSync, FileSyncHttpClientTimeout: int32(fileSyncHTTPClientTimeout), GrpcTimeoutSeconds: grpcTimeoutSeconds, - }); err != nil { + } + + if localSync != nil { + fileSyncRequest.LocalSync = &enginerpc.FileLocalSync{ + SourcePath: localSync.SourcePath, + TargetPath: localSync.TargetPath, + } + } + + if _, err := syncAgentServiceClient.FilesSync(ctx, fileSyncRequest); err != nil { return errors.Wrapf(err, "failed to sync files %+v from %v", list, fromAddress) } diff --git a/pkg/sync/rpc/server.go b/pkg/sync/rpc/server.go index 02feff008..29089bc3c 100644 --- a/pkg/sync/rpc/server.go +++ b/pkg/sync/rpc/server.go @@ -27,6 +27,8 @@ import ( "google.golang.org/grpc/reflection" "google.golang.org/protobuf/types/known/emptypb" + lhio "github.com/longhorn/go-common-libs/io" + "github.com/longhorn/longhorn-engine/pkg/backup" "github.com/longhorn/longhorn-engine/pkg/interceptor" "github.com/longhorn/longhorn-engine/pkg/replica" @@ -445,16 +447,77 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn } }() + if req.LocalSync != nil { + err := s.fileSyncLocal(ctx, req) + if err == nil { + return &emptypb.Empty{}, nil + } + + logrus.WithError(err).Warn("Falling back to remote sync") + } + + return &emptypb.Empty{}, s.fileSyncRemote(ctx, req) +} + +func (s *SyncAgentServer) fileSyncLocal(ctx context.Context, req *enginerpc.FilesSyncRequest) error { + var targetPaths []string + var err error + + log := logrus.WithFields(logrus.Fields{ + "sourcePath": req.LocalSync.SourcePath, + "targetPath": req.LocalSync.TargetPath, + }) + + log.Info("Syncing files locally") + + // Defer function to handle cleanup of files if an error occurs + defer func() { + if err == nil { + log.Info("Done syncing files locally") + } else { + log.WithError(err).Warn("Failed to sync files locally, reverting changes") + + for _, targetPath := range targetPaths { + if removeErr := os.Remove(targetPath); removeErr != nil && removeErr != os.ErrNotExist { + log.WithError(removeErr).Warnf("Failed to remove file %v", targetPath) + } + } + } + }() + + for _, info := range req.SyncFileInfoList { + sourcePath := filepath.Join("/host", req.LocalSync.SourcePath, info.FromFileName) + targetPath := filepath.Join("/host", req.LocalSync.TargetPath, info.ToFileName) + targetPaths = append(targetPaths, targetPath) + + log.Tracef("Copying file %v to %v", sourcePath, targetPath) + + err = lhio.CopyFile(sourcePath, targetPath, true) + if err != nil { + return err + } + + err := lhio.CheckIsFileSizeSame(sourcePath, targetPath) + if err != nil { + return errors.Wrapf(err, "failed to check file size for file %v", sourcePath) + } + } + + return nil +} + +func (s *SyncAgentServer) fileSyncRemote(ctx context.Context, req *enginerpc.FilesSyncRequest) error { // We generally don't know the from replica's instanceName since it is arbitrarily chosen from candidate addresses // stored in the controller. Don't modify FilesSyncRequest to contain it, and create a client without it. fromClient, err := replicaclient.NewReplicaClient(req.FromAddress, s.volumeName, "") if err != nil { - return nil, err + return err } defer fromClient.Close() var ops sparserest.SyncFileOperations fileStub := &sparserest.SyncFileStub{} + for _, info := range req.SyncFileInfoList { // Do not count size for disk meta file or empty disk file. if info.ActualSize == 0 { @@ -465,14 +528,13 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn port, err := s.launchReceiver("FilesSync", info.ToFileName, ops) if err != nil { - return nil, errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName) + return errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName) } if err := fromClient.SendFile(info.FromFileName, req.ToHost, int32(port), int(req.FileSyncHttpClientTimeout), req.FastSync, req.GrpcTimeoutSeconds); err != nil { - return nil, errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port) + return errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port) } } - - return &emptypb.Empty{}, nil + return nil } func (s *SyncAgentServer) PrepareRebuild(list []*enginerpc.SyncFileInfo, fromReplicaAddress string) error { diff --git a/pkg/sync/sync.go b/pkg/sync/sync.go index 9e5c420d6..de80b2d8d 100644 --- a/pkg/sync/sync.go +++ b/pkg/sync/sync.go @@ -412,7 +412,7 @@ func (t *Task) VerifyRebuildReplica(address, instanceName string) error { return nil } -func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error { +func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, localSync *types.FileLocalSync, grpcTimeoutSeconds int64) error { volume, err := t.client.VolumeGet() if err != nil { return err @@ -459,7 +459,7 @@ func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instance return fmt.Errorf("sync file list shouldn't contain volume head") } - if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds); err != nil { + if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds, localSync); err != nil { return err } diff --git a/pkg/types/types.go b/pkg/types/types.go index 3c5ec3ce5..3e02bbb7e 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -203,3 +203,8 @@ func GRPCReplicaModeToReplicaMode(replicaMode enginerpc.ReplicaMode) Mode { } return ERR } + +type FileLocalSync struct { + SourcePath string + TargetPath string +}