Skip to content

Commit

Permalink
mcap cli: Add support for azure and s3 remote files
Browse files Browse the repository at this point in the history
  • Loading branch information
RileyEv committed Aug 23, 2023
1 parent 2a1f9c6 commit 78b3da9
Show file tree
Hide file tree
Showing 3 changed files with 66 additions and 23 deletions.
2 changes: 1 addition & 1 deletion go/cli/mcap/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module github.com/foxglove/mcap/go/cli/mcap
go 1.18

require (
cloud.google.com/go/storage v1.23.0
gocloud.dev v0.33.0
github.com/fatih/color v1.13.0
github.com/foxglove/mcap/go/mcap v0.4.0
github.com/foxglove/mcap/go/ros v0.0.0-20230114025807-456e6a6ca1be
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,25 @@ import (
"fmt"
"io"

"cloud.google.com/go/storage"
"gocloud.dev/blob"
)

type GCSReadSeekCloser struct {
type GoCloudReadSeekCloser struct {
size int64
object *storage.ObjectHandle
key string
ctx context.Context
offset int64
r io.ReadCloser
bucket *blob.Bucket
}

func (r *GCSReadSeekCloser) Read(p []byte) (int, error) {
func (r *GoCloudReadSeekCloser) Read(p []byte) (int, error) {
n, err := r.r.Read(p)
r.offset += int64(n)
return n, err
}

func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
func (r *GoCloudReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
var seekTo int64
switch whence {
case io.SeekCurrent:
Expand All @@ -41,7 +42,7 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
if err != nil {
return 0, err
}
reader, err := r.object.NewRangeReader(r.ctx, seekTo, -1)
reader, err := r.bucket.NewRangeReader(r.ctx, r.key, seekTo, -1, nil)
if err != nil {
return 0, err
}
Expand All @@ -51,19 +52,21 @@ func (r *GCSReadSeekCloser) Seek(offset int64, whence int) (int64, error) {
return seekTo, nil
}

func (r *GCSReadSeekCloser) Close() error {
func (r *GoCloudReadSeekCloser) Close() error {
return r.r.Close()
}

func NewGCSReadSeekCloser(ctx context.Context, object *storage.ObjectHandle) (*GCSReadSeekCloser, error) {
r, err := object.NewReader(ctx)
func NewGoCloudReadSeekCloser(ctx context.Context, bucket *blob.Bucket, key string) (*GoCloudReadSeekCloser, error) {
r, err := bucket.NewReader(ctx, key, nil)
if err != nil {
return nil, err
}
return &GCSReadSeekCloser{
size: r.Attrs.Size,
object: object,

return &GoCloudReadSeekCloser{
size: r.Size(),
key: key,
r: r,
ctx: ctx,
bucket: bucket,
}, nil
}
60 changes: 50 additions & 10 deletions go/cli/mcap/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ import (
"os"
"regexp"

"cloud.google.com/go/storage"
"github.com/foxglove/mcap/go/mcap"
"github.com/olekukonko/tablewriter"
"gocloud.dev/blob"
_ "gocloud.dev/blob/azureblob"
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/s3blob"
)

var (
Expand Down Expand Up @@ -49,16 +52,36 @@ func GetReader(ctx context.Context, filename string) (func() error, io.ReadSeekC
if scheme != "" {
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("gs://%v", bucket))
if err != nil {
return close, nil, fmt.Errorf("failed to create GCS client: %v", err)
return close, nil, err
}
close = client.Close
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
close = bucketClient.Close
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return close, nil, fmt.Errorf("failed to build read seek closer: %w", err)
}
case "s3":
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("s3://%v?awssdk=v2", bucket))
close = bucketClient.Close
if err != nil {
return close, nil, err
}
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return close, nil, fmt.Errorf("failed to build read seek closer: %w", err)
}
case "azblob":
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("azblob://%v", bucket))
if err != nil {
return close, nil, err
}
close = bucketClient.Close
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return close, nil, fmt.Errorf("failed to build read seek closer: %w", err)
}

default:
return close, nil, fmt.Errorf("Unsupported remote file scheme: %s", scheme)
}
Expand All @@ -81,12 +104,29 @@ func WithReader(ctx context.Context, filename string, f func(remote bool, rs io.
remote = true
switch scheme {
case "gs":
client, err := storage.NewClient(ctx)
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("gs://%v", bucket))
if err != nil {
return err
}
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return fmt.Errorf("failed to build read seek closer: %w", err)
}
case "s3":
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("s3://%v?awssdk=v2", bucket))
if err != nil {
return err
}
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return fmt.Errorf("failed to build read seek closer: %w", err)
}
case "azblob":
bucketClient, err := blob.OpenBucket(ctx, fmt.Sprintf("azblob://%v", bucket))
if err != nil {
return fmt.Errorf("failed to create GCS client: %v", err)
return err
}
object := client.Bucket(bucket).Object(path)
rs, err = NewGCSReadSeekCloser(ctx, object)
rs, err = NewGoCloudReadSeekCloser(ctx, bucketClient, path)
if err != nil {
return fmt.Errorf("failed to build read seek closer: %w", err)
}
Expand Down

0 comments on commit 78b3da9

Please sign in to comment.