diff --git a/pkg/ccl/backupccl/backup_processor.go b/pkg/ccl/backupccl/backup_processor.go index 894c1c2f78bd..d55b454b41f5 100644 --- a/pkg/ccl/backupccl/backup_processor.go +++ b/pkg/ccl/backupccl/backup_processor.go @@ -409,7 +409,7 @@ func runBackupProcessor( progCh: progCh, settings: &flowCtx.Cfg.Settings.SV, } - storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest) + storage, err := flowCtx.Cfg.ExternalStorage(ctx, dest, cloud.WithClientName("backup")) if err != nil { return err } diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage.go b/pkg/ccl/changefeedccl/sink_cloudstorage.go index 79794e433608..b0fd796a671a 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage.go @@ -487,7 +487,8 @@ func makeCloudStorageSink( // We make the external storage with a nil IOAccountingInterceptor since we // record usage metrics via s.metrics. - if s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil)); err != nil { + s.es, err = makeExternalStorageFromURI(ctx, u.String(), user, cloud.WithIOAccountingInterceptor(nil), cloud.WithClientName("cdc")) + if err != nil { return nil, err } if mb != nil && s.es != nil { diff --git a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go index a363115da304..5814b31fe060 100644 --- a/pkg/ccl/changefeedccl/sink_cloudstorage_test.go +++ b/pkg/ccl/changefeedccl/sink_cloudstorage_test.go @@ -172,6 +172,12 @@ func TestCloudStorageSink(t *testing.T) { clientFactory := blobs.TestBlobServiceClient(settings.ExternalIODir) externalStorageFromURI := func(ctx context.Context, uri string, user username.SQLUsername, opts ...cloud.ExternalStorageOption) (cloud.ExternalStorage, error) { + var options cloud.ExternalStorageOptions + for _, opt := range opts { + opt(&options) + } + require.Equal(t, options.ClientName, "cdc") + return cloud.ExternalStorageFromURI(ctx, uri, base.ExternalIODirConfig{}, settings, clientFactory, user, diff --git a/pkg/cloud/BUILD.bazel b/pkg/cloud/BUILD.bazel index 9ebbba110bd8..75517bd83426 100644 --- a/pkg/cloud/BUILD.bazel +++ b/pkg/cloud/BUILD.bazel @@ -41,6 +41,7 @@ go_test( name = "cloud_test", srcs = [ "cloud_io_test.go", + "options_test.go", "uris_test.go", ], embed = [":cloud"], diff --git a/pkg/cloud/amazon/aws_kms.go b/pkg/cloud/amazon/aws_kms.go index 070fd7c2d987..cef0e4d8b940 100644 --- a/pkg/cloud/amazon/aws_kms.go +++ b/pkg/cloud/amazon/aws_kms.go @@ -144,7 +144,7 @@ func MakeAWSKMS(ctx context.Context, uri string, env cloud.KMSEnv) (cloud.KMS, e return nil, errors.New( "custom endpoints disallowed for aws kms due to --aws-kms-disable-http flag") } - client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS") + client, err := cloud.MakeHTTPClient(env.ClusterSettings(), cloud.NilMetrics, "aws", "KMS", "") if err != nil { return nil, err } diff --git a/pkg/cloud/amazon/s3_storage.go b/pkg/cloud/amazon/s3_storage.go index 12bc264e4426..b804e1c88c6d 100644 --- a/pkg/cloud/amazon/s3_storage.go +++ b/pkg/cloud/amazon/s3_storage.go @@ -103,12 +103,13 @@ var NightlyEnvVarKMSParams = map[string]string{ } type s3Storage struct { - bucket *string - conf *cloudpb.ExternalStorage_S3 - ioConf base.ExternalIODirConfig - settings *cluster.Settings - prefix string - metrics *cloud.Metrics + bucket *string + conf *cloudpb.ExternalStorage_S3 + ioConf base.ExternalIODirConfig + settings *cluster.Settings + prefix string + metrics *cloud.Metrics + storageOptions cloud.ExternalStorageOptions opts s3ClientConfig cached *s3Client @@ -446,13 +447,14 @@ func MakeS3Storage( } s := &s3Storage{ - bucket: aws.String(conf.Bucket), - conf: conf, - ioConf: args.IOConf, - prefix: conf.Prefix, - metrics: args.MetricsRecorder, - settings: args.Settings, - opts: clientConfig(conf), + bucket: aws.String(conf.Bucket), + conf: conf, + ioConf: args.IOConf, + prefix: conf.Prefix, + metrics: args.MetricsRecorder, + settings: args.Settings, + opts: clientConfig(conf), + storageOptions: args.ExternalStorageOptions(), } reuse := reuseSession.Get(&args.Settings.SV) @@ -472,7 +474,7 @@ func MakeS3Storage( // other callers from making clients in the meantime, not just to avoid making // duplicate clients in a race but also because making clients concurrently // can fail if the AWS metadata server hits its rate limit. - client, _, err := newClient(ctx, args.MetricsRecorder, s.opts, s.settings) + client, _, err := s.newClient(ctx) if err != nil { return nil, err } @@ -521,11 +523,10 @@ func constructEndpointURI(endpoint string) (string, error) { // config's region is empty, used the passed bucket to determine a region and // configures the client with it as well as returning it (so the caller can // remember it for future calls). -func newClient( - ctx context.Context, metrics *cloud.Metrics, conf s3ClientConfig, settings *cluster.Settings, -) (s3Client, string, error) { +func (s *s3Storage) newClient(ctx context.Context) (s3Client, string, error) { + // Open a span if client creation will do IO/RPCs to find creds/bucket region. - if conf.region == "" || conf.auth == cloud.AuthParamImplicit { + if s.opts.region == "" || s.opts.auth == cloud.AuthParamImplicit { var sp *tracing.Span ctx, sp = tracing.ChildSpan(ctx, "s3.newClient") defer sp.Finish() @@ -536,7 +537,7 @@ func newClient( loadOptions = append(loadOptions, option) } - client, err := cloud.MakeHTTPClient(settings, metrics, "aws", conf.bucket) + client, err := cloud.MakeHTTPClient(s.settings, s.metrics, "aws", s.opts.bucket, s.storageOptions.ClientName) if err != nil { return s3Client{}, "", err } @@ -547,7 +548,7 @@ func newClient( addLoadOption(config.WithRetryMaxAttempts(retryMaxAttempts)) addLoadOption(config.WithLogger(newLogAdapter(ctx))) - if conf.verbose { + if s.opts.verbose { addLoadOption(config.WithClientLogMode(awsVerboseLogging)) } @@ -558,10 +559,10 @@ func newClient( }) }) - switch conf.auth { + switch s.opts.auth { case "", cloud.AuthParamSpecified: addLoadOption(config.WithCredentialsProvider( - aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(conf.accessKey, conf.secret, conf.tempToken)))) + aws.NewCredentialsCache(credentials.NewStaticCredentialsProvider(s.opts.accessKey, s.opts.secret, s.opts.tempToken)))) case cloud.AuthParamImplicit: } @@ -571,16 +572,16 @@ func newClient( } var endpointURI string - if conf.endpoint != "" { + if s.opts.endpoint != "" { var err error - endpointURI, err = constructEndpointURI(conf.endpoint) + endpointURI, err = constructEndpointURI(s.opts.endpoint) if err != nil { return s3Client{}, "", err } } - if conf.assumeRoleProvider.roleARN != "" { - for _, delegateProvider := range conf.delegateRoleProviders { + if s.opts.assumeRoleProvider.roleARN != "" { + for _, delegateProvider := range s.opts.delegateRoleProviders { client := sts.NewFromConfig(cfg, func(options *sts.Options) { if endpointURI != "" { options.BaseEndpoint = aws.String(endpointURI) @@ -596,11 +597,11 @@ func newClient( } }) - creds := stscreds.NewAssumeRoleProvider(client, conf.assumeRoleProvider.roleARN, withExternalID(conf.assumeRoleProvider.externalID)) + creds := stscreds.NewAssumeRoleProvider(client, s.opts.assumeRoleProvider.roleARN, withExternalID(s.opts.assumeRoleProvider.externalID)) cfg.Credentials = creds } - region := conf.region + region := s.opts.region if region == "" { // Set a hint because we have no region specified, we will override this // below once we get the actual bucket region. @@ -610,7 +611,7 @@ func newClient( if endpointURI != "" { options.BaseEndpoint = aws.String(endpointURI) } - }), conf.bucket) + }), s.opts.bucket) return err }); err != nil { return s3Client{}, "", errors.Wrap(err, "could not find s3 bucket's region") @@ -624,7 +625,7 @@ func newClient( } }) u := manager.NewUploader(c, func(uploader *manager.Uploader) { - uploader.PartSize = cloud.WriteChunkSize.Get(&settings.SV) + uploader.PartSize = cloud.WriteChunkSize.Get(&s.settings.SV) }) return s3Client{client: c, uploader: u}, region, nil } @@ -633,7 +634,7 @@ func (s *s3Storage) getClient(ctx context.Context) (*s3.Client, error) { if s.cached != nil { return s.cached.client, nil } - client, region, err := newClient(ctx, s.metrics, s.opts, s.settings) + client, region, err := s.newClient(ctx) if err != nil { return nil, err } @@ -647,7 +648,7 @@ func (s *s3Storage) getUploader(ctx context.Context) (*manager.Uploader, error) if s.cached != nil { return s.cached.uploader, nil } - client, region, err := newClient(ctx, s.metrics, s.opts, s.settings) + client, region, err := s.newClient(ctx) if err != nil { return nil, err } diff --git a/pkg/cloud/amazon/s3_storage_test.go b/pkg/cloud/amazon/s3_storage_test.go index f1bafbd17e41..c67ee32c55a2 100644 --- a/pkg/cloud/amazon/s3_storage_test.go +++ b/pkg/cloud/amazon/s3_storage_test.go @@ -615,11 +615,15 @@ func TestNewClientErrorsOnBucketRegion(t *testing.T) { testSettings := cluster.MakeTestingClusterSettings() ctx := context.Background() - cfg := s3ClientConfig{ - bucket: "bucket-does-not-exist-v1i3m", - auth: cloud.AuthParamImplicit, + s3 := s3Storage{ + opts: s3ClientConfig{ + bucket: "bucket-does-not-exist-v1i3m", + auth: cloud.AuthParamImplicit, + }, + metrics: cloud.NilMetrics, + settings: testSettings, } - _, _, err = newClient(ctx, cloud.NilMetrics, cfg, testSettings) + _, _, err = s3.newClient(ctx) require.Regexp(t, "could not find s3 bucket's region", err) } diff --git a/pkg/cloud/azure/azure_storage.go b/pkg/cloud/azure/azure_storage.go index b2b92a1770e6..55cdb340afaf 100644 --- a/pkg/cloud/azure/azure_storage.go +++ b/pkg/cloud/azure/azure_storage.go @@ -220,7 +220,8 @@ func makeAzureStorage( return nil, errors.Wrap(err, "azure: account name is not valid") } - t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container) + options := args.ExternalStorageOptions() + t, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "azure", dest.AzureConfig.Container, options.ClientName) if err != nil { return nil, errors.Wrap(err, "azure: unable to create transport") } @@ -253,11 +254,6 @@ func makeAzureStorage( "implicit credentials disallowed for azure due to --external-io-disable-implicit-credentials flag") } - options := cloud.ExternalStorageOptions{} - for _, o := range args.Options { - o(&options) - } - defaultCredentialsOptions := &DefaultAzureCredentialWithFileOptions{} if knobs := options.AzureStorageTestingKnobs; knobs != nil { defaultCredentialsOptions.testingKnobs = knobs.(*TestingKnobs) diff --git a/pkg/cloud/cloud_io.go b/pkg/cloud/cloud_io.go index a5fa6432bec0..f6ed2b639f29 100644 --- a/pkg/cloud/cloud_io.go +++ b/pkg/cloud/cloud_io.go @@ -77,9 +77,9 @@ var httpMetrics = settings.RegisterBoolSetting( // MakeHTTPClient makes an http client configured with the common settings used // for interacting with cloud storage (timeouts, retries, CA certs, etc). func MakeHTTPClient( - settings *cluster.Settings, metrics *Metrics, cloud, bucket string, + settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string, ) (*http.Client, error) { - t, err := MakeTransport(settings, metrics, cloud, bucket) + t, err := MakeTransport(settings, metrics, cloud, bucket, client) if err != nil { return nil, err } @@ -99,7 +99,7 @@ func MakeHTTPClientForTransport(t http.RoundTripper) (*http.Client, error) { // used for interacting with cloud storage (timeouts, retries, CA certs, etc). // Prefer MakeHTTPClient where possible. func MakeTransport( - settings *cluster.Settings, metrics *Metrics, cloud, bucket string, + settings *cluster.Settings, metrics *Metrics, cloud, bucket, client string, ) (*http.Transport, error) { var tlsConf *tls.Config if pem := httpCustomCA.Get(&settings.SV); pem != "" { @@ -121,7 +121,7 @@ func MakeTransport( // most bulk jobs. t.MaxIdleConnsPerHost = 64 if metrics != nil { - t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket) + t.DialContext = metrics.NetMetrics.Wrap(t.DialContext, cloud, bucket, client) } return t, nil } diff --git a/pkg/cloud/external_storage.go b/pkg/cloud/external_storage.go index 407a45ce0249..f726d80fe063 100644 --- a/pkg/cloud/external_storage.go +++ b/pkg/cloud/external_storage.go @@ -182,12 +182,22 @@ type EarlyBootExternalStorageContext struct { MetricsRecorder *Metrics } +// ExternalStorageOptions rolls up the Options into a struct. +func (e *EarlyBootExternalStorageContext) ExternalStorageOptions() ExternalStorageOptions { + var options ExternalStorageOptions + for _, option := range e.Options { + option(&options) + } + return options +} + // ExternalStorageOptions holds dependencies and values that can be // overridden by callers of an ExternalStorageFactory via a passed // ExternalStorageOption. type ExternalStorageOptions struct { ioAccountingInterceptor ReadWriterInterceptor AzureStorageTestingKnobs base.ModuleTestingKnobs + ClientName string } // ExternalStorageConstructor is a function registered to create instances diff --git a/pkg/cloud/gcp/gcs_storage.go b/pkg/cloud/gcp/gcs_storage.go index 6dbdfcb4448e..62fe09648825 100644 --- a/pkg/cloud/gcp/gcs_storage.go +++ b/pkg/cloud/gcp/gcs_storage.go @@ -185,7 +185,8 @@ func makeGCSStorage( opts = append(opts, assumeOpt) } - baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket) + clientName := args.ExternalStorageOptions().ClientName + baseTransport, err := cloud.MakeTransport(args.Settings, args.MetricsRecorder, "gcs", conf.Bucket, clientName) if err != nil { return nil, errors.Wrap(err, "failed to create http transport") } diff --git a/pkg/cloud/httpsink/http_storage.go b/pkg/cloud/httpsink/http_storage.go index 1b3969584cbb..525bfc9357a4 100644 --- a/pkg/cloud/httpsink/http_storage.go +++ b/pkg/cloud/httpsink/http_storage.go @@ -66,7 +66,8 @@ func MakeHTTPStorage( return nil, errors.Errorf("HTTP storage requested but prefix path not provided") } - client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base) + clientName := args.ExternalStorageOptions().ClientName + client, err := cloud.MakeHTTPClient(args.Settings, args.MetricsRecorder, "http", base, clientName) if err != nil { return nil, err } diff --git a/pkg/cloud/metrics.go b/pkg/cloud/metrics.go index 8e07b34471b3..5c802bf8367b 100644 --- a/pkg/cloud/metrics.go +++ b/pkg/cloud/metrics.go @@ -133,7 +133,7 @@ func MakeMetrics(cidrLookup *cidr.Lookup) metric.Struct { ConnsOpened: metric.NewCounter(connsOpened), ConnsReused: metric.NewCounter(connsReused), TLSHandhakes: metric.NewCounter(tlsHandhakes), - NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket"), + NetMetrics: cidrLookup.MakeNetMetrics(cloudWriteBytes, cloudReadBytes, "cloud", "bucket", "client"), } } diff --git a/pkg/cloud/options.go b/pkg/cloud/options.go index f9234c02d294..e4bf68c3dfb0 100644 --- a/pkg/cloud/options.go +++ b/pkg/cloud/options.go @@ -23,3 +23,10 @@ func WithAzureStorageTestingKnobs(knobs base.ModuleTestingKnobs) ExternalStorage opts.AzureStorageTestingKnobs = knobs } } + +// WithClientName sets the "client" label on network metrics. +func WithClientName(name string) ExternalStorageOption { + return func(opts *ExternalStorageOptions) { + opts.ClientName = name + } +} diff --git a/pkg/cloud/options_test.go b/pkg/cloud/options_test.go new file mode 100644 index 000000000000..b54b9348196f --- /dev/null +++ b/pkg/cloud/options_test.go @@ -0,0 +1,23 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package cloud + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestClientName(t *testing.T) { + options := func(options ...ExternalStorageOption) ExternalStorageOptions { + context := EarlyBootExternalStorageContext{ + Options: options, + } + return context.ExternalStorageOptions() + } + require.Empty(t, options().ClientName) + require.Equal(t, options(WithClientName("this-is-the-name")).ClientName, "this-is-the-name") +}