Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove lakectl direct deprecated functionality #6623

Merged
merged 4 commits into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions .github/workflows/esti.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -782,7 +782,6 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
LAKEFS_DATABASE_TYPE: dynamodb
DOCKER_REG: ${{ steps.login-ecr.outputs.registry }}
ESTI_TEST_DATA_ACCESS: true,false
ESTI_BLOCKSTORE_TYPE: s3
ESTI_STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_AWS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
Expand Down Expand Up @@ -838,7 +837,6 @@ jobs:
LAKEFS_BLOCKSTORE_S3_CREDENTIALS_SECRET_ACCESS_KEY: ${{ secrets.ESTI_AWS_SECRET_ACCESS_KEY }}
LAKEFS_DATABASE_TYPE: postgres
DOCKER_REG: ${{ steps.login-ecr.outputs.registry }}
ESTI_TEST_DATA_ACCESS: true,false
ESTI_BLOCKSTORE_TYPE: s3
ESTI_STORAGE_NAMESPACE: s3://esti-system-testing/${{ github.run_number }}/${{ steps.unique.outputs.value }}
ESTI_AWS_ACCESS_KEY_ID: ${{ secrets.ESTI_AWS_ACCESS_KEY_ID }}
Expand Down
20 changes: 0 additions & 20 deletions cmd/lakectl/cmd/fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,8 @@ import (
"github.com/spf13/cobra"
)

type transportMethod int

const (
transportMethodDefault = iota
transportMethodDirect
transportMethodPreSign
)

var ErrRequestFailed = errors.New("request failed")

func transportMethodFromFlags(direct bool, preSign bool) transportMethod {
switch {
case direct && preSign:
Die("Can't enable both direct and pre-sign", 1)
case direct:
return transportMethodDirect
case preSign:
return transportMethodPreSign
}
return transportMethodDefault
}

// fsCmd represents the fs command
var fsCmd = &cobra.Command{
Use: "fs",
Expand Down
27 changes: 7 additions & 20 deletions cmd/lakectl/cmd/fs_cat.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/helpers"
)

var fsCatCmd = &cobra.Command{
Expand All @@ -19,25 +18,18 @@ var fsCatCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
pathURI := MustParsePathURI("path", args[0])
flagSet := cmd.Flags()
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
transport := transportMethodFromFlags(direct, preSignMode)

var err error
var body io.ReadCloser
client := getClient()
if transport == transportMethodDirect {
_, body, err = helpers.ClientDownload(cmd.Context(), client, pathURI.Repository, pathURI.Ref, *pathURI.Path)
} else {
preSign := swag.Bool(transport == transportMethodPreSign)
var resp *http.Response
resp, err = client.GetObject(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.GetObjectParams{
Path: *pathURI.Path,
Presign: preSign,
})
DieOnHTTPError(resp)
body = resp.Body
}
var resp *http.Response
resp, err = client.GetObject(cmd.Context(), pathURI.Repository, pathURI.Ref, &apigen.GetObjectParams{
Path: *pathURI.Path,
Presign: swag.Bool(preSignMode),
})
DieOnHTTPError(resp)
body = resp.Body
if err != nil {
DieErr(err)
}
Expand All @@ -56,11 +48,6 @@ var fsCatCmd = &cobra.Command{

//nolint:gochecknoinits
func init() {
fsCatCmd.Flags().BoolP("direct", "d", false, "read directly from backing store (faster but requires more credentials)")
err := fsCatCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
fsCatCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data")

fsCmd.AddCommand(fsCatCmd)
Expand Down
28 changes: 5 additions & 23 deletions cmd/lakectl/cmd/fs_download.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/api/helpers"
"github.com/treeverse/lakefs/pkg/uri"
)

Expand All @@ -33,11 +32,9 @@ var fsDownloadCmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
pathURI := MustParsePathURI("path", args[0])
flagSet := cmd.Flags()
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
recursive := Must(flagSet.GetBool("recursive"))
parallel := Must(flagSet.GetInt("parallel"))
transport := transportMethodFromFlags(direct, preSignMode)

if parallel < 1 {
DieFmt("Invalid value for parallel (%d), minimum is 1.\n", parallel)
Expand Down Expand Up @@ -92,7 +89,7 @@ var fsDownloadCmd = &cobra.Command{
}
// destination is without the source URI
dst := filepath.Join(dest, strings.TrimPrefix(downloadPath, prefix))
err := downloadHelper(ctx, client, transport, src, dst)
err := downloadHelper(ctx, client, preSignMode, src, dst)
if err == nil {
fmt.Printf("Successfully downloaded %s to %s\n", src.String(), dst)
} else {
Expand Down Expand Up @@ -133,8 +130,8 @@ func listRecursiveHelper(ctx context.Context, client *apigen.ClientWithResponses
}
}

func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, method transportMethod, src uri.URI, dst string) error {
body, err := getObjectHelper(ctx, client, method, src)
func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, preSign bool, src uri.URI, dst string) error {
body, err := getObjectHelper(ctx, client, preSign, src)
if err != nil {
return err
}
Expand All @@ -154,21 +151,11 @@ func downloadHelper(ctx context.Context, client *apigen.ClientWithResponses, met
return err
}

func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, method transportMethod, src uri.URI) (io.ReadCloser, error) {
if method == transportMethodDirect {
// download directly from storage
_, body, err := helpers.ClientDownload(ctx, client, src.Repository, src.Ref, *src.Path)
if err != nil {
return nil, err
}
return body, nil
}

func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, preSign bool, src uri.URI) (io.ReadCloser, error) {
// download from lakefs
preSign := swag.Bool(method == transportMethodPreSign)
resp, err := client.GetObject(ctx, src.Repository, src.Ref, &apigen.GetObjectParams{
Path: *src.Path,
Presign: preSign,
Presign: swag.Bool(preSign),
})
if err != nil {
return nil, err
Expand All @@ -182,11 +169,6 @@ func getObjectHelper(ctx context.Context, client *apigen.ClientWithResponses, me

//nolint:gochecknoinits
func init() {
fsDownloadCmd.Flags().BoolP("direct", "d", false, "read directly from backing store (requires credentials)")
err := fsDownloadCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
fsDownloadCmd.Flags().BoolP("recursive", "r", false, "recursively all objects under path")
fsDownloadCmd.Flags().IntP("parallel", "p", fsDownloadParallelDefault, "max concurrent downloads")
fsDownloadCmd.Flags().Bool("pre-sign", false, "Request pre-sign link to access the data")
Expand Down
23 changes: 5 additions & 18 deletions cmd/lakectl/cmd/fs_upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,15 @@ var fsUploadCmd = &cobra.Command{
flagSet := cmd.Flags()
source := Must(flagSet.GetString("source"))
recursive := Must(flagSet.GetBool("recursive"))
direct := Must(flagSet.GetBool("direct"))
preSignMode := Must(flagSet.GetBool("pre-sign"))
contentType := Must(flagSet.GetString("content-type"))

ctx := cmd.Context()
transport := transportMethodFromFlags(direct, preSignMode)
if !recursive {
if pathURI.GetPath() == "" {
Die("target path is not a valid URI", 1)
}
stat, err := upload(ctx, client, source, pathURI, contentType, transport)
stat, err := upload(ctx, client, source, pathURI, contentType, preSignMode)
if err != nil {
DieErr(err)
}
Expand All @@ -64,7 +62,7 @@ var fsUploadCmd = &cobra.Command{
uri := *pathURI
p := filepath.ToSlash(filepath.Join(*uri.Path, relPath))
uri.Path = &p
stat, err := upload(ctx, client, path, &uri, contentType, transport)
stat, err := upload(ctx, client, path, &uri, contentType, preSignMode)
if err != nil {
return fmt.Errorf("upload %s: %w", path, err)
}
Expand All @@ -81,33 +79,22 @@ var fsUploadCmd = &cobra.Command{
},
}

func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sourcePathname string, destURI *uri.URI, contentType string, method transportMethod) (*apigen.ObjectStats, error) {
func upload(ctx context.Context, client apigen.ClientWithResponsesInterface, sourcePathname string, destURI *uri.URI, contentType string, preSign bool) (*apigen.ObjectStats, error) {
fp := Must(OpenByPath(sourcePathname))
defer func() {
_ = fp.Close()
}()
objectPath := apiutil.Value(destURI.Path)
switch method {
case transportMethodDefault:
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
case transportMethodDirect:
return helpers.ClientUploadDirect(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
case transportMethodPreSign:
if preSign {
return helpers.ClientUploadPreSign(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
default:
panic("unsupported upload method")
}
return helpers.ClientUpload(ctx, client, destURI.Repository, destURI.Ref, objectPath, nil, contentType, fp)
}

//nolint:gochecknoinits
func init() {
fsUploadCmd.Flags().StringP("source", "s", "", "local file to upload, or \"-\" for stdin")
fsUploadCmd.Flags().BoolP("recursive", "r", false, "recursively copy all files under local source")
fsUploadCmd.Flags().BoolP("direct", "d", false, "write directly to backing store (faster but requires more credentials)")
err := fsUploadCmd.Flags().MarkDeprecated("direct", "use --pre-sign instead")
if err != nil {
DieErr(err)
}
_ = fsUploadCmd.MarkFlagRequired("source")
fsUploadCmd.Flags().StringP("content-type", "", "", "MIME type of contents")
fsUploadCmd.Flags().Bool("pre-sign", false, "Use pre-sign link to access the data")
Expand Down
2 changes: 1 addition & 1 deletion docs/howto/deploy/aws.md
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ Checkout Nginx [documentation](https://kubernetes.github.io/ingress-nginx/user-g
to use lakeFS to upload or download objects. Specifically you won't be able to:
* Upload objects using the lakeFS GUI (**Works with presign mode**)
* Upload objects through Spark using the S3 gateway
* Run `lakectl fs` commands (unless using the `--direct` flag or using **presign mode** with `--pre-sign` flag)
* Run `lakectl fs` commands (unless using **presign mode** with `--pre-sign` flag)
* Use [Actions and Hooks](/howto/hooks/)

```json
Expand Down
9 changes: 5 additions & 4 deletions pkg/api/helpers/adapters.go → esti/adapters_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package helpers
package esti
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Do we use all the functions defined here only in Esti?
  2. I'm not sure about the new file name, it says adapter_test but there are no tests

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The helper package had adapters that include upload direct.
All the tests that used direct were removed. Except the GC test that I wasn't sure if we like to remove this case - until we will enable different way, like presign.
I've moved the code into the system test (esti) so I've kept the name and added _test as this code should be built only when running test.


import (
"context"
Expand All @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go/aws"
"github.com/treeverse/lakefs/pkg/api/helpers"
)

// ObjectStats metadata of an object stored on a backing store.
Expand Down Expand Up @@ -42,7 +43,7 @@ type AdapterFactory map[string]func() (ClientAdapter, error)
func NewAdapter(protocol string) (ClientAdapter, error) {
factory, ok := adapterFactory[protocol]
if !ok {
return nil, ErrUnsupportedProtocol
return nil, helpers.ErrUnsupportedProtocol
}
return factory()
}
Expand All @@ -67,7 +68,7 @@ func newS3Adapter() (ClientAdapter, error) {

func (s *s3Adapter) Upload(ctx context.Context, physicalAddress *url.URL, contents io.ReadSeeker) (ObjectStats, error) {
if physicalAddress.Scheme != s3Scheme {
return ObjectStats{}, fmt.Errorf("%s: %w", s3Scheme, ErrUnsupportedProtocol)
return ObjectStats{}, fmt.Errorf("%s: %w", s3Scheme, helpers.ErrUnsupportedProtocol)
}

key := strings.TrimPrefix(physicalAddress.Path, "/")
Expand All @@ -94,7 +95,7 @@ func (s *s3Adapter) Upload(ctx context.Context, physicalAddress *url.URL, conten

func (s *s3Adapter) Download(ctx context.Context, physicalAddress *url.URL) (io.ReadCloser, error) {
if physicalAddress.Scheme != s3Scheme {
return nil, fmt.Errorf("%s: %w", s3Scheme, ErrUnsupportedProtocol)
return nil, fmt.Errorf("%s: %w", s3Scheme, helpers.ErrUnsupportedProtocol)
}
// TODO(ariels): Allow customization of request
bucket := physicalAddress.Hostname()
Expand Down
6 changes: 3 additions & 3 deletions esti/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func TestAdminPermissions(t *testing.T) {
ctx, _, repo := setupTest(t)
defer tearDownTest(repo)

// creating new group should succeed
// creating a new group should succeed
const gid = "TestGroup"
resCreateGroup, err := client.CreateGroupWithResponse(ctx, apigen.CreateGroupJSONRequestBody{
Id: gid,
Expand Down Expand Up @@ -210,13 +210,13 @@ func newClientFromGroup(t *testing.T, context context.Context, logger logging.Lo

// Tests merge with different clients
func mergeAuthTest(t *testing.T, cli *apigen.ClientWithResponses, ctx context.Context, repo string, branch string) (*apigen.MergeIntoBranchResponse, error) {
uploadFileRandomData(ctx, t, repo, mainBranch, "README", false)
uploadFileRandomData(ctx, t, repo, mainBranch, "README")

resMainCommit, err := cli.CommitWithResponse(ctx, repo, mainBranch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Initial content"})
require.NoError(t, err, "failed to commit initial content in merge auth test")
require.Equal(t, http.StatusCreated, resMainCommit.StatusCode())

uploadFileRandomData(ctx, t, repo, branch, "foo.txt", false)
uploadFileRandomData(ctx, t, repo, branch, "foo.txt")

resBranchCommit, err := cli.CommitWithResponse(ctx, repo, branch, &apigen.CommitParams{}, apigen.CommitJSONRequestBody{Message: "Additional content"})
require.NoError(t, err, "failed to commit additional content in merge auth test")
Expand Down
Loading