Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(auto-balance): disk in pressure #1003

Merged
merged 3 commits into from
Jul 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion app/cmd/add_replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@ func AddReplicaCmd() cli.Command {
Required: false,
Usage: "Enable fast file synchronization using change time and checksum",
},
cli.BoolFlag{
Name: "sync-local",
Required: false,
Usage: "sync local replica",
},
Comment on lines +38 to +42
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NIT: Do we need this if we won't implement local sync for this debug cmdline?

cli.IntFlag{
Name: "file-sync-http-client-timeout",
Required: false,
Expand Down Expand Up @@ -103,7 +108,7 @@ func addReplica(c *cli.Context) error {
if c.Bool("restore") {
return task.AddRestoreReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName)
}
return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds)
return task.AddReplica(volumeSize, volumeCurrentSize, replica, replicaInstanceName, fileSyncHTTPClientTimeout, fastSync, nil, grpcTimeoutSeconds)
}

func StartWithReplicasCmd() cli.Command {
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ require (
github.com/longhorn/backupstore v0.0.0-20240624084713-e98e31ebcebb
github.com/longhorn/go-common-libs v0.0.0-20240627075631-d78642cff5e1
github.com/longhorn/go-iscsi-helper v0.0.0-20240624090318-a8ef86edd9a5
github.com/longhorn/sparse-tools v0.0.0-20240513025352-ed49dd3f93eb
github.com/longhorn/types v0.0.0-20240624083620-f11ba48bf396
github.com/longhorn/sparse-tools v0.0.0-20240703010727-92451e38077a
github.com/longhorn/types v0.0.0-20240702051453-949485b24d0a
github.com/moby/moby v26.1.4+incompatible
github.com/pkg/errors v0.9.1
github.com/rancher/go-fibmap v0.0.0-20160418233256-5fc9f8c1ed47
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ github.com/longhorn/go-common-libs v0.0.0-20240627075631-d78642cff5e1 h1:VGSNK9A
github.com/longhorn/go-common-libs v0.0.0-20240627075631-d78642cff5e1/go.mod h1:wpLEAlsDCnqBA7QfZg0gxYeR8MmLbWHbdidWYwnRbyM=
github.com/longhorn/go-iscsi-helper v0.0.0-20240624090318-a8ef86edd9a5 h1:X2D6tZjl8y99di1oo4XAbsgNpRTcw+C3DAK5760uQyI=
github.com/longhorn/go-iscsi-helper v0.0.0-20240624090318-a8ef86edd9a5/go.mod h1:tdKlSNcBEJR0ueHqcYW9xL2Yn01liqKNpAHDEBiX/to=
github.com/longhorn/sparse-tools v0.0.0-20240513025352-ed49dd3f93eb h1:Kh89s6i5T1W6BT1Aq9W1YHXojbbcTXlDieWC5KWAs/E=
github.com/longhorn/sparse-tools v0.0.0-20240513025352-ed49dd3f93eb/go.mod h1:vFvENahNfkr2VAyROaGHnZrLWuXwZJ0DHj8mBIh2miI=
github.com/longhorn/types v0.0.0-20240624083620-f11ba48bf396 h1:dvrppUjQ5i9wPOuU1qnanksHe82cSaNckGaWzn3f8QY=
github.com/longhorn/types v0.0.0-20240624083620-f11ba48bf396/go.mod h1:fonrC6SwGpvt+YVlfJ3xMmg0MlOH94T1Qx2+ZEy8n3U=
github.com/longhorn/sparse-tools v0.0.0-20240703010727-92451e38077a h1:+o63c0oh7ZNKeQdc0Hawfzz5vRa4LiDvLOtJYjegtnk=
github.com/longhorn/sparse-tools v0.0.0-20240703010727-92451e38077a/go.mod h1:iUJCZtOKG/9xv2rfrUAYZntFTzP5dZtvy4Kwe6dMcUc=
github.com/longhorn/types v0.0.0-20240702051453-949485b24d0a h1:qutBOTJH2Jlh0gq2sl9nbBohacgcnDZqxEvnZG8jOnU=
github.com/longhorn/types v0.0.0-20240702051453-949485b24d0a/go.mod h1:fonrC6SwGpvt+YVlfJ3xMmg0MlOH94T1Qx2+ZEy8n3U=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
Expand Down
15 changes: 12 additions & 3 deletions pkg/replica/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ func (c *ReplicaClient) LaunchReceiver(toFilePath string) (string, int32, error)
return c.host, reply.Port, nil
}

func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error {
func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64, localSync *types.FileLocalSync) error {
syncAgentServiceClient, err := c.getSyncServiceClient()
if err != nil {
return err
Expand All @@ -518,14 +518,23 @@ func (c *ReplicaClient) SyncFiles(fromAddress string, list []types.SyncFileInfo,
ctx, cancel := context.WithTimeout(context.Background(), grpcTimeout)
defer cancel()

if _, err := syncAgentServiceClient.FilesSync(ctx, &enginerpc.FilesSyncRequest{
fileSyncRequest := &enginerpc.FilesSyncRequest{
FromAddress: fromAddress,
ToHost: c.host,
SyncFileInfoList: syncFileInfoListToSyncAgentGRPCFormat(list),
FastSync: fastSync,
FileSyncHttpClientTimeout: int32(fileSyncHTTPClientTimeout),
GrpcTimeoutSeconds: grpcTimeoutSeconds,
}); err != nil {
}

if localSync != nil {
fileSyncRequest.LocalSync = &enginerpc.FileLocalSync{
SourcePath: localSync.SourcePath,
TargetPath: localSync.TargetPath,
}
}

if _, err := syncAgentServiceClient.FilesSync(ctx, fileSyncRequest); err != nil {
return errors.Wrapf(err, "failed to sync files %+v from %v", list, fromAddress)
}

Expand Down
72 changes: 67 additions & 5 deletions pkg/sync/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ import (
"google.golang.org/grpc/reflection"
"google.golang.org/protobuf/types/known/emptypb"

lhio "github.com/longhorn/go-common-libs/io"

"github.com/longhorn/longhorn-engine/pkg/backup"
"github.com/longhorn/longhorn-engine/pkg/interceptor"
"github.com/longhorn/longhorn-engine/pkg/replica"
Expand Down Expand Up @@ -445,16 +447,77 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn
}
}()

if req.LocalSync != nil {
err := s.fileSyncLocal(ctx, req)
if err == nil {
return &emptypb.Empty{}, nil
}

logrus.WithError(err).Warn("Falling back to remote sync")
}

return &emptypb.Empty{}, s.fileSyncRemote(ctx, req)
}

func (s *SyncAgentServer) fileSyncLocal(ctx context.Context, req *enginerpc.FilesSyncRequest) error {
var targetPaths []string
var err error

log := logrus.WithFields(logrus.Fields{
"sourcePath": req.LocalSync.SourcePath,
"targetPath": req.LocalSync.TargetPath,
})

log.Info("Syncing files locally")

// Defer function to handle cleanup of files if an error occurs
defer func() {
if err == nil {
log.Info("Done syncing files locally")
} else {
log.WithError(err).Warn("Failed to sync files locally, reverting changes")

for _, targetPath := range targetPaths {
if removeErr := os.Remove(targetPath); removeErr != nil && removeErr != os.ErrNotExist {
log.WithError(removeErr).Warnf("Failed to remove file %v", targetPath)
}
}
}
}()

for _, info := range req.SyncFileInfoList {
sourcePath := filepath.Join("/host", req.LocalSync.SourcePath, info.FromFileName)
targetPath := filepath.Join("/host", req.LocalSync.TargetPath, info.ToFileName)
targetPaths = append(targetPaths, targetPath)

log.Tracef("Copying file %v to %v", sourcePath, targetPath)

err = sparse.SyncLocalFile(sourcePath, targetPath)
if err != nil {
return err
}

err := lhio.CheckIsFileSizeSame(sourcePath, targetPath)
c3y1huang marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return errors.Wrapf(err, "failed to check file size for file %v", sourcePath)
}
}

return nil
}

func (s *SyncAgentServer) fileSyncRemote(ctx context.Context, req *enginerpc.FilesSyncRequest) error {
// We generally don't know the from replica's instanceName since it is arbitrarily chosen from candidate addresses
// stored in the controller. Don't modify FilesSyncRequest to contain it, and create a client without it.
fromClient, err := replicaclient.NewReplicaClient(req.FromAddress, s.volumeName, "")
if err != nil {
return nil, err
return err
}
defer fromClient.Close()

var ops sparserest.SyncFileOperations
fileStub := &sparserest.SyncFileStub{}

for _, info := range req.SyncFileInfoList {
// Do not count size for disk meta file or empty disk file.
if info.ActualSize == 0 {
Expand All @@ -465,14 +528,13 @@ func (s *SyncAgentServer) FilesSync(ctx context.Context, req *enginerpc.FilesSyn

port, err := s.launchReceiver("FilesSync", info.ToFileName, ops)
if err != nil {
return nil, errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName)
return errors.Wrapf(err, "failed to launch receiver for file %v", info.ToFileName)
}
if err := fromClient.SendFile(info.FromFileName, req.ToHost, int32(port), int(req.FileSyncHttpClientTimeout), req.FastSync, req.GrpcTimeoutSeconds); err != nil {
return nil, errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port)
return errors.Wrapf(err, "replica %v failed to send file %v to %v:%v", req.FromAddress, info.ToFileName, req.ToHost, port)
}
}

return &emptypb.Empty{}, nil
return nil
}

func (s *SyncAgentServer) PrepareRebuild(list []*enginerpc.SyncFileInfo, fromReplicaAddress string) error {
Expand Down
4 changes: 2 additions & 2 deletions pkg/sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func (t *Task) VerifyRebuildReplica(address, instanceName string) error {
return nil
}

func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, grpcTimeoutSeconds int64) error {
func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instanceName string, fileSyncHTTPClientTimeout int, fastSync bool, localSync *types.FileLocalSync, grpcTimeoutSeconds int64) error {
volume, err := t.client.VolumeGet()
if err != nil {
return err
Expand Down Expand Up @@ -459,7 +459,7 @@ func (t *Task) AddReplica(volumeSize, volumeCurrentSize int64, address, instance
return fmt.Errorf("sync file list shouldn't contain volume head")
}

if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds); err != nil {
if err = toClient.SyncFiles(fromAddress, resp, fileSyncHTTPClientTimeout, fastSync, grpcTimeoutSeconds, localSync); err != nil {
return err
}

Expand Down
5 changes: 5 additions & 0 deletions pkg/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,3 +203,8 @@ func GRPCReplicaModeToReplicaMode(replicaMode enginerpc.ReplicaMode) Mode {
}
return ERR
}

type FileLocalSync struct {
SourcePath string
TargetPath string
}
176 changes: 176 additions & 0 deletions vendor/github.com/longhorn/sparse-tools/sparse/local.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading