Skip to content

Commit

Permalink
Review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
idanovo committed Sep 28, 2023
1 parent c1f9224 commit 625822f
Show file tree
Hide file tree
Showing 12 changed files with 110 additions and 85 deletions.
80 changes: 37 additions & 43 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package cmd

import (
"net/http"
"path/filepath"
"strings"

"github.com/go-openapi/swag"
Expand All @@ -15,40 +14,53 @@ import (
const (
fsDownloadCmdMinArgs = 1
fsDownloadCmdMaxArgs = 2

fsDownloadParallelDefault = 6

fsNonRecursiveDownloadTemplate = `Successfully downloaded {{.Path}} to {{.Dest}}
`
)

var fsDownloadCmd = &cobra.Command{
Use: "download <path uri> [<destination path>]",
Short: "Download object(s) from a given repository path",
Args: cobra.RangeArgs(fsDownloadCmdMinArgs, fsDownloadCmdMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote, dest := getLocalArgs(args, true, false)
flagSet := cmd.Flags()
preSignMode := Must(flagSet.GetBool("pre-sign"))
recursive := Must(flagSet.GetBool("recursive"))
parallel := Must(flagSet.GetInt("parallel"))

if parallel < 1 {
DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallel)
}
remote, dest := getSyncArgs(args, true, false)
client := getClient()
syncFlags := getSyncFlags(cmd, client)

// optional destination directory
if len(args) > 1 {
dest = args[1]
}

client := getClient()
ctx := cmd.Context()
s := local.NewSyncManager(ctx, client, parallel, preSignMode)
s := local.NewSyncManager(ctx, client, syncFlags.parallelism, syncFlags.presign)
remotePath := remote.GetPath()
if recursive {
// Dynamically construct changes
ch := make(chan *local.Change, filesChanSize)

ch := make(chan *local.Change, filesChanSize)

stat, err := client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &apigen.StatObjectParams{
Path: *remote.Path,
})
switch {
case err != nil:
DieErr(err)
case stat.JSON200 != nil:
var objName string
if strings.Contains(remotePath, uri.PathSeparator) {
lastInd := strings.LastIndex(remotePath, uri.PathSeparator)
remotePath, objName = remotePath[lastInd+len(uri.PathSeparator):], remotePath[:lastInd]
} else {
objName = ""
}
remote.Path = swag.String(objName)
ch <- &local.Change{
Source: local.ChangeSourceRemote,
Path: remotePath,
Type: local.ChangeTypeAdded,
}
close(ch)
default:
if remotePath != "" && !strings.HasSuffix(remotePath, uri.PathSeparator) {
*remote.Path += uri.PathSeparator
}
go func() {
defer close(ch)
var after string
Expand Down Expand Up @@ -83,35 +95,17 @@ var fsDownloadCmd = &cobra.Command{
after = listResp.JSON200.Pagination.NextOffset
}
}()
}
err = s.Sync(dest, remote, ch)

err := s.Sync(dest, remote, ch)

if err != nil {
DieErr(err)
}
} else {
objectPath, ObjectName := filepath.Split(remotePath)
*remote.Path = objectPath

err := s.Download(ctx, dest, remote, ObjectName)
if err != nil {
DieErr(err)
}

downloadRes := struct {
Path string
Dest string
}{remote.String() + ObjectName, dest}
Write(fsNonRecursiveDownloadTemplate, downloadRes)
if err != nil {
DieErr(err)
}
},
}

//nolint:gochecknoinits
func init() {
fsDownloadCmd.Flags().BoolP("recursive", "r", false, "recursively all objects under path")
fsDownloadCmd.Flags().IntP("parallel", "p", fsDownloadParallelDefault, "max concurrent downloads")
fsDownloadCmd.Flags().Bool("pre-sign", false, "Request pre-sign link to access the data")

withSyncFlags(fsDownloadCmd)
fsCmd.AddCommand(fsDownloadCmd)
}
48 changes: 39 additions & 9 deletions cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package cmd

import (
"context"
"os"
"path/filepath"

"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
Expand All @@ -13,36 +15,65 @@ import (

var fsUploadCmd = &cobra.Command{
Use: "upload <path uri>",
Short: "Upload a local file to the specified URI",
Short: "upload a local file to the specified URI",
Args: cobra.ExactArgs(1),
ValidArgsFunction: ValidArgsRepository,
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
pathURI := MustParsePathURI("path", args[0])
flagSet := cmd.Flags()
source := Must(flagSet.GetString("source"))
recursive := Must(flagSet.GetBool("recursive"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
syncFlags := getSyncFlags(cmd, client)
contentType := Must(flagSet.GetString("content-type"))

ctx := cmd.Context()
if !recursive {

info, err := os.Stat(source)
if err != nil {
DieErr(err)
return
}

if !info.IsDir() {
if pathURI.GetPath() == "" {
Die("target path is not a valid URI", 1)
}
stat, err := upload(ctx, client, source, pathURI, contentType, preSignMode)
stat, err := upload(ctx, client, source, pathURI, contentType, syncFlags.presign)
if err != nil {
DieErr(err)
}
Write(fsStatTemplate, stat)
return
}

s := local.NewSyncManager(ctx, client, 1, preSignMode)
err := s.Upload(ctx, source, pathURI, *pathURI.Path)
changes := localDiff(cmd.Context(), client, pathURI, source)
// sync changes
c := make(chan *local.Change, filesChanSize)
go func() {
defer close(c)
for _, change := range changes {
if change.Type == local.ChangeTypeRemoved {
continue
}
c <- change
}
}()
s := local.NewSyncManager(ctx, client, syncFlags.parallelism, syncFlags.presign)
currentDir, err := os.Getwd()
if err != nil {
DieErr(err)
}
err = s.Sync(filepath.Join(currentDir, source), pathURI, c)
if err != nil {
DieErr(err)
}
Write(localSummaryTemplate, struct {
Operation string
local.Tasks
}{
Operation: "Sync",
Tasks: s.Summary(),
})
},
}

Expand All @@ -61,10 +92,9 @@ func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sou
//nolint:gochecknoinits
func init() {
fsUploadCmd.Flags().StringP("source", "s", "", "local file to upload, or \"-\" for stdin")
fsUploadCmd.Flags().BoolP("recursive", "r", false, "recursively copy all files under local source")
_ = fsUploadCmd.MarkFlagRequired("source")
fsUploadCmd.Flags().StringP("content-type", "", "", "MIME type of contents")
fsUploadCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data")
withSyncFlags(fsUploadCmd)

fsCmd.AddCommand(fsUploadCmd)
}
11 changes: 7 additions & 4 deletions cmd/lakectl/cmd/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func withPresignFlag(cmd *cobra.Command) {
"Use pre-signed URLs when downloading/uploading data (recommended)")
}

func withLocalSyncFlags(cmd *cobra.Command) {
func withSyncFlags(cmd *cobra.Command) {
withParallelismFlag(cmd)
withPresignFlag(cmd)
}
Expand All @@ -82,7 +82,7 @@ type syncFlags struct {
presign bool
}

func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) syncFlags {
func getSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) syncFlags {
presign := Must(cmd.Flags().GetBool(localPresignFlagName))
presignFlag := cmd.Flags().Lookup(localPresignFlagName)
if !presignFlag.Changed {
Expand All @@ -95,12 +95,15 @@ func getLocalSyncFlags(cmd *cobra.Command, client *apigen.ClientWithResponses) s
}

parallelism := Must(cmd.Flags().GetInt(localParallelismFlagName))
if parallelism < 1 {
DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallelism)
}
return syncFlags{parallelism: parallelism, presign: presign}
}

// getLocalArgs parses arguments to extract a remote URI and deduces the local path.
// getSyncArgs parses arguments to extract a remote URI and deduces the local path.
// If the local path isn't provided and considerGitRoot is true, it uses the git repository root.
func getLocalArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) {
func getSyncArgs(args []string, requireRemote bool, considerGitRoot bool) (remote *uri.URI, localPath string) {
idx := 0
if requireRemote {
remote = MustParsePathURI("path", args[0])
Expand Down
6 changes: 3 additions & 3 deletions cmd/lakectl/cmd/local_checkout.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var localCheckoutCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
specifiedRef := Must(cmd.Flags().GetString("ref"))
all := Must(cmd.Flags().GetBool("all"))
_, localPath := getLocalArgs(args, false, all)
_, localPath := getSyncArgs(args, false, all)
if !all {
localCheckout(cmd, localPath, specifiedRef, true)
return
Expand All @@ -40,7 +40,7 @@ var localCheckoutCmd = &cobra.Command{

func localCheckout(cmd *cobra.Command, localPath string, specifiedRef string, confirmByFlag bool) {
client := getClient()
locaSyncFlags := getLocalSyncFlags(cmd, client)
locaSyncFlags := getSyncFlags(cmd, client)
idx, err := local.ReadIndex(localPath)
if err != nil {
if errors.Is(err, fs.ErrNotExist) {
Expand Down Expand Up @@ -120,6 +120,6 @@ func init() {
localCheckoutCmd.Flags().Bool("all", false, "Checkout given source branch or reference for all linked directories")
localCheckoutCmd.MarkFlagsMutuallyExclusive("ref", "all")
AssignAutoConfirmFlag(localCheckoutCmd.Flags())
withLocalSyncFlags(localCheckoutCmd)
withSyncFlags(localCheckoutCmd)
localCmd.AddCommand(localCheckoutCmd)
}
6 changes: 3 additions & 3 deletions cmd/lakectl/cmd/local_clone.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ var localCloneCmd = &cobra.Command{
Args: cobra.RangeArgs(localCloneMinArgs, localCloneMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
remote, localPath := getLocalArgs(args, true, false)
syncFlags := getLocalSyncFlags(cmd, client)
remote, localPath := getSyncArgs(args, true, false)
syncFlags := getSyncFlags(cmd, client)
updateIgnore := Must(cmd.Flags().GetBool(localGitIgnoreFlagName))
empty, err := fileutil.IsDirEmpty(localPath)
if err != nil {
Expand Down Expand Up @@ -106,6 +106,6 @@ var localCloneCmd = &cobra.Command{
//nolint:gochecknoinits
func init() {
withGitIgnoreFlag(localCloneCmd)
withLocalSyncFlags(localCloneCmd)
withSyncFlags(localCloneCmd)
localCmd.AddCommand(localCloneCmd)
}
6 changes: 3 additions & 3 deletions cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ var localCommitCmd = &cobra.Command{
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
_, localPath := getLocalArgs(args, false, false)
syncFlags := getLocalSyncFlags(cmd, client)
_, localPath := getSyncArgs(args, false, false)
syncFlags := getSyncFlags(cmd, client)
message := Must(cmd.Flags().GetString(localCommitMessageFlagName))
allowEmptyMessage := Must(cmd.Flags().GetBool(localCommitAllowEmptyMessage))
if message == "" && !allowEmptyMessage {
Expand Down Expand Up @@ -187,6 +187,6 @@ func init() {
localCommitCmd.Flags().Bool(localCommitAllowEmptyMessage, false, "Allow commit with empty message")
localCommitCmd.MarkFlagsMutuallyExclusive(localCommitMessageFlagName, localCommitAllowEmptyMessage)
localCommitCmd.Flags().StringSlice(metaFlagName, []string{}, "key value pair in the form of key=value")
withLocalSyncFlags(localCommitCmd)
withSyncFlags(localCommitCmd)
localCmd.AddCommand(localCommitCmd)
}
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ var localInitCmd = &cobra.Command{
Short: "set a local directory to sync with a lakeFS path.",
Args: cobra.RangeArgs(localInitMinArgs, localInitMaxArgs),
Run: func(cmd *cobra.Command, args []string) {
remote, localPath := getLocalArgs(args, true, false)
remote, localPath := getSyncArgs(args, true, false)
force := Must(cmd.Flags().GetBool(localForceFlagName))
updateIgnore := Must(cmd.Flags().GetBool(localGitIgnoreFlagName))
_, err := localInit(cmd.Context(), localPath, remote, force, updateIgnore)
Expand Down
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ var localListCmd = &cobra.Command{
Short: "find and list directories that are synced with lakeFS.",
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
_, localPath := getLocalArgs(args, false, true)
_, localPath := getSyncArgs(args, false, true)

dirs, err := local.FindIndices(localPath)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions cmd/lakectl/cmd/local_pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ var localPullCmd = &cobra.Command{
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
client := getClient()
_, localPath := getLocalArgs(args, false, false)
_, localPath := getSyncArgs(args, false, false)
force := Must(cmd.Flags().GetBool(localForceFlagName))
syncFlags := getLocalSyncFlags(cmd, client)
syncFlags := getSyncFlags(cmd, client)
idx, err := local.ReadIndex(localPath)
if err != nil {
DieErr(err)
Expand Down Expand Up @@ -89,6 +89,6 @@ var localPullCmd = &cobra.Command{
//nolint:gochecknoinits
func init() {
withForceFlag(localPullCmd, "Reset any uncommitted local change")
withLocalSyncFlags(localPullCmd)
withSyncFlags(localPullCmd)
localCmd.AddCommand(localPullCmd)
}
2 changes: 1 addition & 1 deletion cmd/lakectl/cmd/local_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var localStatusCmd = &cobra.Command{
Short: "show modifications (both remote and local) to the directory and the remote location it tracks",
Args: localDefaultArgsRange,
Run: func(cmd *cobra.Command, args []string) {
_, localPath := getLocalArgs(args, false, false)
_, localPath := getSyncArgs(args, false, false)
abs, err := filepath.Abs(localPath)
if err != nil {
DieErr(err)
Expand Down
Loading

0 comments on commit 625822f

Please sign in to comment.