Skip to content

Commit

Permalink
Storage byte range support (#49)
Browse files Browse the repository at this point in the history
* Add response metadata

* Add byte range support for s3 client

* Add byte range support for cloudfront

* Add byte range support for webfolder

* Add GetPartialObject method on storage

* Remove GetPartialObject implementation from webfolder

* Make cloudfront.GetPartialObject consistent with s3.GetPartialObject

* Rename GetPartialObject method to GetPartially
  • Loading branch information
anggiaj authored Mar 3, 2020
1 parent f7ad58d commit c910e75
Show file tree
Hide file tree
Showing 15 changed files with 363 additions and 44 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,5 @@ node_modules/
coverage.json
config.env
profile.cov

vendor
5 changes: 5 additions & 0 deletions internal/handler/image_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,8 @@ func (m *mockStorage) Get(ctx context.Context, path string) storage.IResponse {
args := m.Called(ctx, path)
return storage.NewResponse(args[0].([]byte), args.Int(1), args.Error(2))
}

func (m *mockStorage) GetPartially(ctx context.Context, path string, opt *storage.GetPartiallyRequestOptions) storage.IResponse {
args := m.Called(ctx, path, opt)
return storage.NewResponse(args[0].([]byte), args.Int(1), args.Error(2)).WithMetadata(args[3].(*storage.ResponseMetadata))
}
4 changes: 4 additions & 0 deletions pkg/router/routes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,3 +41,7 @@ type mockStorage struct {
func (m *mockStorage) Get(ctx context.Context, path string) storage.IResponse {
return storage.NewResponse([]byte(nil), http.StatusOK, nil)
}

func (m *mockStorage) GetPartially(ctx context.Context, path string, metadata *storage.GetPartiallyRequestOptions) storage.IResponse {
return storage.NewResponse([]byte(nil), http.StatusOK, nil).WithMetadata(nil)
}
52 changes: 44 additions & 8 deletions pkg/storage/aws/cloudfront/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,40 @@ type Storage struct {
// This method figures out how to get the data from the cloudfront storage backend.
func (s *Storage) Get(ctx context.Context, path string) storage.IResponse {
res, err := s.client.Get(s.getURL(path), nil)
if err != nil {
if res != nil {
return storage.NewResponse([]byte(nil), res.StatusCode, err)
}
return storage.NewResponse([]byte(nil), http.StatusUnprocessableEntity, err)
if errRes, ok := s.hasError(res, err); ok {
return errRes
}
if res.StatusCode == http.StatusForbidden {
return storage.NewResponse([]byte(nil), res.StatusCode, errors.New("forbidden"))
body, _ := ioutil.ReadAll(res.Body)
return storage.NewResponse(body, res.StatusCode, nil)
}

// GetPartially takes in the Context, path and opt as an argument and returns an IResponse interface implementation.
// This method figures out how to get partial data from the cloudfront storage backend.
func (s *Storage) GetPartially(ctx context.Context, path string, opt *storage.GetPartiallyRequestOptions) storage.IResponse {
var h http.Header
if opt != nil && opt.Range != "" {
h = http.Header{}
h.Add(storage.HeaderRange, opt.Range)
} else {
return s.Get(ctx, path)
}

res, err := s.client.Get(s.getURL(path), h)
if errRes, ok := s.hasError(res, err); ok {
return errRes
}

body, _ := ioutil.ReadAll(res.Body)
return storage.NewResponse([]byte(body), res.StatusCode, nil)
return storage.
NewResponse(body, res.StatusCode, nil).
WithMetadata(&storage.ResponseMetadata{
AcceptRanges: res.Header.Get(storage.HeaderAcceptRanges),
ContentLength: res.Header.Get(storage.HeaderContentLength),
ContentRange: res.Header.Get(storage.HeaderContentRange),
ContentType: res.Header.Get(storage.HeaderContentType),
ETag: res.Header.Get(storage.HeaderETag),
LastModified: res.Header.Get(storage.HeaderLastModified),
})
}

func (s *Storage) getURL(path string) string {
Expand All @@ -52,6 +75,19 @@ func (s *Storage) getProtocol() string {
return "http"
}

func (s *Storage) hasError(res *http.Response, err error) (storage.IResponse, bool) {
if err != nil {
if res != nil {
return storage.NewResponse([]byte(nil), res.StatusCode, err), true
}
return storage.NewResponse([]byte(nil), http.StatusUnprocessableEntity, err), true
}
if res.StatusCode == http.StatusForbidden {
return storage.NewResponse([]byte(nil), res.StatusCode, errors.New("forbidden")), true
}
return nil, false
}

// NewStorage returns a new cloudfront.Storage instance
func NewStorage(opts ...Option) *Storage {
s := Storage{}
Expand Down
62 changes: 58 additions & 4 deletions pkg/storage/aws/cloudfront/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@ import (
"bytes"
"context"
"fmt"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"io"
"io/ioutil"
"net/http"
"testing"

"github.com/gojek/darkroom/pkg/storage"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
)

const (
validHost = "cloudfront.net"
validPath = "/path/to/valid-file"
invalidPath = "/path/to/invalid-file"
validRange = "bytes=100-200"
)

type StorageTestSuite struct {
Expand Down Expand Up @@ -94,6 +97,57 @@ func (s *StorageTestSuite) TestStorage_GetSuccessResponse() {
assert.Equal(s.T(), []byte("response body"), res.Data())
}

func (s *StorageTestSuite) TestStorage_GetPartialObjectSuccessResponse() {
metadata := storage.ResponseMetadata{
AcceptRanges: "bytes",
ContentLength: "1024",
ContentType: "image/png",
ContentRange: "bytes 100-200/1024",
ETag: "32705ce195789d7bf07f3d44783c2988",
LastModified: "Wed, 21 Oct 2015 07:28:00 GMT",
}

reqHeader := http.Header{}
reqHeader.Add(storage.HeaderRange, validRange)

respHeader := http.Header{}
respHeader.Add(storage.HeaderAcceptRanges, metadata.AcceptRanges)
respHeader.Add(storage.HeaderContentLength, metadata.ContentLength)
respHeader.Add(storage.HeaderContentType, metadata.ContentType)
respHeader.Add(storage.HeaderContentRange, metadata.ContentRange)
respHeader.Add(storage.HeaderETag, metadata.ETag)
respHeader.Add(storage.HeaderLastModified, metadata.LastModified)

s.client.On("Get", fmt.Sprintf("%s://%s%s", s.storage.getProtocol(), validHost, validPath), reqHeader).
Return(&http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte("response body"))),
Header: respHeader,
}, nil)

opt := storage.GetPartiallyRequestOptions{Range: validRange}
res := s.storage.GetPartially(context.TODO(), validPath, &opt)

assert.Nil(s.T(), res.Error())
assert.Equal(s.T(), http.StatusOK, res.Status())
assert.Equal(s.T(), []byte("response body"), res.Data())
assert.Equal(s.T(), &metadata, res.Metadata())
}

func (s *StorageTestSuite) TestStorage_GetPartialObjectSuccessResponse_WhenRangeNotProvided() {
s.client.On("Get", fmt.Sprintf("%s://%s%s", s.storage.getProtocol(), validHost, validPath), http.Header(nil)).
Return(&http.Response{
StatusCode: http.StatusOK,
Body: ioutil.NopCloser(bytes.NewReader([]byte("response body"))),
}, nil)
res := s.storage.GetPartially(context.TODO(), validPath, nil)

assert.Nil(s.T(), res.Error())
assert.Equal(s.T(), http.StatusOK, res.Status())
assert.Equal(s.T(), []byte("response body"), res.Data())
assert.Nil(s.T(), res.Metadata())
}

func (s *StorageTestSuite) TestStorage_getURL() {
cases := []struct {
secureProtocol bool
Expand Down
84 changes: 77 additions & 7 deletions pkg/storage/aws/s3/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package s3

import (
"context"
"fmt"
"io/ioutil"
"net/http"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
"github.com/aws/aws-sdk-go/service/s3/s3iface"
"github.com/aws/aws-sdk-go/service/s3/s3manager"
"github.com/aws/aws-sdk-go/service/s3/s3manager/s3manageriface"
"github.com/gojek/darkroom/pkg/storage"
Expand All @@ -17,21 +22,23 @@ type Storage struct {
bucketRegion string
accessKey string
secretKey string
service s3iface.S3API
hystrixCmd storage.HystrixCommand
downloader s3manageriface.DownloaderAPI
}

// Get takes in the Context and path as an argument and returns an IResponse interface implementation.
// This method figures out how to get the data from the S3 storage backend.
func (s *Storage) Get(ctx context.Context, path string) storage.IResponse {
buff := &aws.WriteAtBuffer{}
input := s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(path),
}

buff := &aws.WriteAtBuffer{}
responseChannel := make(chan error, 1)
makeNetworkCall(s.hystrixCmd.Name, s.hystrixCmd.Config, func() error {
_, err := s.downloader.Download(buff, &s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(path),
})
_, err := s.downloader.Download(buff, &input)
responseChannel <- err
return err
}, func(e error) error {
Expand All @@ -40,7 +47,69 @@ func (s *Storage) Get(ctx context.Context, path string) storage.IResponse {
})
s3Err := <-responseChannel

return storage.NewResponse([]byte(buff.Bytes()), getStatusCodeFromError(s3Err), s3Err)
return storage.NewResponse(buff.Bytes(), getStatusCodeFromError(s3Err, nil), s3Err)
}

// GetPartially takes in the Context, path and opt as an argument and returns an IResponse interface implementation.
// This method figures out how to get partial data from the S3 storage backend.
func (s *Storage) GetPartially(ctx context.Context, path string, opt *storage.GetPartiallyRequestOptions) storage.IResponse {
if opt == nil || len(opt.Range) == 0 {
return s.Get(ctx, path)
}

input := s3.GetObjectInput{
Bucket: aws.String(s.bucketName),
Key: aws.String(path),
Range: &opt.Range,
}
type getObjectResponse struct {
output *s3.GetObjectOutput
err error
}
responseChannel := make(chan getObjectResponse, 1)
makeNetworkCall(s.hystrixCmd.Name, s.hystrixCmd.Config, func() error {
resp, err := s.service.GetObject(&input)
responseChannel <- getObjectResponse{
output: resp,
err: err,
}
return err
}, func(e error) error {
responseChannel <- getObjectResponse{
err: e,
}
return e
})

s3Resp := <-responseChannel

var metadata *storage.ResponseMetadata
var body []byte
var status int
if s3Resp.err == nil {
metadata = s.newMetadata(*s3Resp.output)
body, _ = ioutil.ReadAll(s3Resp.output.Body)
status = http.StatusPartialContent
}

return storage.
NewResponse(body, getStatusCodeFromError(s3Resp.err, &status), s3Resp.err).
WithMetadata(metadata)
}

func (s *Storage) newMetadata(output s3.GetObjectOutput) *storage.ResponseMetadata {
metadata := storage.ResponseMetadata{
AcceptRanges: aws.StringValue(output.AcceptRanges),
ContentLength: fmt.Sprintf("%d", aws.Int64Value(output.ContentLength)),
ContentRange: aws.StringValue(output.ContentRange),
ContentType: aws.StringValue(output.ContentType),
ETag: aws.StringValue(output.ETag),
}

if output.LastModified != nil {
metadata.LastModified = aws.TimeValue(output.LastModified).Format(http.TimeFormat)
}
return &metadata
}

// NewStorage returns a new s3.Storage instance
Expand All @@ -53,6 +122,7 @@ func NewStorage(opts ...Option) *Storage {
credentials.NewStaticCredentials(s.accessKey, s.secretKey, ""),
)
ssn, _ := session.NewSession(cfg)
s.downloader = s3manager.NewDownloaderWithClient(s3.New(ssn))
s.service = s3.New(ssn)
s.downloader = s3manager.NewDownloaderWithClient(s.service)
return &s
}
Loading

0 comments on commit c910e75

Please sign in to comment.