From 9a8b055c6404e6d76c63a6a30938c46e7eb9cd03 Mon Sep 17 00:00:00 2001
From: Jeff Swenson
Date: Thu, 17 Oct 2024 15:33:54 -0400
Subject: [PATCH] cloud: add WithClientName option

The WithClientName option allows clients of the cloud package to specify a
client name. This name is used by the cloud input/output network metrics to
attribute network usage to a specific subcomponent. The initial consumers of
the API are the backup job processor and the CDC cloud storage sink. This
makes it possible to distinguish backup bytes from those of other cloud
clients, such as CDC.

Fixes: #132862

Release note: None
---
 pkg/ccl/backupccl/backup_processor.go         |  2 +-
 pkg/ccl/changefeedccl/sink_cloudstorage.go    |  3 +-
 .../changefeedccl/sink_cloudstorage_test.go   |  6 ++
 pkg/cloud/BUILD.bazel                         |  1 +
 pkg/cloud/amazon/aws_kms.go                   |  2 +-
 pkg/cloud/amazon/s3_storage.go                | 65 ++++++++++---------
 pkg/cloud/amazon/s3_storage_test.go           | 12 ++--
 pkg/cloud/azure/azure_storage.go              |  8 +--
 pkg/cloud/cloud_io.go                         |  8 +--
 pkg/cloud/external_storage.go                 | 10 +++
 pkg/cloud/gcp/gcs_storage.go                  |  3 +-
 pkg/cloud/httpsink/http_storage.go            |  3 +-
 pkg/cloud/metrics.go                          |  2 +-
 pkg/cloud/options.go                          |  7 ++
 pkg/cloud/options_test.go                     | 23 +++++++
 15 files changed, 103 insertions(+), 52 deletions(-)
 create mode 100644 pkg/cloud/options_test.go

diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go
index 894c1c2f78bd..d55b454b41f5 100644
--- a/pkg/ccl/backupccl/backup_processor.go
+++ b/pkg/ccl/backupccl/backup_processor.go
@@ -409,7 +409,7 @@ func runBackupProcessor(
 				progCh:   progCh,
 				settings: &flowCtx.Cfg.Settings.SV,
 			}
-			storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest)
+			storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup"))
 			if err != nil {
 				return err
 			}
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go
index 79794e433608..b0fd796a671a 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go
@@ -487,7 +487,8 @@ func makeCloudStorageSink(
 	// We make the external storage with a nil IOAccountingInterceptor since we
 	// record usage metrics via s.metrics.
-	if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil {
+	s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc"))
+	if err != nil {
 		return nil, err
 	}
 	if mb != nil && s.es != nil {
diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
index a363115da304..5814b31fe060 100644
--- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
+++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go
@@ -172,6 +172,12 @@ func TestCloudStorageSink(t *testing.T) {
 	clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir)
 	externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername,
 		opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) {
+		var options cloud.ExternalStorageOptions
+		for _, opt := range opts {
+			opt(&options)
+		}
+		require.Equal(t, options.ClientName, "cdc")
+
 		return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, clientFactory,
 			user,
diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel
index 9ebbba110bd8..75517bd83426 100644
--- a/pkg/cloud/BUILD.bazel
+++ b/pkg/cloud/BUILD.bazel
@@ -41,6 +41,7 @@ go_test(
     name = "cloud_test",
     srcs = [
         "cloud_io_test.go",
+        "options_test.go",
         "uris_test.go",
     ],
     embed = [":cloud"],
diff --git a/pkg/cloud/amazon/aws_kms.go b/pkg/cloud/amazon/aws_kms.go
index 070fd7c2d987..cef0e4d8b940 100644
--- a/pkg/cloud/amazon/aws_kms.go
+++ b/pkg/cloud/amazon/aws_kms.go
@@ -144,7 +144,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e
 		return nil, errors.New(
 			"custom endpoints disallowed for aws kms due to --aws-kms-disable-http flag")
 	}
-	client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS")
+	client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "")
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go
index 12bc264e4426..b804e1c88c6d 100644
--- a/pkg/cloud/amazon/s3_storage.go
+++ b/pkg/cloud/amazon/s3_storage.go
@@ -103,12 +103,13 @@ var NightlyEnvVarKMSParams = map[string]string{
 }
 
 type s3Storage struct {
-	bucket   *string
-	conf     *cloudpb.ExternalStorage_S3
-	ioConf   base.ExternalIODirConfig
-	settings *cluster.Settings
-	prefix   string
-	metrics  *cloud.Metrics
+	bucket         *string
+	conf           *cloudpb.ExternalStorage_S3
+	ioConf         base.ExternalIODirConfig
+	settings       *cluster.Settings
+	prefix         string
+	metrics        *cloud.Metrics
+	storageOptions cloud.ExternalStorageOptions
 
 	opts   s3ClientConfig
 	cached *s3Client
@@ -446,13 +447,14 @@ func MakeS3Storage(
 	}
 
 	s := &s3Storage{
-		bucket:   aws.String(conf.Bucket),
-		conf:     conf,
-		ioConf:   args.IOConf,
-		prefix:   conf.Prefix,
-		metrics:  args.MetricsRecorder,
-		settings: args.Settings,
-		opts:     clientConfig(conf),
+		bucket:         aws.String(conf.Bucket),
+		conf:           conf,
+		ioConf:         args.IOConf,
+		prefix:         conf.Prefix,
+		metrics:        args.MetricsRecorder,
+		settings:       args.Settings,
+		opts:           clientConfig(conf),
+		storageOptions: args.ExternalStorageOptions(),
 	}
 
 	reuse := reuseSession.Get(&args.Settings.SV)
@@ -472,7 +474,7 @@ func MakeS3Storage(
 	// other callers from making clients in the meantime, not just to avoid making
 	// duplicate clients in a race but also because making clients concurrently
 	// can fail if the AWS metadata server hits its rate limit.
-	client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings)
+	client, _, err := s.newClient(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -521,11 +523,10 @@ func constructEndpointURI(endpoint string) (string, error) {
 // config's region is empty, used the passed bucket to determine a region and
 // configures the client with it as well as returning it (so the caller can
 // remember it for future calls).
-func newClient(
-	ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings,
-) (s3Client, string, error) {
+func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) {
+
 	// Open a span if client creation will do IO/RPCs to find creds/bucket region.
-	if conf.region == "" || conf.auth == cloud.AuthParamImplicit {
+	if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit {
 		var sp *tracing.Span
 		ctx, sp = tracing.ChildSpan(ctx, "s3.newClient")
 		defer sp.Finish()
@@ -536,7 +537,7 @@ func newClient(
 		loadOptions = append(loadOptions, option)
 	}
 
-	client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket)
+	client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName)
 	if err != nil {
 		return s3Client{}, "", err
 	}
@@ -547,7 +548,7 @@ func newClient(
 	addLoadOption(config.WithRetryMaxAttempts(retryMaxAttempts))
 	addLoadOption(config.WithLogger(newLogAdapter(ctx)))
 
-	if conf.verbose {
+	if s.opts.verbose {
 		addLoadOption(config.WithClientLogMode(awsVerboseLogging))
 	}
@@ -558,10 +559,10 @@ func newClient(
 		})
 	})
 
-	switch conf.auth {
+	switch s.opts.auth {
 	case "", cloud.AuthParamSpecified:
 		addLoadOption(config.WithCredentialsProvider(
-			aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(conf.accessKey, conf.secret, conf.tempToken))))
+			aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(s.opts.accessKey, s.opts.secret, s.opts.tempToken))))
 	case cloud.AuthParamImplicit:
 	}
@@ -571,16 +572,16 @@ func newClient(
 	}
 
 	var endpointURI string
-	if conf.endpoint != "" {
+	if s.opts.endpoint != "" {
 		var err error
-		endpointURI, err = constructEndpointURI(conf.endpoint)
+		endpointURI, err = constructEndpointURI(s.opts.endpoint)
 		if err != nil {
 			return s3Client{}, "", err
 		}
 	}
 
-	if conf.assumeRoleProvider.roleARN != "" {
-		for _, delegateProvider := range conf.delegateRoleProviders {
+	if s.opts.assumeRoleProvider.roleARN != "" {
+		for _, delegateProvider := range s.opts.delegateRoleProviders {
 			client := sts.NewFromConfig(cfg, func(options *sts.Options) {
 				if endpointURI != "" {
 					options.BaseEndpoint = aws.String(endpointURI)
 				}
 			})
@@ -596,11 +597,11 @@ func newClient(
 			}
 		})
-		creds := stscreds.NewAssumeRoleProvider(client, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID))
+		creds := stscreds.NewAssumeRoleProvider(client, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID))
 		cfg.Credentials = creds
 	}
 
-	region := conf.region
+	region := s.opts.region
 	if region == "" {
 		// Set a hint because we have no region specified, we will override this
 		// below once we get the actual bucket region.
@@ -610,7 +611,7 @@
 				if endpointURI != "" {
 					options.BaseEndpoint = aws.String(endpointURI)
 				}
-			}), conf.bucket)
+			}), s.opts.bucket)
 			return err
 		}); err != nil {
 			return s3Client{}, "", errors.Wrap(err, "could not find s3 bucket's region")
@@ -624,7 +625,7 @@
 		}
 	})
 	u := manager.NewUploader(c, func(uploader *manager.Uploader) {
-		uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV)
+		uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV)
 	})
 	return s3Client{client: c, uploader: u}, region, nil
 }
@@ -633,7 +634,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.Client, error) {
 	if s.cached != nil {
 		return s.cached.client, nil
 	}
-	client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
+	client, region, err := s.newClient(ctx)
 	if err != nil {
 		return nil, err
 	}
@@ -647,7 +648,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*manager.Uploader, error)
 	if s.cached != nil {
 		return s.cached.uploader, nil
 	}
-	client, region, err := newClient(ctx, s.metrics, s.opts, s.settings)
+	client, region, err := s.newClient(ctx)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go
index f1bafbd17e41..c67ee32c55a2 100644
--- a/pkg/cloud/amazon/s3_storage_test.go
+++ b/pkg/cloud/amazon/s3_storage_test.go
@@ -615,11 +615,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) {
 	testSettings := cluster.MakeTestingClusterSettings()
 	ctx := context.Background()
-	cfg := s3ClientConfig{
-		bucket: "bucket-does-not-exist-v1i3m",
-		auth:   cloud.AuthParamImplicit,
+	s3 := s3Storage{
+		opts: s3ClientConfig{
+			bucket: "bucket-does-not-exist-v1i3m",
+			auth:   cloud.AuthParamImplicit,
+		},
+		metrics:  cloud.NilMetrics,
+		settings: testSettings,
 	}
-	_, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings)
+	_, _, err = s3.newClient(ctx)
 	require.Regexp(t, "could not find s3 bucket's region", err)
 }
diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go
index b2b92a1770e6..55cdb340afaf 100644
--- a/pkg/cloud/azure/azure_storage.go
+++ b/pkg/cloud/azure/azure_storage.go
@@ -220,7 +220,8 @@ func makeAzureStorage(
 		return nil, errors.Wrap(err, "azure: account name is not valid")
 	}
 
-	t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container)
+	options := args.ExternalStorageOptions()
+	t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName)
 	if err != nil {
 		return nil, errors.Wrap(err, "azure: unable to create transport")
 	}
@@ -253,11 +254,6 @@ func makeAzureStorage(
 			"implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag")
 	}
 
-	options := cloud.ExternalStorageOptions{}
-	for _, o := range args.Options {
-		o(&options)
-	}
-
 	defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{}
 	if knobs := options.AzureStorageTestingKnobs; knobs != nil {
 		defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs)
diff --git a/pkg/cloud/cloud_io.go b/pkg/cloud/cloud_io.go
index a5fa6432bec0..f6ed2b639f29 100644
--- a/pkg/cloud/cloud_io.go
+++ b/pkg/cloud/cloud_io.go
@@ -77,9 +77,9 @@ var httpMetrics = settings.RegisterBoolSetting(
 // MakeHTTPClient makes an http client configured with the common settings used
 // for interacting with cloud storage (timeouts, retries, CA certs, etc).
 func MakeHTTPClient(
-	settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
+	settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
 ) (*http.Client, error) {
-	t, err := MakeTransport(settings, metrics, cloud, bucket)
+	t, err := MakeTransport(settings, metrics, cloud, bucket, client)
 	if err != nil {
 		return nil, err
 	}
@@ -99,7 +99,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) {
 // used for interacting with cloud storage (timeouts, retries, CA certs, etc).
 // Prefer MakeHTTPClient where possible.
 func MakeTransport(
-	settings *cluster.Settings, metrics *Metrics, cloud, bucket string,
+	settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string,
 ) (*http.Transport, error) {
 	var tlsConf *tls.Config
 	if pem := httpCustomCA.Get(&settings.SV); pem != "" {
@@ -121,7 +121,7 @@ func MakeTransport(
 	// most bulk jobs.
 	t.MaxIdleConnsPerHost = 64
 	if metrics != nil {
-		t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket)
+		t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client)
 	}
 	return t, nil
 }
diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go
index 407a45ce0249..f726d80fe063 100644
--- a/pkg/cloud/external_storage.go
+++ b/pkg/cloud/external_storage.go
@@ -182,12 +182,22 @@ type EarlyBootExternalStorageContext struct {
 	MetricsRecorder *Metrics
 }
 
+// ExternalStorageOptions rolls up the Options into a struct.
+func (e *EarlyBootExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions {
+	var options ExternalStorageOptions
+	for _, option := range e.Options {
+		option(&options)
+	}
+	return options
+}
+
 // ExternalStorageOptions holds dependencies and values that can be
 // overridden by callers of an ExternalStorageFactory via a passed
 // ExternalStorageOption.
 type ExternalStorageOptions struct {
 	ioAccountingInterceptor  ReadWriterInterceptor
 	AzureStorageTestingKnobs base.ModuleTestingKnobs
+	ClientName               string
 }
 
 // ExternalStorageConstructor is a function registered to create instances
diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go
index 6dbdfcb4448e..62fe09648825 100644
--- a/pkg/cloud/gcp/gcs_storage.go
+++ b/pkg/cloud/gcp/gcs_storage.go
@@ -185,7 +185,8 @@ func makeGCSStorage(
 		opts = append(opts, assumeOpt)
 	}
 
-	baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket)
+	clientName := args.ExternalStorageOptions().ClientName
+	baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName)
 	if err != nil {
 		return nil, errors.Wrap(err, "failed to create http transport")
 	}
diff --git a/pkg/cloud/httpsink/http_storage.go b/pkg/cloud/httpsink/http_storage.go
index 1b3969584cbb..525bfc9357a4 100644
--- a/pkg/cloud/httpsink/http_storage.go
+++ b/pkg/cloud/httpsink/http_storage.go
@@ -66,7 +66,8 @@ func MakeHTTPStorage(
 		return nil, errors.Errorf("HTTP storage requested but prefix path not provided")
 	}
 
-	client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base)
+	clientName := args.ExternalStorageOptions().ClientName
+	client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName)
 	if err != nil {
 		return nil, err
 	}
diff --git a/pkg/cloud/metrics.go b/pkg/cloud/metrics.go
index 8e07b34471b3..5c802bf8367b 100644
--- a/pkg/cloud/metrics.go
+++ b/pkg/cloud/metrics.go
@@ -133,7 +133,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) metric.Struct {
 		ConnsOpened:  metric.NewCounter(connsOpened),
 		ConnsReused:  metric.NewCounter(connsReused),
 		TLSHandhakes: metric.NewCounter(tlsHandhakes),
-		NetMetrics:   cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"),
+		NetMetrics:   cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"),
 	}
 }
diff --git a/pkg/cloud/options.go b/pkg/cloud/options.go
index f9234c02d294..e4bf68c3dfb0 100644
--- a/pkg/cloud/options.go
+++ b/pkg/cloud/options.go
@@ -23,3 +23,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage
 		opts.AzureStorageTestingKnobs = knobs
 	}
 }
+
+// WithClientName sets the "client" label on network metrics.
+func WithClientName(name string) ExternalStorageOption {
+	return func(opts *ExternalStorageOptions) {
+		opts.ClientName = name
+	}
+}
diff --git a/pkg/cloud/options_test.go b/pkg/cloud/options_test.go
new file mode 100644
index 000000000000..b54b9348196f
--- /dev/null
+++ b/pkg/cloud/options_test.go
@@ -0,0 +1,23 @@
+// Copyright 2024 The Cockroach Authors.
+//
+// Use of this software is governed by the CockroachDB Software License
+// included in the /LICENSE file.
+
+package cloud
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestClientName(t *testing.T) {
+	options := func(options ...ExternalStorageOption) ExternalStorageOptions {
+		context := EarlyBootExternalStorageContext{
+			Options: options,
+		}
+		return context.ExternalStorageOptions()
+	}
+	require.Empty(t, options().ClientName)
+	require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name")
+}
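Usage note (not part of the patch): a minimal sketch of how a caller's WithClientName value ends up in ExternalStorageOptions, mirroring the TestClientName pattern above and what the s3/gcs/azure/http constructors do with args.ExternalStorageOptions(). The standalone main wrapper and the "backup" label are illustrative assumptions; the cloud package identifiers are the ones added or used by this commit.

package main

import (
	"fmt"

	"github.com/cockroachdb/cockroach/pkg/cloud"
)

func main() {
	// A caller attaches a client name via the option; a storage constructor
	// folds the options into a struct and passes ClientName through to
	// cloud.MakeHTTPClient/MakeTransport, where it becomes the "client" label
	// on the cloud network metrics.
	args := cloud.EarlyBootExternalStorageContext{
		Options: []cloud.ExternalStorageOption{cloud.WithClientName("backup")},
	}
	opts := args.ExternalStorageOptions()
	fmt.Println(opts.ClientName) // prints "backup"
}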