Skip to content

Commit

Permalink
Merge pull request #140 from milinddethe15/support-custom-roundtripper
Browse files Browse the repository at this point in the history
Add support for custom roundtripper
  • Loading branch information
GiedriusS authored Oct 3, 2024
2 parents 075bf61 + 7f505a2 commit 8897e65
Show file tree
Hide file tree
Showing 23 changed files with 303 additions and 94 deletions.
17 changes: 9 additions & 8 deletions client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package client
import (
"context"
"fmt"
"net/http"
"strings"

"github.com/thanos-io/objstore"
Expand Down Expand Up @@ -49,7 +50,7 @@ type BucketConfig struct {

// NewBucket initializes and returns new object storage clients.
// NOTE: confContentYaml can contain secrets.
func NewBucket(logger log.Logger, confContentYaml []byte, component string) (objstore.Bucket, error) {
func NewBucket(logger log.Logger, confContentYaml []byte, component string, rt http.RoundTripper) (objstore.Bucket, error) {
level.Info(logger).Log("msg", "loading bucket configuration")
bucketConf := &BucketConfig{}
if err := yaml.UnmarshalStrict(confContentYaml, bucketConf); err != nil {
Expand All @@ -64,23 +65,23 @@ func NewBucket(logger log.Logger, confContentYaml []byte, component string) (obj
var bucket objstore.Bucket
switch strings.ToUpper(string(bucketConf.Type)) {
case string(GCS):
bucket, err = gcs.NewBucket(context.Background(), logger, config, component)
bucket, err = gcs.NewBucket(context.Background(), logger, config, component, rt)
case string(S3):
bucket, err = s3.NewBucket(logger, config, component)
bucket, err = s3.NewBucket(logger, config, component, rt)
case string(AZURE):
bucket, err = azure.NewBucket(logger, config, component)
bucket, err = azure.NewBucket(logger, config, component, rt)
case string(SWIFT):
bucket, err = swift.NewContainer(logger, config)
bucket, err = swift.NewContainer(logger, config, rt)
case string(COS):
bucket, err = cos.NewBucket(logger, config, component)
bucket, err = cos.NewBucket(logger, config, component, rt)
case string(ALIYUNOSS):
bucket, err = oss.NewBucket(logger, config, component)
bucket, err = oss.NewBucket(logger, config, component, rt)
case string(FILESYSTEM):
bucket, err = filesystem.NewBucketFromConfig(config)
case string(BOS):
bucket, err = bos.NewBucket(logger, config, component)
case string(OCI):
bucket, err = oci.NewBucket(logger, config)
bucket, err = oci.NewBucket(logger, config, rt)
case string(OBS):
bucket, err = obs.NewBucket(logger, config)
default:
Expand Down
6 changes: 3 additions & 3 deletions client/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func ExampleBucket() {
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand All @@ -46,7 +46,7 @@ func ExampleTracingBucketUsingOpenTracing() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand All @@ -72,7 +72,7 @@ func ExampleTracingBucketUsingOpenTelemetry() { //nolint:govet
}

// Create a new bucket.
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example")
bucket, err := NewBucket(log.NewNopLogger(), confContentYaml, "example", nil)
if err != nil {
panic(err)
}
Expand Down
12 changes: 12 additions & 0 deletions errutil/rt_error.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package errutil

import "net/http"

// ErrorRoundTripper is a custom RoundTripper that always returns an error.
type ErrorRoundTripper struct {
Err error
}

func (ert *ErrorRoundTripper) RoundTrip(*http.Request) (*http.Response, error) {
return nil, ert.Err
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ require (
github.com/prometheus/procfs v0.11.1 // indirect
github.com/rs/xid v1.5.0 // indirect
github.com/sony/gobreaker v0.5.0 // indirect
github.com/stretchr/objx v0.5.2 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -210,8 +210,9 @@ github.com/sony/gobreaker v0.5.0 h1:dRCvqm0P490vZPmy7ppEk2qCnCieBooFJ+YoXGYB+yg=
github.com/sony/gobreaker v0.5.0/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
Expand Down
12 changes: 8 additions & 4 deletions providers/azure/azure.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package azure
import (
"context"
"io"
"net/http"
"os"
"strings"
"testing"
Expand Down Expand Up @@ -145,7 +146,7 @@ type Bucket struct {
}

// NewBucket returns a new Bucket using the provided Azure config.
func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, azureConfig []byte, component string, rt http.RoundTripper) (*Bucket, error) {
level.Debug(logger).Log("msg", "creating new Azure bucket connection", "component", component)
conf, err := parseConfig(azureConfig)
if err != nil {
Expand All @@ -154,11 +155,14 @@ func NewBucket(logger log.Logger, azureConfig []byte, component string) (*Bucket
if conf.MSIResource != "" {
level.Warn(logger).Log("msg", "The field msi_resource has been deprecated and should no longer be set")
}
return NewBucketWithConfig(logger, conf, component)
return NewBucketWithConfig(logger, conf, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided Azure config struct.
func NewBucketWithConfig(logger log.Logger, conf Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, conf Config, component string, rt http.RoundTripper) (*Bucket, error) {
if rt != nil {
conf.HTTPConfig.Transport = rt
}
if err := conf.validate(); err != nil {
return nil, err
}
Expand Down Expand Up @@ -355,7 +359,7 @@ func NewTestBucket(t testing.TB, component string) (objstore.Bucket, func(), err
if err != nil {
return nil, nil, err
}
bkt, err := NewBucket(log.NewNopLogger(), bc, component)
bkt, err := NewBucket(log.NewNopLogger(), bc, component, nil)
if err != nil {
t.Errorf("Cannot create Azure storage container:")
return nil, nil, err
Expand Down
18 changes: 17 additions & 1 deletion providers/azure/azure_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import (
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"

"github.com/thanos-io/objstore/errutil"
"github.com/thanos-io/objstore/exthttp"
)

Expand All @@ -20,7 +23,7 @@ type TestCase struct {
}

var validConfig = []byte(`storage_account: "myStorageAccount"
storage_account_key: "abc123"
storage_account_key: "bXlTdXBlclNlY3JldEtleTEyMyFAIw=="
container: "MyContainer"
endpoint: "blob.core.windows.net"
reader_config:
Expand Down Expand Up @@ -222,3 +225,16 @@ http_config:
testutil.Ok(t, err)
testutil.Equals(t, true, transport.TLSClientConfig.InsecureSkipVerify)
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
cfg, err := parseConfig(validConfig)
testutil.Ok(t, err)

rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

_, err = NewBucketWithConfig(log.NewNopLogger(), cfg, "test", rt)

// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
}
8 changes: 6 additions & 2 deletions providers/azure/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,14 @@ import (
const DirDelim = "/"

func getContainerClient(conf Config) (*container.Client, error) {
dt, err := exthttp.DefaultTransport(conf.HTTPConfig)
var rt http.RoundTripper
rt, err := exthttp.DefaultTransport(conf.HTTPConfig)
if err != nil {
return nil, err
}
if conf.HTTPConfig.Transport != nil {
rt = conf.HTTPConfig.Transport
}
opt := &container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Retry: policy.RetryOptions{
Expand All @@ -35,7 +39,7 @@ func getContainerClient(conf Config) (*container.Client, error) {
Telemetry: policy.TelemetryOptions{
ApplicationID: "Thanos",
},
Transport: &http.Client{Transport: dt},
Transport: &http.Client{Transport: rt},
},
}

Expand Down
1 change: 1 addition & 0 deletions providers/bos/bos.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ func parseConfig(conf []byte) (Config, error) {

// NewBucket new bos bucket.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
// TODO(https://github.com/thanos-io/objstore/pull/140): Add support for custom roundtripper.
if logger == nil {
logger = log.NewNopLogger()
}
Expand Down
20 changes: 13 additions & 7 deletions providers/cos/cos.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket using the provided cos configuration.
func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
if logger == nil {
logger = log.NewNopLogger()
}
Expand All @@ -104,12 +104,11 @@ func NewBucket(logger log.Logger, conf []byte, component string) (*Bucket, error
if err != nil {
return nil, errors.Wrap(err, "parsing cos configuration")
}

return NewBucketWithConfig(logger, config, component)
return NewBucketWithConfig(logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket using the provided cos config values.
func NewBucketWithConfig(logger log.Logger, config Config, component string) (*Bucket, error) {
func NewBucketWithConfig(logger log.Logger, config Config, component string, rt http.RoundTripper) (*Bucket, error) {
if err := config.validate(); err != nil {
return nil, errors.Wrap(err, "validate cos configuration")
}
Expand All @@ -128,7 +127,14 @@ func NewBucketWithConfig(logger log.Logger, config Config, component string) (*B
}
}
b := &cos.BaseURL{BucketURL: bucketURL}
tpt, _ := exthttp.DefaultTransport(config.HTTPConfig)
var tpt http.RoundTripper
tpt, err = exthttp.DefaultTransport(config.HTTPConfig)
if err != nil {
return nil, err
}
if rt != nil {
tpt = rt
}
client := cos.NewClient(b, &http.Client{
Transport: &cos.AuthorizationTransport{
SecretID: config.SecretId,
Expand Down Expand Up @@ -485,7 +491,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand All @@ -506,7 +512,7 @@ func NewTestBucket(t testing.TB) (objstore.Bucket, func(), error) {
return nil, nil, err
}

b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand Down
22 changes: 22 additions & 0 deletions providers/cos/cos_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,16 @@
package cos

import (
"context"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"

"github.com/thanos-io/objstore/errutil"
"github.com/thanos-io/objstore/exthttp"
)

Expand Down Expand Up @@ -137,3 +141,21 @@ func TestConfig_validate(t *testing.T) {
})
}
}

func TestNewBucketWithErrorRoundTripper(t *testing.T) {
config := Config{
Bucket: "bucket",
AppId: "123",
Region: "test",
SecretId: "sid",
SecretKey: "skey",
}
rt := &errutil.ErrorRoundTripper{Err: errors.New("RoundTripper error")}

bkt, err := NewBucketWithConfig(log.NewNopLogger(), config, "test", rt)
testutil.Ok(t, err)
_, err = bkt.Get(context.Background(), "Test")
// We expect an error from the RoundTripper
testutil.NotOk(t, err)
testutil.Assert(t, errors.Is(err, rt.Err), "Expected RoundTripper error, got: %v", err)
}
29 changes: 16 additions & 13 deletions providers/gcs/gcs.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,8 @@ type Config struct {
// ChunkSizeBytes controls the maximum number of bytes of the object that the
// Writer will attempt to send to the server in a single request
// Used as storage.Writer.ChunkSize of https://pkg.go.dev/google.golang.org/cloud/storage#Writer
ChunkSizeBytes int `yaml:"chunk_size_bytes"`
ChunkSizeBytes int `yaml:"chunk_size_bytes"`
noAuth bool `yaml:"no_auth"`
}

// Bucket implements the store.Bucket and shipper.Bucket interfaces against GCS.
Expand All @@ -76,20 +77,22 @@ func parseConfig(conf []byte) (Config, error) {
}

// NewBucket returns a new Bucket against the given bucket handle.
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string) (*Bucket, error) {
func NewBucket(ctx context.Context, logger log.Logger, conf []byte, component string, rt http.RoundTripper) (*Bucket, error) {
config, err := parseConfig(conf)
if err != nil {
return nil, err
}
return NewBucketWithConfig(ctx, logger, config, component)
return NewBucketWithConfig(ctx, logger, config, component, rt)
}

// NewBucketWithConfig returns a new Bucket with gcs Config struct.
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string) (*Bucket, error) {
func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, component string, rt http.RoundTripper) (*Bucket, error) {
if gc.Bucket == "" {
return nil, errors.New("missing Google Cloud Storage bucket name for stored blocks")
}

if rt != nil {
gc.HTTPConfig.Transport = rt
}
var opts []option.ClientOption

// If ServiceAccount is provided, use them in GCS client, otherwise fallback to Google default logic.
Expand All @@ -100,7 +103,9 @@ func NewBucketWithConfig(ctx context.Context, logger log.Logger, gc Config, comp
}
opts = append(opts, option.WithCredentials(credentials))
}

if gc.noAuth {
opts = append(opts, option.WithoutAuthentication())
}
opts = append(opts,
option.WithUserAgent(fmt.Sprintf("thanos-%s/%s (%s)", component, version.Version, runtime.Version())),
)
Expand All @@ -120,14 +125,12 @@ func appendHttpOptions(gc Config, opts []option.ClientOption) ([]option.ClientOp
// Check if a roundtripper has been set in the config
// otherwise build the default transport.
var rt http.RoundTripper
rt, err := exthttp.DefaultTransport(gc.HTTPConfig)
if err != nil {
return nil, err
}
if gc.HTTPConfig.Transport != nil {
rt = gc.HTTPConfig.Transport
} else {
var err error
rt, err = exthttp.DefaultTransport(gc.HTTPConfig)
if err != nil {
return nil, err
}
}

// GCS uses some defaults when "options.WithHTTPClient" is not used that are important when we call
Expand Down Expand Up @@ -312,7 +315,7 @@ func NewTestBucket(t testing.TB, project string) (objstore.Bucket, func(), error
return nil, nil, err
}

b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test")
b, err := NewBucket(ctx, log.NewNopLogger(), bc, "thanos-e2e-test", nil)
if err != nil {
return nil, nil, err
}
Expand Down
Loading

0 comments on commit 8897e65

Please sign in to comment.