From 625822faf8a89a2d47fdd94367e4b6985fa92154 Mon Sep 17 00:00:00 2001 From: Idan Novogroder Date: Thu, 28 Sep 2023 06:41:38 +0300 Subject: [PATCH] Review comments --- cmd/lakectl/cmd/fs_download.go | 80 ++++++++++++++----------------- cmd/lakectl/cmd/fs_upload.go | 48 +++++++++++++++---- cmd/lakectl/cmd/local.go | 11 +++-- cmd/lakectl/cmd/local_checkout.go | 6 +-- cmd/lakectl/cmd/local_clone.go | 6 +-- cmd/lakectl/cmd/local_commit.go | 6 +-- cmd/lakectl/cmd/local_init.go | 2 +- cmd/lakectl/cmd/local_list.go | 2 +- cmd/lakectl/cmd/local_pull.go | 6 +-- cmd/lakectl/cmd/local_status.go | 2 +- docs/reference/cli.md | 15 +++--- pkg/local/sync.go | 11 ++--- 12 files changed, 110 insertions(+), 85 deletions(-) diff --git a/cmd/lakectl/cmd/fs_download.go b/cmd/lakectl/cmd/fs_download.go index 08f403bcf20..30f35c57a24 100644 --- a/cmd/lakectl/cmd/fs_download.go +++ b/cmd/lakectl/cmd/fs_download.go @@ -2,7 +2,6 @@ package cmd import ( "net/http" - "path/filepath" "strings" "github.com/go-openapi/swag" @@ -15,11 +14,6 @@ import ( const ( fsDownloadCmdMinArgs = 1 fsDownloadCmdMaxArgs = 2 - - fsDownloadParallelDefault = 6 - - fsNonRecursiveDownloadTemplate = `Successfully downloaded {{.Path}} to {{.Dest}} -` ) var fsDownloadCmd = &cobra.Command{ @@ -27,28 +21,46 @@ var fsDownloadCmd = &cobra.Command{ Short: "Download object(s) from a given repository path", Args: cobra.RangeArgs(fsDownloadCmdMinArgs, fsDownloadCmdMaxArgs), Run: func(cmd *cobra.Command, args []string) { - remote, dest := getLocalArgs(args, true, false) - flagSet := cmd.Flags() - preSignMode := Must(flagSet.GetBool("pre-sign")) - recursive := Must(flagSet.GetBool("recursive")) - parallel := Must(flagSet.GetInt("parallel")) - - if parallel < 1 { - DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallel) - } + remote, dest := getSyncArgs(args, true, false) + client := getClient() + syncFlags := getSyncFlags(cmd, client) // optional destination directory if len(args) > 1 { dest = args[1] } - client := getClient() ctx := cmd.Context() - s := local.NewSyncManager(ctx, client, parallel, preSignMode) + s := local.NewSyncManager(ctx, client, syncFlags.parallelism, syncFlags.presign) remotePath := remote.GetPath() - if recursive { - // Dynamically construct changes - ch := make(chan *local.Change, filesChanSize) + + ch := make(chan *local.Change, filesChanSize) + + stat, err := client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &apigen.StatObjectParams{ + Path: *remote.Path, + }) + switch { + case err != nil: + DieErr(err) + case stat.JSON200 != nil: + var objName string + if strings.Contains(remotePath, uri.PathSeparator) { + lastInd := strings.LastIndex(remotePath, uri.PathSeparator) + remotePath, objName = remotePath[lastInd+len(uri.PathSeparator):], remotePath[:lastInd] + } else { + objName = "" + } + remote.Path = swag.String(objName) + ch <- &local.Change{ + Source: local.ChangeSourceRemote, + Path: remotePath, + Type: local.ChangeTypeAdded, + } + close(ch) + default: + if remotePath != "" && !strings.HasSuffix(remotePath, uri.PathSeparator) { + *remote.Path += uri.PathSeparator + } go func() { defer close(ch) var after string @@ -83,35 +95,17 @@ var fsDownloadCmd = &cobra.Command{ after = listResp.JSON200.Pagination.NextOffset } }() + } + err = s.Sync(dest, remote, ch) - err := s.Sync(dest, remote, ch) - - if err != nil { - DieErr(err) - } - } else { - objectPath, ObjectName := filepath.Split(remotePath) - *remote.Path = objectPath - - err := s.Download(ctx, dest, remote, ObjectName) - if err != nil { - DieErr(err) - } - - downloadRes := struct { - Path string - Dest string - }{remote.String() + ObjectName, dest} - Write(fsNonRecursiveDownloadTemplate, downloadRes) + if err != nil { + DieErr(err) } }, } //nolint:gochecknoinits func init() { - fsDownloadCmd.Flags().BoolP("recursive", "r", false, "recursively all objects under path") - fsDownloadCmd.Flags().IntP("parallel", "p", fsDownloadParallelDefault, "max concurrent downloads") - fsDownloadCmd.Flags().Bool("pre-sign", false, "Request pre-sign link to access the data") - + withSyncFlags(fsDownloadCmd) fsCmd.AddCommand(fsDownloadCmd) } diff --git a/cmd/lakectl/cmd/fs_upload.go b/cmd/lakectl/cmd/fs_upload.go index 01de86470fa..98b47c0f29e 100644 --- a/cmd/lakectl/cmd/fs_upload.go +++ b/cmd/lakectl/cmd/fs_upload.go @@ -2,6 +2,8 @@ package cmd import ( "context" + "os" + "path/filepath" "github.com/spf13/cobra" "github.com/treeverse/lakefs/pkg/api/apigen" @@ -13,7 +15,7 @@ import ( var fsUploadCmd = &cobra.Command{ Use: "upload ", - Short: "Upload a local file to the specified URI", + Short: "upload a local file to the specified URI", Args: cobra.ExactArgs(1), ValidArgsFunction: ValidArgsRepository, Run: func(cmd *cobra.Command, args []string) { @@ -21,16 +23,22 @@ var fsUploadCmd = &cobra.Command{ pathURI := MustParsePathURI("path", args[0]) flagSet := cmd.Flags() source := Must(flagSet.GetString("source")) - recursive := Must(flagSet.GetBool("recursive")) - preSignMode := Must(flagSet.GetBool("pre-sign")) + syncFlags := getSyncFlags(cmd, client) contentType := Must(flagSet.GetString("content-type")) ctx := cmd.Context() - if !recursive { + + info, err := os.Stat(source) + if err != nil { + DieErr(err) + return + } + + if !info.IsDir() { if pathURI.GetPath() == "" { Die("target path is not a valid URI", 1) } - stat, err := upload(ctx, client, source, pathURI, contentType, preSignMode) + stat, err := upload(ctx, client, source, pathURI, contentType, syncFlags.presign) if err != nil { DieErr(err) } @@ -38,11 +46,34 @@ var fsUploadCmd = &cobra.Command{ return } - s := local.NewSyncManager(ctx, client, 1, preSignMode) - err := s.Upload(ctx, source, pathURI, *pathURI.Path) + changes := localDiff(cmd.Context(), client, pathURI, source) + // sync changes + c := make(chan *local.Change, filesChanSize) + go func() { + defer close(c) + for _, change := range changes { + if change.Type == local.ChangeTypeRemoved { + continue + } + c <- change + } + }() + s := local.NewSyncManager(ctx, client, syncFlags.parallelism, syncFlags.presign) + currentDir, err := os.Getwd() + if err != nil { + DieErr(err) + } + err = s.Sync(filepath.Join(currentDir, source), pathURI, c) if err != nil { DieErr(err) } + Write(localSummaryTemplate, struct { + Operation string + local.Tasks + }{ + Operation: "Sync", + Tasks: s.Summary(), + }) }, } @@ -61,10 +92,9 @@ func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sou //nolint:gochecknoinits func init() { fsUploadCmd.Flags().StringP("source", "s", "", "local file to upload, or \"-\" for stdin") - fsUploadCmd.Flags().BoolP("recursive", "r", false, "recursively copy all files under local source") _ = fsUploadCmd.MarkFlagRequired("source") fsUploadCmd.Flags().StringP("content-type", "", "", "MIME type of contents") - fsUploadCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data") + withSyncFlags(fsUploadCmd) fsCmd.AddCommand(fsUploadCmd) } diff --git a/cmd/lakectl/cmd/local.go b/cmd/lakectl/cmd/local.go index c96372faa3c..f4b6793bbfc 100644 --- a/cmd/lakectl/cmd/local.go +++ b/cmd/lakectl/cmd/local.go @@ -63,7 +63,7 @@ func withPresignFlag(cmd *cobra.Command) { "Use pre-signed URLs when downloading/uploading data (recommended)") } -func withLocalSyncFlags(cmd *cobra.Command) { +func withSyncFlags(cmd *cobra.Command) { withParallelismFlag(cmd) withPresignFlag(cmd) } @@ -82,7 +82,7 @@ type syncFlags struct { presign bool } -func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) syncFlags { +func getSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) syncFlags { presign := Must(cmd.Flags().GetBool(localPresignFlagName)) presignFlag := cmd.Flags().Lookup(localPresignFlagName) if !presignFlag.Changed { @@ -95,12 +95,15 @@ func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) s } parallelism := Must(cmd.Flags().GetInt(localParallelismFlagName)) + if parallelism < 1 { + DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallelism) + } return syncFlags{parallelism: parallelism, presign: presign} } -// getLocalArgs parses arguments to extract a remote URI and deduces the local path. +// getSyncArgs parses arguments to extract a remote URI and deduces the local path. // If the local path isn't provided and considerGitRoot is true, it uses the git repository root. -func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) { +func getSyncArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) { idx := 0 if requireRemote { remote = MustParsePathURI("path", args[0]) diff --git a/cmd/lakectl/cmd/local_checkout.go b/cmd/lakectl/cmd/local_checkout.go index 70855d1b3a7..9164c94c333 100644 --- a/cmd/lakectl/cmd/local_checkout.go +++ b/cmd/lakectl/cmd/local_checkout.go @@ -18,7 +18,7 @@ var localCheckoutCmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { specifiedRef := Must(cmd.Flags().GetString("ref")) all := Must(cmd.Flags().GetBool("all")) - _, localPath := getLocalArgs(args, false, all) + _, localPath := getSyncArgs(args, false, all) if !all { localCheckout(cmd, localPath, specifiedRef, true) return @@ -40,7 +40,7 @@ var localCheckoutCmd = &cobra.Command{ func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, confirmByFlag bool) { client := getClient() - locaSyncFlags := getLocalSyncFlags(cmd, client) + locaSyncFlags := getSyncFlags(cmd, client) idx, err := local.ReadIndex(localPath) if err != nil { if errors.Is(err, fs.ErrNotExist) { @@ -120,6 +120,6 @@ func init() { localCheckoutCmd.Flags().Bool("all", false, "Checkout given source branch or reference for all linked directories") localCheckoutCmd.MarkFlagsMutuallyExclusive("ref", "all") AssignAutoConfirmFlag(localCheckoutCmd.Flags()) - withLocalSyncFlags(localCheckoutCmd) + withSyncFlags(localCheckoutCmd) localCmd.AddCommand(localCheckoutCmd) } diff --git a/cmd/lakectl/cmd/local_clone.go b/cmd/lakectl/cmd/local_clone.go index 09c615eb79b..3927c521cdb 100644 --- a/cmd/lakectl/cmd/local_clone.go +++ b/cmd/lakectl/cmd/local_clone.go @@ -27,8 +27,8 @@ var localCloneCmd = &cobra.Command{ Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs), Run: func(cmd *cobra.Command, args []string) { client := getClient() - remote, localPath := getLocalArgs(args, true, false) - syncFlags := getLocalSyncFlags(cmd, client) + remote, localPath := getSyncArgs(args, true, false) + syncFlags := getSyncFlags(cmd, client) updateIgnore := Must(cmd.Flags().GetBool(localGitIgnoreFlagName)) empty, err := fileutil.IsDirEmpty(localPath) if err != nil { @@ -106,6 +106,6 @@ var localCloneCmd = &cobra.Command{ //nolint:gochecknoinits func init() { withGitIgnoreFlag(localCloneCmd) - withLocalSyncFlags(localCloneCmd) + withSyncFlags(localCloneCmd) localCmd.AddCommand(localCloneCmd) } diff --git a/cmd/lakectl/cmd/local_commit.go b/cmd/lakectl/cmd/local_commit.go index 50fbd938be1..8c1e192c8e8 100644 --- a/cmd/lakectl/cmd/local_commit.go +++ b/cmd/lakectl/cmd/local_commit.go @@ -35,8 +35,8 @@ var localCommitCmd = &cobra.Command{ Args: localDefaultArgsRange, Run: func(cmd *cobra.Command, args []string) { client := getClient() - _, localPath := getLocalArgs(args, false, false) - syncFlags := getLocalSyncFlags(cmd, client) + _, localPath := getSyncArgs(args, false, false) + syncFlags := getSyncFlags(cmd, client) message := Must(cmd.Flags().GetString(localCommitMessageFlagName)) allowEmptyMessage := Must(cmd.Flags().GetBool(localCommitAllowEmptyMessage)) if message == "" && !allowEmptyMessage { @@ -187,6 +187,6 @@ func init() { localCommitCmd.Flags().Bool(localCommitAllowEmptyMessage, false, "Allow commit with empty message") localCommitCmd.MarkFlagsMutuallyExclusive(localCommitMessageFlagName, localCommitAllowEmptyMessage) localCommitCmd.Flags().StringSlice(metaFlagName, []string{}, "key value pair in the form of key=value") - withLocalSyncFlags(localCommitCmd) + withSyncFlags(localCommitCmd) localCmd.AddCommand(localCommitCmd) } diff --git a/cmd/lakectl/cmd/local_init.go b/cmd/lakectl/cmd/local_init.go index 5ad73cfab75..8a80e541533 100644 --- a/cmd/lakectl/cmd/local_init.go +++ b/cmd/lakectl/cmd/local_init.go @@ -92,7 +92,7 @@ var localInitCmd = &cobra.Command{ Short: "set a local directory to sync with a lakeFS path.", Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs), Run: func(cmd *cobra.Command, args []string) { - remote, localPath := getLocalArgs(args, true, false) + remote, localPath := getSyncArgs(args, true, false) force := Must(cmd.Flags().GetBool(localForceFlagName)) updateIgnore := Must(cmd.Flags().GetBool(localGitIgnoreFlagName)) _, err := localInit(cmd.Context(), localPath, remote, force, updateIgnore) diff --git a/cmd/lakectl/cmd/local_list.go b/cmd/lakectl/cmd/local_list.go index 71696caebc3..327a1aedbd8 100644 --- a/cmd/lakectl/cmd/local_list.go +++ b/cmd/lakectl/cmd/local_list.go @@ -17,7 +17,7 @@ var localListCmd = &cobra.Command{ Short: "find and list directories that are synced with lakeFS.", Args: localDefaultArgsRange, Run: func(cmd *cobra.Command, args []string) { - _, localPath := getLocalArgs(args, false, true) + _, localPath := getSyncArgs(args, false, true) dirs, err := local.FindIndices(localPath) if err != nil { diff --git a/cmd/lakectl/cmd/local_pull.go b/cmd/lakectl/cmd/local_pull.go index 9586593f89b..1d81f3d3701 100644 --- a/cmd/lakectl/cmd/local_pull.go +++ b/cmd/lakectl/cmd/local_pull.go @@ -18,9 +18,9 @@ var localPullCmd = &cobra.Command{ Args: localDefaultArgsRange, Run: func(cmd *cobra.Command, args []string) { client := getClient() - _, localPath := getLocalArgs(args, false, false) + _, localPath := getSyncArgs(args, false, false) force := Must(cmd.Flags().GetBool(localForceFlagName)) - syncFlags := getLocalSyncFlags(cmd, client) + syncFlags := getSyncFlags(cmd, client) idx, err := local.ReadIndex(localPath) if err != nil { DieErr(err) @@ -89,6 +89,6 @@ var localPullCmd = &cobra.Command{ //nolint:gochecknoinits func init() { withForceFlag(localPullCmd, "Reset any uncommitted local change") - withLocalSyncFlags(localPullCmd) + withSyncFlags(localPullCmd) localCmd.AddCommand(localPullCmd) } diff --git a/cmd/lakectl/cmd/local_status.go b/cmd/lakectl/cmd/local_status.go index 4a1c3a281c4..476c6821f8d 100644 --- a/cmd/lakectl/cmd/local_status.go +++ b/cmd/lakectl/cmd/local_status.go @@ -19,7 +19,7 @@ var localStatusCmd = &cobra.Command{ Short: "show modifications (both remote and local) to the directory and the remote location it tracks", Args: localDefaultArgsRange, Run: func(cmd *cobra.Command, args []string) { - _, localPath := getLocalArgs(args, false, false) + _, localPath := getSyncArgs(args, false, false) abs, err := filepath.Abs(localPath) if err != nil { DieErr(err) diff --git a/docs/reference/cli.md b/docs/reference/cli.md index edbf286f67b..5c6d47b8d5b 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -2120,7 +2120,7 @@ lakectl fs cat [flags] ### lakectl fs download -Download object(s) from a given repository path +download object(s) from a given repository path ``` lakectl fs download [] [flags] @@ -2130,10 +2130,9 @@ lakectl fs download [] [flags] {:.no_toc} ``` - -h, --help help for download - -p, --parallel int max concurrent downloads (default 6) - --pre-sign Request pre-sign link to access the data - -r, --recursive recursively all objects under path + -h, --help help for download + -p, --parallelism int Max concurrent operations to perform (default 25) + --pre-sign Use pre-signed URLs when downloading/uploading data (recommended) (default true) ``` @@ -2244,7 +2243,7 @@ lakectl fs stat [flags] ### lakectl fs upload -Upload a local file to the specified URI +upload a local file to the specified URI ``` lakectl fs upload [flags] @@ -2256,8 +2255,8 @@ lakectl fs upload [flags] ``` --content-type string MIME type of contents -h, --help help for upload - --pre-sign Use pre-sign link to access the data - -r, --recursive recursively copy all files under local source + -p, --parallelism int Max concurrent operations to perform (default 25) + --pre-sign Use pre-signed URLs when downloading/uploading data (recommended) (default true) -s, --source string local file to upload, or "-" for stdin ``` diff --git a/pkg/local/sync.go b/pkg/local/sync.go index e145e3e9534..3df80770dd8 100644 --- a/pkg/local/sync.go +++ b/pkg/local/sync.go @@ -98,12 +98,12 @@ func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.UR switch change.Source { case ChangeSourceRemote: // remotely changed something, download it! - if err := s.Download(ctx, rootPath, remote, change.Path); err != nil { + if err := s.download(ctx, rootPath, remote, change.Path); err != nil { return fmt.Errorf("download %s failed: %w", change.Path, err) } case ChangeSourceLocal: - // we wrote something, Upload it! - if err := s.Upload(ctx, rootPath, remote, change.Path); err != nil { + // we wrote something, upload it! + if err := s.upload(ctx, rootPath, remote, change.Path); err != nil { return fmt.Errorf("upload %s failed: %w", change.Path, err) } default: @@ -129,7 +129,7 @@ func (s *SyncManager) apply(ctx context.Context, rootPath string, remote *uri.UR return nil } -func (s *SyncManager) Download(ctx context.Context, rootPath string, remote *uri.URI, path string) error { +func (s *SyncManager) download(ctx context.Context, rootPath string, remote *uri.URI, path string) error { if err := fileutil.VerifyRelPath(strings.TrimPrefix(path, uri.PathSeparator), rootPath); err != nil { return err } @@ -138,7 +138,6 @@ func (s *SyncManager) Download(ctx context.Context, rootPath string, remote *uri if err := os.MkdirAll(destinationDirectory, DefaultDirectoryMask); err != nil { return err } - statResp, err := s.client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &apigen.StatObjectParams{ Path: filepath.ToSlash(filepath.Join(remote.GetPath(), path)), Presign: swag.Bool(s.presign), @@ -235,7 +234,7 @@ func (s *SyncManager) Download(ctx context.Context, rootPath string, remote *uri return err } -func (s *SyncManager) Upload(ctx context.Context, rootPath string, remote *uri.URI, path string) error { +func (s *SyncManager) upload(ctx context.Context, rootPath string, remote *uri.URI, path string) error { source := filepath.Join(rootPath, path) if err := fileutil.VerifySafeFilename(source); err != nil { return err