Skip to content

Commit

Permalink
Remove lakectl direct deprecated functionality (#6623)
Browse files Browse the repository at this point in the history
* Remove lakectl direct deprecated functionality

* fix and remove more unused code

* More dead code and code style

* apply code review
  • Loading branch information
nopcoder authored Sep 20, 2023
1 parent 91da86a commit bbe5f47
Show file tree
Hide file tree
Showing 50 changed files with 325 additions and 712 deletions.
2 changes: 0 additions & 2 deletions .github/workflows/esti.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,6 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
LAKEFS_DATABASE_TYPE: dynamodb
DOCKER_REG: ${{ steps.login-ecr.outputs.registry }}
ESTI_TEST_DATA_ACCESS: true,false
ESTI_BLOCKSTORE_TYPE: s3
ESTI_STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_AWS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
Expand Down Expand Up @@ -838,7 +837,6 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
LAKEFS_DATABASE_TYPE: postgres
DOCKER_REG: ${{ steps.login-ecr.outputs.registry }}
ESTI_TEST_DATA_ACCESS: true,false
ESTI_BLOCKSTORE_TYPE: s3
ESTI_STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_AWS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
Expand Down
20 changes: 0 additions & 20 deletions cmd/lakectl/cmd/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,8 @@ import (
"github.com/spf13/cobra"
)

type transportMethod int

const (
transportMethodDefault = iota
transportMethodDirect
transportMethodPreSign
)

var ErrRequestFailed = errors.New("request failed")

func transportMethodFromFlags(direct bool, preSign bool) transportMethod {
switch {
case direct && preSign:
Die("Can't enable both direct and pre-sign", 1)
case direct:
return transportMethodDirect
case preSign:
return transportMethodPreSign
}
return transportMethodDefault
}

// fsCmd represents the fs command
var fsCmd = &cobra.Command{
Use: "fs",
Expand Down
27 changes: 7 additions & 20 deletions cmd/lakectl/cmd/fs_cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/helpers"
)

var fsCatCmd = &cobra.Command{
Expand All @@ -19,25 +18,18 @@ var fsCatCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
pathURI := MustParsePathURI("path", args[0])
flagSet := cmd.Flags()
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
transport := transportMethodFromFlags(direct, preSignMode)

var err error
var body io.ReadCloser
client := getClient()
if transport == transportMethodDirect {
_, body, err = helpers.ClientDownload(cmd.Context(), client, pathURI.Repository, pathURI.Ref, *pathURI.Path)
} else {
preSign := swag.Bool(transport == transportMethodPreSign)
var resp *http.Response
resp, err = client.GetObject(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.GetObjectParams{
Path: *pathURI.Path,
Presign: preSign,
})
DieOnHTTPError(resp)
body = resp.Body
}
var resp *http.Response
resp, err = client.GetObject(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.GetObjectParams{
Path: *pathURI.Path,
Presign: swag.Bool(preSignMode),
})
DieOnHTTPError(resp)
body = resp.Body
if err != nil {
DieErr(err)
}
Expand All @@ -56,11 +48,6 @@ var fsCatCmd = &cobra.Command{

//nolint:gochecknoinits
func init() {
fsCatCmd.Flags().BoolP("direct", "d", false, "read directly from backing store (faster but requires more credentials)")
err := fsCatCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
fsCatCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data")

fsCmd.AddCommand(fsCatCmd)
Expand Down
28 changes: 5 additions & 23 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/api/helpers"
"github.com/treeverse/lakefs/pkg/uri"
)

Expand All @@ -33,11 +32,9 @@ var fsDownloadCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
pathURI := MustParsePathURI("path", args[0])
flagSet := cmd.Flags()
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
recursive := Must(flagSet.GetBool("recursive"))
parallel := Must(flagSet.GetInt("parallel"))
transport := transportMethodFromFlags(direct, preSignMode)

if parallel < 1 {
DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallel)
Expand Down Expand Up @@ -92,7 +89,7 @@ var fsDownloadCmd = &cobra.Command{
}
// destination is without the source URI
dst := filepath.Join(dest, strings.TrimPrefix(downloadPath, prefix))
err := downloadHelper(ctx, client, transport, src, dst)
err := downloadHelper(ctx, client, preSignMode, src, dst)
if err == nil {
fmt.Printf("Successfully downloaded %s to %s\n", src.String(), dst)
} else {
Expand Down Expand Up @@ -133,8 +130,8 @@ func listRecursiveHelper(ctx context.Context, client *apigen.ClientWithResponses
}
}

func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, method transportMethod, src uri.URI, dst string) error {
body, err := getObjectHelper(ctx, client, method, src)
func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, preSign bool, src uri.URI, dst string) error {
body, err := getObjectHelper(ctx, client, preSign, src)
if err != nil {
return err
}
Expand All @@ -154,21 +151,11 @@ func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, met
return err
}

func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, method transportMethod, src uri.URI) (io.ReadCloser, error) {
if method == transportMethodDirect {
// download directly from storage
_, body, err := helpers.ClientDownload(ctx, client, src.Repository, src.Ref, *src.Path)
if err != nil {
return nil, err
}
return body, nil
}

func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, preSign bool, src uri.URI) (io.ReadCloser, error) {
// download from lakefs
preSign := swag.Bool(method == transportMethodPreSign)
resp, err := client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: preSign,
Presign: swag.Bool(preSign),
})
if err != nil {
return nil, err
Expand All @@ -182,11 +169,6 @@ func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, me

//nolint:gochecknoinits
func init() {
fsDownloadCmd.Flags().BoolP("direct", "d", false, "read directly from backing store (requires credentials)")
err := fsDownloadCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
fsDownloadCmd.Flags().BoolP("recursive", "r", false, "recursively all objects under path")
fsDownloadCmd.Flags().IntP("parallel", "p", fsDownloadParallelDefault, "max concurrent downloads")
fsDownloadCmd.Flags().Bool("pre-sign", false, "Request pre-sign link to access the data")
Expand Down
23 changes: 5 additions & 18 deletions cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ var fsUploadCmd = &cobra.Command{
flagSet := cmd.Flags()
source := Must(flagSet.GetString("source"))
recursive := Must(flagSet.GetBool("recursive"))
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
contentType := Must(flagSet.GetString("content-type"))

ctx := cmd.Context()
transport := transportMethodFromFlags(direct, preSignMode)
if !recursive {
if pathURI.GetPath() == "" {
Die("target path is not a valid URI", 1)
}
stat, err := upload(ctx, client, source, pathURI, contentType, transport)
stat, err := upload(ctx, client, source, pathURI, contentType, preSignMode)
if err != nil {
DieErr(err)
}
Expand All @@ -64,7 +62,7 @@ var fsUploadCmd = &cobra.Command{
uri := *pathURI
p := filepath.ToSlash(filepath.Join(*uri.Path, relPath))
uri.Path = &p
stat, err := upload(ctx, client, path, &uri, contentType, transport)
stat, err := upload(ctx, client, path, &uri, contentType, preSignMode)
if err != nil {
return fmt.Errorf("upload %s: %w", path, err)
}
Expand All @@ -81,33 +79,22 @@ var fsUploadCmd = &cobra.Command{
},
}

func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sourcePathname string, destURI *uri.URI, contentType string, method transportMethod) (*apigen.ObjectStats, error) {
func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sourcePathname string, destURI *uri.URI, contentType string, preSign bool) (*apigen.ObjectStats, error) {
fp := Must(OpenByPath(sourcePathname))
defer func() {
_ = fp.Close()
}()
objectPath := apiutil.Value(destURI.Path)
switch method {
case transportMethodDefault:
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
case transportMethodDirect:
return helpers.ClientUploadDirect(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
case transportMethodPreSign:
if preSign {
return helpers.ClientUploadPreSign(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
default:
panic("unsupported upload method")
}
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
}

//nolint:gochecknoinits
func init() {
fsUploadCmd.Flags().StringP("source", "s", "", "local file to upload, or \"-\" for stdin")
fsUploadCmd.Flags().BoolP("recursive", "r", false, "recursively copy all files under local source")
fsUploadCmd.Flags().BoolP("direct", "d", false, "write directly to backing store (faster but requires more credentials)")
err := fsUploadCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
_ = fsUploadCmd.MarkFlagRequired("source")
fsUploadCmd.Flags().StringP("content-type", "", "", "MIME type of contents")
fsUploadCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data")
Expand Down
2 changes: 1 addition & 1 deletion docs/howto/deploy/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ Checkout Nginx [documentation](https://kubernetes.github.io/ingress-nginx/user-g
to use lakeFS to upload or download objects. Specifically you won't be able to:
* Upload objects using the lakeFS GUI (**Works with presign mode**)
* Upload objects through Spark using the S3 gateway
* Run `lakectl fs` commands (unless using the `--direct` flag or using **presign mode** with `--pre-sign` flag)
* Run `lakectl fs` commands (unless using **presign mode** with `--pre-sign` flag)
* Use [Actions and Hooks](/howto/hooks/)

```json
Expand Down
9 changes: 5 additions & 4 deletions pkg/api/helpers/adapters.go → esti/adapters_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helpers
package esti

import (
"context"
Expand All @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/treeverse/lakefs/pkg/api/helpers"
)

// ObjectStats metadata of an object stored on a backing store.
Expand Down Expand Up @@ -42,7 +43,7 @@ type AdapterFactory map[string]func() (ClientAdapter, error)
func NewAdapter(protocol string) (ClientAdapter, error) {
factory, ok := adapterFactory[protocol]
if !ok {
return nil, ErrUnsupportedProtocol
return nil, helpers.ErrUnsupportedProtocol
}
return factory()
}
Expand All @@ -67,7 +68,7 @@ func newS3Adapter() (ClientAdapter, error) {

func (s *s3Adapter) Upload(ctx context.Context, physicalAddress *url.URL, contents io.ReadSeeker) (ObjectStats, error) {
if physicalAddress.Scheme != s3Scheme {
return ObjectStats{}, fmt.Errorf("%s: %w", s3Scheme, ErrUnsupportedProtocol)
return ObjectStats{}, fmt.Errorf("%s: %w", s3Scheme, helpers.ErrUnsupportedProtocol)
}

key := strings.TrimPrefix(physicalAddress.Path, "/")
Expand All @@ -94,7 +95,7 @@ func (s *s3Adapter) Upload(ctx context.Context, physicalAddress *url.URL, conten

func (s *s3Adapter) Download(ctx context.Context, physicalAddress *url.URL) (io.ReadCloser, error) {
if physicalAddress.Scheme != s3Scheme {
return nil, fmt.Errorf("%s: %w", s3Scheme, ErrUnsupportedProtocol)
return nil, fmt.Errorf("%s: %w", s3Scheme, helpers.ErrUnsupportedProtocol)
}
// TODO(ariels): Allow customization of request
bucket := physicalAddress.Hostname()
Expand Down
6 changes: 3 additions & 3 deletions esti/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestAdminPermissions(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

// creating new group should succeed
// creating a new group should succeed
const gid = "TestGroup"
resCreateGroup, err := client.CreateGroupWithResponse(ctx, apigen.CreateGroupJSONRequestBody{
Id: gid,
Expand Down Expand Up @@ -210,13 +210,13 @@ func newClientFromGroup(t *testing.T, context context.Context, logger logging.Lo

// Tests merge with different clients
func mergeAuthTest(t *testing.T, cli *apigen.ClientWithResponses, ctx context.Context, repo string, branch string) (*apigen.MergeIntoBranchResponse, error) {
uploadFileRandomData(ctx, t, repo, mainBranch, "README", false)
uploadFileRandomData(ctx, t, repo, mainBranch, "README")

resMainCommit, err := cli.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Initial content"})
require.NoError(t, err, "failed to commit initial content in merge auth test")
require.Equal(t, http.StatusCreated, resMainCommit.StatusCode())

uploadFileRandomData(ctx, t, repo, branch, "foo.txt", false)
uploadFileRandomData(ctx, t, repo, branch, "foo.txt")

resBranchCommit, err := cli.CommitWithResponse(ctx, repo, branch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Additional content"})
require.NoError(t, err, "failed to commit additional content in merge auth test")
Expand Down
Loading

0 comments on commit bbe5f47

Please sign in to comment.