Skip to content

Commit

Permalink
extend storagedriver to add bulkdelete
Browse files Browse the repository at this point in the history
  • Loading branch information
sankalp-r committed Oct 9, 2024
1 parent a1a1e06 commit 0c8944f
Show file tree
Hide file tree
Showing 3 changed files with 270 additions and 3 deletions.
108 changes: 108 additions & 0 deletions registry/storage/driver/s3-aws/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -1314,6 +1314,30 @@ func (d *driver) doWalk(parentCtx context.Context, objectCount *int64, path, pre
return nil
}

// BulkDelete objects provided in the paths
func (d *driver) BulkDelete(ctx context.Context, path []string) (*s3.DeleteObjectsOutput, error) {
s := d.s3Client(ctx)
s3Objects := make([]*s3.ObjectIdentifier, 0, len(path))

for i := 0; i < len(path); i++ {
s3Objects = append(s3Objects, &s3.ObjectIdentifier{
Key: aws.String(d.s3Path(path[i])),
})
}

resp, err := s.DeleteObjects(&s3.DeleteObjectsInput{
Bucket: aws.String(d.Bucket),
Delete: &s3.Delete{
Objects: s3Objects,
Quiet: aws.Bool(false),
},
})
if err != nil {
return nil, err
}
return resp, nil
}

// directoryDiff finds all directories that are not in common between
// the previous and current paths in sorted order.
//
Expand Down Expand Up @@ -1705,3 +1729,87 @@ func (w *writer) flushPart() error {
w.pendingPart = nil
return nil
}

type DriverV2 struct {
storagedriver.StorageDriverV2
}

func NewDriverV2(params DriverParameters) (*DriverV2, error) {

s3obj := params.S3
if s3obj == nil {
if !params.V4Auth &&
(params.RegionEndpoint == "" ||
strings.Contains(params.RegionEndpoint, "s3.amazonaws.com")) {
return nil, fmt.Errorf("on Amazon S3 this storage driver can only be used with v4 authentication")
}

awsConfig := aws.NewConfig()

if params.AccessKey != "" && params.SecretKey != "" {
creds := credentials.NewStaticCredentials(
params.AccessKey,
params.SecretKey,
params.SessionToken,
)
awsConfig.WithCredentials(creds)
}

if params.RegionEndpoint != "" {
awsConfig.WithEndpoint(params.RegionEndpoint)
awsConfig.WithS3ForcePathStyle(params.ForcePathStyle)
}

awsConfig.WithS3UseAccelerate(params.Accelerate)
awsConfig.WithRegion(params.Region)
awsConfig.WithDisableSSL(!params.Secure)
if params.UseDualStack {
awsConfig.UseDualStackEndpoint = endpoints.DualStackEndpointStateEnabled
}

if params.SkipVerify {
httpTransport := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
awsConfig.WithHTTPClient(&http.Client{
Transport: httpTransport,
})
}

sess, err := session.NewSession(awsConfig)
if err != nil {
return nil, fmt.Errorf("failed to create new session with aws config: %v", err)
}

if params.UserAgent != "" {
sess.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler(params.UserAgent))
}

s3obj := s3.New(sess)

// enable S3 compatible signature v2 signing instead
if !params.V4Auth {
setv2Handlers(s3obj)
}
}

originalDriver := &driver{
S3: s3obj,
Bucket: params.Bucket,
ChunkSize: params.ChunkSize,
Encrypt: params.Encrypt,
KeyID: params.KeyID,
MultipartCopyChunkSize: params.MultipartCopyChunkSize,
MultipartCopyMaxConcurrency: params.MultipartCopyMaxConcurrency,
MultipartCopyThresholdSize: params.MultipartCopyThresholdSize,
MultipartCombineSmallPart: params.MultipartCombineSmallPart,
RootDirectory: params.RootDirectory,
StorageClass: params.StorageClass,
ObjectACL: params.ObjectACL,
LogS3APIRequests: params.LogS3APIRequests,
LogS3APIResponseHeaders: params.LogS3APIResponseHeaders,
}

// Return a new instance of DriverV2 that embeds the original driver
return &DriverV2{originalDriver}, nil
}
157 changes: 154 additions & 3 deletions registry/storage/driver/s3-aws/s3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,9 @@ import (
func Test(t *testing.T) { check.TestingT(t) }

var (
s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error)
skipS3 func() string
s3DriverConstructor func(rootDirectory, storageClass string) (*Driver, error)
skipS3 func() string
s3DriverV2Constructor func(rootDirectory, storageClass string) (*DriverV2, error)
)

func init() {
Expand Down Expand Up @@ -150,6 +151,99 @@ func init() {
return New(parameters)
}

s3DriverV2Constructor = func(rootDirectory, storageClass string) (*DriverV2, error) {
encryptBool := false
if encrypt != "" {
encryptBool, err = strconv.ParseBool(encrypt)
if err != nil {
return nil, err
}
}

secureBool := true
if secure != "" {
secureBool, err = strconv.ParseBool(secure)
if err != nil {
return nil, err
}
}

skipVerifyBool := false
if skipVerify != "" {
skipVerifyBool, err = strconv.ParseBool(skipVerify)
if err != nil {
return nil, err
}
}

v4Bool := true
if v4Auth != "" {
v4Bool, err = strconv.ParseBool(v4Auth)
if err != nil {
return nil, err
}
}
forcePathStyleBool := true
if forcePathStyle != "" {
forcePathStyleBool, err = strconv.ParseBool(forcePathStyle)
if err != nil {
return nil, err
}
}

useDualStackBool := false
if useDualStack != "" {
useDualStackBool, err = strconv.ParseBool(useDualStack)
}

multipartCombineSmallPart := true
if combineSmallPart != "" {
multipartCombineSmallPart, err = strconv.ParseBool(combineSmallPart)
if err != nil {
return nil, err
}
}

accelerateBool := true
if accelerate != "" {
accelerateBool, err = strconv.ParseBool(accelerate)
if err != nil {
return nil, err
}
}

parameters := DriverParameters{
nil,
accessKey,
secretKey,
bucket,
region,
regionEndpoint,
forcePathStyleBool,
encryptBool,
keyID,
secureBool,
skipVerifyBool,
v4Bool,
minChunkSize,
defaultMultipartCopyChunkSize,
defaultMultipartCopyMaxConcurrency,
defaultMultipartCopyThresholdSize,
multipartCombineSmallPart,
rootDirectory,
storageClass,
driverName + "-test",
objectACL,
sessionToken,
useDualStackBool,
accelerateBool,
false,
map[string]string{},
}

return NewDriverV2(parameters)
}

// Skip S3 storage driver tests if environment variable parameters are not provided
skipS3 = func() string {
if accessKey == "" || secretKey == "" || region == "" || bucket == "" || encrypt == "" {
Expand All @@ -159,7 +253,7 @@ func init() {
}

testsuites.RegisterSuite(func() (storagedriver.StorageDriver, error) {
return s3DriverConstructor(root, s3.StorageClassStandard)
return s3DriverV2Constructor(root, s3.StorageClassStandard)
}, skipS3)
}

Expand Down Expand Up @@ -825,6 +919,63 @@ func TestListObjectsV2(t *testing.T) {
}
}

func TestBulkDelete(t *testing.T) {
if skipS3() != "" {
t.Skip(skipS3())
}

rootDir := t.TempDir()

drvr, err := s3DriverV2Constructor(rootDir, s3.StorageClassStandard)
if err != nil {
t.Fatalf("unexpected error creating driver with standard storage: %v", err)
}

files := []string{
"file1",
"file2",
"file3",
"file4",
}

for _, file := range files {
err := drvr.PutContent(context.Background(), file, []byte("content "+file))
if err != nil {
fmt.Printf("unable to init file %s: %s\n", file, err)
continue
}
}

testCases := []struct {
desc string
deleteFiles []string
expectedDeletedFiles []string
expectedDeleteErrorFiles []string
}{
{
desc: "test-case-1",
deleteFiles: []string{"file1", "file2"},
expectedDeletedFiles: []string{"file1", "file2"},
expectedDeleteErrorFiles: nil,
},
}

for _, testCase := range testCases {
t.Run(testCase.desc, func(t *testing.T) {

res, err := drvr.BulkDelete(context.Background(), testCase.deleteFiles)
if err != nil {
t.Fatalf("unexpected error deleting: %v", err)
}

if !reflect.DeepEqual(res.Deleted, testCase.expectedDeletedFiles) {
t.Fatalf("unexpected bulk delete result")
}
})
}

}

func compareWalked(t *testing.T, expected, walked []string) {
if len(walked) != len(expected) {
t.Fatalf("Mismatch number of fileInfo walked %d expected %d; walked %s; expected %s;", len(walked), len(expected), walked, expected)
Expand Down
8 changes: 8 additions & 0 deletions registry/storage/driver/storagedriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"regexp"
"strconv"
"strings"

"github.com/aws/aws-sdk-go/service/s3"
)

// Version is a string representing the storage driver version, of the form
Expand Down Expand Up @@ -212,3 +214,9 @@ func (e Errors) Error() string {
return msg
}
}

// StorageDriverV2 extends StorageDriver
type StorageDriverV2 interface {
StorageDriver
BulkDelete(ctx context.Context, path []string) (*s3.DeleteObjectsOutput, error)
}

0 comments on commit 0c8944f

Please sign in to comment.