Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
132863: cloud: add WithClientName option r=jeffswenson a=jeffswenson

The WithClientName option allows clients of the cloud package to
specific a client name. This name is used by the cloud input/output
network metrics to attribute network usage to a specific sub component.

The initial consumers of the API are the backup job processor and the
cdc cloud storage sink. This makes it possible to distinguish backup
bytes from other cloud clients like CDC.

Fixes: cockroachdb#132862
Release note: None

Co-authored-by: Jeff Swenson <[email protected]>
  • Loading branch information
craig[bot] and jeffswenson committed Oct 17, 2024
2 parents 7374102 + 9a8b055 commit f38e635
Show file tree
Hide file tree
Showing 15 changed files with 103 additions and 52 deletions.
2 changes: 1 addition & 1 deletion pkg/ccl/backupccl/backup_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ func runBackupProcessor(
progCh: progCh,
settings: &flowCtx.Cfg.Settings.SV,
}
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
if err != nil {
return err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/changefeedccl/sink_cloudstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,8 @@ func makeCloudStorageSink(

// We make the external storage with a nil IOAccountingInterceptor since we
// record usage metrics via s.metrics.
if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil {
s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc"))
if err != nil {
return nil, err
}
if mb != nil && s.es != nil {
Expand Down
6 changes: 6 additions & 0 deletions pkg/ccl/changefeedccl/sink_cloudstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,12 @@ func TestCloudStorageSink(t *testing.T) {
clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage,
error) {
var options cloud.ExternalStorageOptions
for _, opt := range opts {
opt(&options)
}
require.Equal(t, options.ClientName, "cdc")

return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings,
clientFactory,
user,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ go_test(
name = "cloud_test",
srcs = [
"cloud_io_test.go",
"options_test.go",
"uris_test.go",
],
embed = [":cloud"],
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/amazon/aws_kms.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
return nil, errors.New(
"custom endpoints disallowed for aws kms due to --aws-kms-disable-http flag")
}
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS")
client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "")
if err != nil {
return nil, err
}
Expand Down
65 changes: 33 additions & 32 deletions pkg/cloud/amazon/s3_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,12 +103,13 @@ var NightlyEnvVarKMSParams = map[string]string{
}

type s3Storage struct {
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
bucket *string
conf *cloudpb.ExternalStorage_S3
ioConf base.ExternalIODirConfig
settings *cluster.Settings
prefix string
metrics *cloud.Metrics
storageOptions cloud.ExternalStorageOptions

opts s3ClientConfig
cached *s3Client
Expand Down Expand Up @@ -446,13 +447,14 @@ func MakeS3Storage(
}

s := &s3Storage{
bucket: aws.String(conf.Bucket),
conf: conf,
ioConf: args.IOConf,
prefix: conf.Prefix,
metrics: args.MetricsRecorder,
settings: args.Settings,
opts: clientConfig(conf),
bucket: aws.String(conf.Bucket),
conf: conf,
ioConf: args.IOConf,
prefix: conf.Prefix,
metrics: args.MetricsRecorder,
settings: args.Settings,
opts: clientConfig(conf),
storageOptions: args.ExternalStorageOptions(),
}

reuse := reuseSession.Get(&args.Settings.SV)
Expand All @@ -472,7 +474,7 @@ func MakeS3Storage(
// other callers from making clients in the meantime, not just to avoid making
// duplicate clients in a race but also because making clients concurrently
// can fail if the AWS metadata server hits its rate limit.
client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings)
client, _, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -521,11 +523,10 @@ func constructEndpointURI(endpoint string) (string, error) {
// config's region is empty, used the passed bucket to determine a region and
// configures the client with it as well as returning it (so the caller can
// remember it for future calls).
func newClient(
ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings,
) (s3Client, string, error) {
func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {

// Open a span if client creation will do IO/RPCs to find creds/bucket region.
if conf.region == "" || conf.auth == cloud.AuthParamImplicit {
if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit {
var sp *tracing.Span
ctx, sp = tracing.ChildSpan(ctx, "s3.newClient")
defer sp.Finish()
Expand All @@ -536,7 +537,7 @@ func newClient(
loadOptions = append(loadOptions, option)
}

client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
if err != nil {
return s3Client{}, "", err
}
Expand All @@ -547,7 +548,7 @@ func newClient(
addLoadOption(config.WithRetryMaxAttempts(retryMaxAttempts))

addLoadOption(config.WithLogger(newLogAdapter(ctx)))
if conf.verbose {
if s.opts.verbose {
addLoadOption(config.WithClientLogMode(awsVerboseLogging))
}

Expand All @@ -558,10 +559,10 @@ func newClient(
})
})

switch conf.auth {
switch s.opts.auth {
case "", cloud.AuthParamSpecified:
addLoadOption(config.WithCredentialsProvider(
aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(conf.accessKey, conf.secret, conf.tempToken))))
aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(s.opts.accessKey, s.opts.secret, s.opts.tempToken))))
case cloud.AuthParamImplicit:
}

Expand All @@ -571,16 +572,16 @@ func newClient(
}

var endpointURI string
if conf.endpoint != "" {
if s.opts.endpoint != "" {
var err error
endpointURI, err = constructEndpointURI(conf.endpoint)
endpointURI, err = constructEndpointURI(s.opts.endpoint)
if err != nil {
return s3Client{}, "", err
}
}

if conf.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range conf.delegateRoleProviders {
if s.opts.assumeRoleProvider.roleARN != "" {
for _, delegateProvider := range s.opts.delegateRoleProviders {
client := sts.NewFromConfig(cfg, func(options *sts.Options) {
if endpointURI != "" {
options.BaseEndpoint = aws.String(endpointURI)
Expand All @@ -596,11 +597,11 @@ func newClient(
}
})

creds := stscreds.NewAssumeRoleProvider(client, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID))
creds := stscreds.NewAssumeRoleProvider(client, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID))
cfg.Credentials = creds
}

region := conf.region
region := s.opts.region
if region == "" {
// Set a hint because we have no region specified, we will override this
// below once we get the actual bucket region.
Expand All @@ -610,7 +611,7 @@ func newClient(
if endpointURI != "" {
options.BaseEndpoint = aws.String(endpointURI)
}
}), conf.bucket)
}), s.opts.bucket)
return err
}); err != nil {
return s3Client{}, "", errors.Wrap(err, "could not find s3 bucket's region")
Expand All @@ -624,7 +625,7 @@ func newClient(
}
})
u := manager.NewUploader(c, func(uploader *manager.Uploader) {
uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV)
uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV)
})
return s3Client{client: c, uploader: u}, region, nil
}
Expand All @@ -633,7 +634,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.Client, error) {
if s.cached != nil {
return s.cached.client, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand All @@ -647,7 +648,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*manager.Uploader, error)
if s.cached != nil {
return s.cached.uploader, nil
}
client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
client, region, err := s.newClient(ctx)
if err != nil {
return nil, err
}
Expand Down
12 changes: 8 additions & 4 deletions pkg/cloud/amazon/s3_storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,11 +615,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) {

testSettings := cluster.MakeTestingClusterSettings()
ctx := context.Background()
cfg := s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
s3 := s3Storage{
opts: s3ClientConfig{
bucket: "bucket-does-not-exist-v1i3m",
auth: cloud.AuthParamImplicit,
},
metrics: cloud.NilMetrics,
settings: testSettings,
}
_, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings)
_, _, err = s3.newClient(ctx)
require.Regexp(t, "could not find s3 bucket's region", err)
}

Expand Down
8 changes: 2 additions & 6 deletions pkg/cloud/azure/azure_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,8 @@ func makeAzureStorage(
return nil, errors.Wrap(err, "azure: account name is not valid")
}

t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container)
options := args.ExternalStorageOptions()
t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName)
if err != nil {
return nil, errors.Wrap(err, "azure: unable to create transport")
}
Expand Down Expand Up @@ -253,11 +254,6 @@ func makeAzureStorage(
"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
}

options := cloud.ExternalStorageOptions{}
for _, o := range args.Options {
o(&options)
}

defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{}
if knobs := options.AzureStorageTestingKnobs; knobs != nil {
defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs)
Expand Down
8 changes: 4 additions & 4 deletions pkg/cloud/cloud_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,9 +77,9 @@ var httpMetrics = settings.RegisterBoolSetting(
// MakeHTTPClient makes an http client configured with the common settings used
// for interacting with cloud storage (timeouts, retries, CA certs, etc).
func MakeHTTPClient(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Client, error) {
t, err := MakeTransport(settings, metrics, cloud, bucket)
t, err := MakeTransport(settings, metrics, cloud, bucket, client)
if err != nil {
return nil, err
}
Expand All @@ -99,7 +99,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) {
// used for interacting with cloud storage (timeouts, retries, CA certs, etc).
// Prefer MakeHTTPClient where possible.
func MakeTransport(
settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
) (*http.Transport, error) {
var tlsConf *tls.Config
if pem := httpCustomCA.Get(&settings.SV); pem != "" {
Expand All @@ -121,7 +121,7 @@ func MakeTransport(
// most bulk jobs.
t.MaxIdleConnsPerHost = 64
if metrics != nil {
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket)
t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client)
}
return t, nil
}
Expand Down
10 changes: 10 additions & 0 deletions pkg/cloud/external_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,12 +182,22 @@ type EarlyBootExternalStorageContext struct {
MetricsRecorder *Metrics
}

// ExternalStorageOptions rolls up the Options into a struct.
func (e *EarlyBootExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions {
var options ExternalStorageOptions
for _, option := range e.Options {
option(&options)
}
return options
}

// ExternalStorageOptions holds dependencies and values that can be
// overridden by callers of an ExternalStorageFactory via a passed
// ExternalStorageOption.
type ExternalStorageOptions struct {
ioAccountingInterceptor ReadWriterInterceptor
AzureStorageTestingKnobs base.ModuleTestingKnobs
ClientName string
}

// ExternalStorageConstructor is a function registered to create instances
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/gcp/gcs_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,8 @@ func makeGCSStorage(
opts = append(opts, assumeOpt)
}

baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket)
clientName := args.ExternalStorageOptions().ClientName
baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName)
if err != nil {
return nil, errors.Wrap(err, "failed to create http transport")
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/cloud/httpsink/http_storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ func MakeHTTPStorage(
return nil, errors.Errorf("HTTP storage requested but prefix path not provided")
}

client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base)
clientName := args.ExternalStorageOptions().ClientName
client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cloud/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) metric.Struct {
ConnsOpened: metric.NewCounter(connsOpened),
ConnsReused: metric.NewCounter(connsReused),
TLSHandhakes: metric.NewCounter(tlsHandhakes),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"),
NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"),
}
}

Expand Down
7 changes: 7 additions & 0 deletions pkg/cloud/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage
opts.AzureStorageTestingKnobs = knobs
}
}

// WithClientName sets the "client" label on network metrics.
func WithClientName(name string) ExternalStorageOption {
return func(opts *ExternalStorageOptions) {
opts.ClientName = name
}
}
23 changes: 23 additions & 0 deletions pkg/cloud/options_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package cloud

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestClientName(t *testing.T) {
options := func(options ...ExternalStorageOption) ExternalStorageOptions {
context := EarlyBootExternalStorageContext{
Options: options,
}
return context.ExternalStorageOptions()
}
require.Empty(t, options().ClientName)
require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name")
}

0 comments on commit f38e635

Please sign in to comment.