From 223d19d605b9a1c3791c29065e4242825e7bd917 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?C=C3=A9lian=20Garcia?= Date: Wed, 20 Nov 2024 11:32:11 +0100 Subject: [PATCH] [receiver/azuremonitorreceiver] feat: multi subscriptions support and automatic discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: CĂ©lian Garcia --- receiver/azuremonitorreceiver/config.go | 18 +- receiver/azuremonitorreceiver/go.mod | 1 + receiver/azuremonitorreceiver/go.sum | 2 + .../internal/metadata/metrics.go | 23 +- receiver/azuremonitorreceiver/scraper.go | 217 +++++++++++++----- 5 files changed, 178 insertions(+), 83 deletions(-) diff --git a/receiver/azuremonitorreceiver/config.go b/receiver/azuremonitorreceiver/config.go index 6e835003e781..6c31107601d1 100644 --- a/receiver/azuremonitorreceiver/config.go +++ b/receiver/azuremonitorreceiver/config.go @@ -21,12 +21,12 @@ const ( var ( // Predefined error responses for configuration validation failures - errMissingTenantID = errors.New(`TenantID" is not specified in config`) - errMissingSubscriptionID = errors.New(`SubscriptionID" is not specified in config`) - errMissingClientID = errors.New(`ClientID" is not specified in config`) - errMissingClientSecret = errors.New(`ClientSecret" is not specified in config`) - errMissingFedTokenFile = errors.New(`FederatedTokenFile is not specified in config`) - errInvalidCloud = errors.New(`Cloud" is invalid`) + errMissingTenantID = errors.New(`"TenantID" is not specified in config`) + errMissingSubscriptionIDs = errors.New(`neither "SubscriptionID" nor "SubscriptionIDs" nor "DiscoverSubscription" is specified in the config`) + errMissingClientID = errors.New(`"ClientID" is not specified in config`) + errMissingClientSecret = errors.New(`"ClientSecret" is not specified in config`) + errMissingFedTokenFile = errors.New(`"FederatedTokenFile"" is not specified in config`) + errInvalidCloud = errors.New(`"Cloud" is invalid`) monitorServices = []string{ "Microsoft.EventGrid/eventSubscriptions", @@ -234,6 +234,8 @@ type Config struct { MetricsBuilderConfig metadata.MetricsBuilderConfig `mapstructure:",squash"` Cloud string `mapstructure:"cloud"` SubscriptionID string `mapstructure:"subscription_id"` + SubscriptionIDs []string `mapstructure:"subscription_ids"` + DiscoverSubscriptions bool `mapstructure:"discover_subscriptions"` Authentication string `mapstructure:"auth"` TenantID string `mapstructure:"tenant_id"` ClientID string `mapstructure:"client_id"` @@ -257,8 +259,8 @@ const ( // Validate validates the configuration by checking for missing or invalid fields func (c Config) Validate() (err error) { - if c.SubscriptionID == "" { - err = multierr.Append(err, errMissingSubscriptionID) + if c.SubscriptionID == "" && len(c.SubscriptionIDs) == 0 && !c.DiscoverSubscriptions { + err = multierr.Append(err, errMissingSubscriptionIDs) } switch c.Authentication { diff --git a/receiver/azuremonitorreceiver/go.mod b/receiver/azuremonitorreceiver/go.mod index efae88e7efd2..bf121e8be493 100644 --- a/receiver/azuremonitorreceiver/go.mod +++ b/receiver/azuremonitorreceiver/go.mod @@ -7,6 +7,7 @@ require ( github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.8.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 + github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 github.com/google/go-cmp v0.6.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.113.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.113.0 diff --git a/receiver/azuremonitorreceiver/go.sum b/receiver/azuremonitorreceiver/go.sum index 3dda1c2c93bc..2c1c6b5b93bb 100644 --- a/receiver/azuremonitorreceiver/go.sum +++ b/receiver/azuremonitorreceiver/go.sum @@ -14,6 +14,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor v0.11.0/go.mod h1:jj6P8ybImR+5topJ+eH6fgcemSFBmU6/6bFF8KkwuDI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0 h1:Dd+RhdJn0OTtVGaeDLZpcumkIVCtA/3/Fo42+eoYvVM= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources v1.2.0/go.mod h1:5kakwfW5CjC9KK+Q4wjXAg+ShuIm2mBMua0ZFj2C8PE= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0 h1:wxQx2Bt4xzPIKvW59WQf1tJNx/ZZKPfN+EhPX3Z6CYY= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions v1.3.0/go.mod h1:TpiwjwnW/khS0LKs4vW5UmmT9OWcxaveS8U7+tlknzo= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM= github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1/go.mod h1:tCcJZ0uHAmvjsVYzEFivsRTN00oz5BEsRgQHu5JZ9WE= github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU= diff --git a/receiver/azuremonitorreceiver/internal/metadata/metrics.go b/receiver/azuremonitorreceiver/internal/metadata/metrics.go index 75af602be1c2..d632d2fd04a2 100644 --- a/receiver/azuremonitorreceiver/internal/metadata/metrics.go +++ b/receiver/azuremonitorreceiver/internal/metadata/metrics.go @@ -108,16 +108,7 @@ func (mb *MetricsBuilder) updateCapacity(rm pmetric.ResourceMetrics) { // ResourceMetricsOption applies changes to provided resource metrics. type ResourceMetricsOption func(ResourceAttributesSettings, pmetric.ResourceMetrics) -// WithAzureMonitorSubscriptionID sets provided value as "azuremonitor.subscription_id" attribute for current resource. -func WithAzureMonitorSubscriptionID(val string) ResourceMetricsOption { - return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { - if ras.AzureMonitorSubscriptionID.Enabled { - rm.Resource().Attributes().PutStr("azuremonitor.subscription_id", val) - } - } -} - -// WithAzuremonitorTenantID sets provided value as "azuremonitor.tenant_id" attribute for current resource. +// WithAzureMonitorTenantID sets provided value as "azuremonitor.tenant_id" attribute for current resource. func WithAzureMonitorTenantID(val string) ResourceMetricsOption { return func(ras ResourceAttributesSettings, rm pmetric.ResourceMetrics) { if ras.AzureMonitorTenantID.Enabled { @@ -209,6 +200,7 @@ func (mb *MetricsBuilder) addMetric(resourceMetricID, logicalMetricID, unit stri } func (mb *MetricsBuilder) AddDataPoint( + subscriptionID, resourceID, metric, aggregation, @@ -218,7 +210,7 @@ func (mb *MetricsBuilder) AddDataPoint( val float64, ) { logicalMetricID := getLogicalMetricID(metric, aggregation) - resourceMetricID := getLogicalResourceMetricID(resourceID, logicalMetricID) + resourceMetricID := getLogicalResourceMetricID(subscriptionID, resourceID, logicalMetricID) m, exists := mb.getMetric(resourceMetricID) if !exists { @@ -232,6 +224,11 @@ func (mb *MetricsBuilder) AddDataPoint( dp.SetStartTimestamp(mb.startTime) dp.SetTimestamp(ts) dp.SetDoubleValue(val) + + if mb.resourceAttributesSettings.AzureMonitorSubscriptionID.Enabled { + dp.Attributes().PutStr("azuremonitor.subscription_id", subscriptionID) + } + dp.Attributes().PutStr("azuremonitor.resource_id", resourceID) for key, value := range attributes { dp.Attributes().PutStr(key, *value) @@ -242,8 +239,8 @@ func getLogicalMetricID(metric, aggregation string) string { return strings.ToLower(fmt.Sprintf("%s%s_%s", metricsPrefix, strings.ReplaceAll(metric, " ", "_"), aggregation)) } -func getLogicalResourceMetricID(resourceID, logicalMetricID string) string { - return fmt.Sprintf("%s/%s", strings.ToLower(resourceID), logicalMetricID) +func getLogicalResourceMetricID(subscriptionID, resourceID, logicalMetricID string) string { + return fmt.Sprintf("%s/%s/%s", strings.ToLower(subscriptionID), strings.ToLower(resourceID), logicalMetricID) } func (mb *MetricsBuilder) EmitAllMetrics(ils pmetric.ScopeMetrics) { diff --git a/receiver/azuremonitorreceiver/scraper.go b/receiver/azuremonitorreceiver/scraper.go index 99d24600faa5..083225be0754 100644 --- a/receiver/azuremonitorreceiver/scraper.go +++ b/receiver/azuremonitorreceiver/scraper.go @@ -21,6 +21,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/monitor/armmonitor" "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armresources" + "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/resources/armsubscriptions" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/pmetric" @@ -59,6 +60,15 @@ const ( tagPrefix = "tags_" ) +// azureSubscription is an extract of armsubscriptions.Subscription. +// It designates a common structure between complex structures retrieved from the AP +// and simple subscriptions ids that you can find in config. +type azureSubscription struct { + SubscriptionID string + DisplayName *string + resourcesUpdated time.Time +} + type azureResource struct { attributes map[string]*string metricsByCompositeKey map[metricsCompositeKey]*azureResourceMetrics @@ -98,13 +108,17 @@ type azureScraper struct { cred azcore.TokenCredential clientResources armClient + clientSubscriptions armSubscriptionClient clientMetricsDefinitions metricsDefinitionsClientInterface clientMetricsValues metricsValuesClient - cfg *Config - settings component.TelemetrySettings - resources map[string]*azureResource - resourcesUpdated time.Time + cfg *Config + settings component.TelemetrySettings + // resources on which we'll collect metrics. Stored by resource id and subscription id. + resources map[string]map[string]*azureResource + // subscriptions on which we'll look up resources. Stored by subscription id. + subscriptions map[string]*azureSubscription + subscriptionsUpdated time.Time mb *metadata.MetricsBuilder azDefaultCredentialsFunc func(options *azidentity.DefaultAzureCredentialOptions) (*azidentity.DefaultAzureCredential, error) azIDCredentialsFunc func(string, string, string, *azidentity.ClientSecretCredentialOptions) (*azidentity.ClientSecretCredential, error) @@ -140,17 +154,26 @@ func (s *azureScraper) getArmClientOptions() *arm.ClientOptions { return &options } -func (s *azureScraper) getArmClient() (armClient, error) { - client, err := s.armClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) - return client, err +func (s *azureScraper) getArmClient(subscriptionID string) (armClient, error) { + return s.armClientFunc(subscriptionID, s.cred, s.armClientOptions) +} + +type armSubscriptionClient interface { + NewListPager(options *armsubscriptions.ClientListOptions) *runtime.Pager[armsubscriptions.ClientListResponse] + NewListLocationsPager(subscriptionID string, options *armsubscriptions.ClientListLocationsOptions) *runtime.Pager[armsubscriptions.ClientListLocationsResponse] +} + +func (s *azureScraper) getArmSubscriptionClient() armSubscriptionClient { + client, _ := armsubscriptions.NewClient(s.cred, s.armClientOptions) + return client } type metricsDefinitionsClientInterface interface { NewListPager(resourceURI string, options *armmonitor.MetricDefinitionsClientListOptions) *runtime.Pager[armmonitor.MetricDefinitionsClientListResponse] } -func (s *azureScraper) getMetricsDefinitionsClient() (metricsDefinitionsClientInterface, error) { - client, err := s.armMonitorDefinitionsClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) +func (s *azureScraper) getMetricsDefinitionsClient(subscriptionID string) (metricsDefinitionsClientInterface, error) { + client, err := s.armMonitorDefinitionsClientFunc(subscriptionID, s.cred, s.armClientOptions) return client, err } @@ -160,31 +183,32 @@ type metricsValuesClient interface { ) } -func (s *azureScraper) GetMetricsValuesClient() (metricsValuesClient, error) { - client, err := s.armMonitorMetricsClientFunc(s.cfg.SubscriptionID, s.cred, s.armClientOptions) +func (s *azureScraper) GetMetricsValuesClient(subscriptionID string) (metricsValuesClient, error) { + client, err := s.armMonitorMetricsClientFunc(subscriptionID, s.cred, s.armClientOptions) return client, err } -func (s *azureScraper) start(_ context.Context, _ component.Host) (err error) { +func (s *azureScraper) start(ctx context.Context, _ component.Host) (err error) { if err = s.loadCredentials(); err != nil { return err } s.armClientOptions = s.getArmClientOptions() - s.clientResources, err = s.getArmClient() - if err != nil { - return err - } - s.clientMetricsDefinitions, err = s.getMetricsDefinitionsClient() - if err != nil { - return err - } - s.clientMetricsValues, err = s.GetMetricsValuesClient() - if err != nil { - return err - } - s.resources = map[string]*azureResource{} + s.clientSubscriptions = s.getArmSubscriptionClient() + + if !s.cfg.DiscoverSubscriptions { + s.resources[s.cfg.SubscriptionID] = make(map[string]*azureResource) + ids := []string{s.cfg.SubscriptionID} + ids = append(ids, s.cfg.SubscriptionIDs...) + for _, id := range ids { + if id != "" { + s.subscriptions[id] = &azureSubscription{ + SubscriptionID: s.cfg.SubscriptionID, + } + } + } + } return } @@ -220,37 +244,92 @@ func (s *azureScraper) loadCredentials() (err error) { } func (s *azureScraper) scrape(ctx context.Context) (pmetric.Metrics, error) { - s.getResources(ctx) - resourcesIDsWithDefinitions := make(chan string) - - go func() { - defer close(resourcesIDsWithDefinitions) - for resourceID := range s.resources { - s.getResourceMetricsDefinitions(ctx, resourceID) - resourcesIDsWithDefinitions <- resourceID - } - }() - + if !(time.Since(s.subscriptionsUpdated).Seconds() < s.cfg.CacheResources) { + s.getSubscriptions(ctx) + } var wg sync.WaitGroup - for resourceID := range resourcesIDsWithDefinitions { + for _, subscription := range s.subscriptions { wg.Add(1) - go func(resourceID string) { + go func(subscriptionID string) { defer wg.Done() - s.getResourceMetricsValues(ctx, resourceID) - }(resourceID) + + s.getResources(ctx, subscriptionID) + + resourcesIDsWithDefinitions := make(chan string) + go func(subscriptionID string) { + defer close(resourcesIDsWithDefinitions) + for resourceID := range s.resources[subscriptionID] { + s.getResourceMetricsDefinitions(ctx, subscriptionID, resourceID) + resourcesIDsWithDefinitions <- resourceID + } + }(subscriptionID) + + var wg2 sync.WaitGroup + for resourceID := range resourcesIDsWithDefinitions { + wg2.Add(1) + go func(subscriptionID, resourceID string) { + defer wg2.Done() + s.getResourceMetricsValues(ctx, subscriptionID, resourceID) + }(subscriptionID, resourceID) + } + + wg2.Wait() + }(subscription.SubscriptionID) + } + wg.Wait() return s.mb.Emit( - metadata.WithAzureMonitorSubscriptionID(s.cfg.SubscriptionID), metadata.WithAzureMonitorTenantID(s.cfg.TenantID), ), nil } -func (s *azureScraper) getResources(ctx context.Context) { - if time.Since(s.resourcesUpdated).Seconds() < s.cfg.CacheResources { +func (s *azureScraper) getSubscriptions(ctx context.Context) { + opts := &armsubscriptions.ClientListOptions{} + pager := s.clientSubscriptions.NewListPager(opts) + + existingSubscriptions := map[string]void{} + for id := range s.subscriptions { + existingSubscriptions[id] = void{} + } + + for pager.More() { + nextResult, err := pager.NextPage(ctx) + if err != nil { + s.settings.Logger.Error("failed to get Azure Subscriptions", zap.Error(err)) + return + } + + for _, subscription := range nextResult.Value { + s.resources[*subscription.SubscriptionID] = make(map[string]*azureResource) + s.subscriptions[*subscription.SubscriptionID] = &azureSubscription{ + SubscriptionID: *subscription.SubscriptionID, + DisplayName: subscription.DisplayName, + } + delete(existingSubscriptions, *subscription.SubscriptionID) + } + } + if len(existingSubscriptions) > 0 { + for idToDelete := range existingSubscriptions { + delete(s.subscriptions, idToDelete) + } + } + + s.subscriptionsUpdated = time.Now() + return +} + +func (s *azureScraper) getResources(ctx context.Context, subscriptionID string) { + if time.Since(s.subscriptions[subscriptionID].resourcesUpdated).Seconds() < s.cfg.CacheResources { return } + clientResources, clientErr := s.getArmClient(subscriptionID) + if clientErr != nil { + s.settings.Logger.Error("failed initializing client to get Azure Resources", zap.Error(clientErr)) + return + } + existingResources := map[string]void{} for id := range s.resources { existingResources[id] = void{} @@ -261,7 +340,7 @@ func (s *azureScraper) getResources(ctx context.Context) { Filter: &filter, } - pager := s.clientResources.NewListPager(opts) + pager := clientResources.NewListPager(opts) for pager.More() { nextResult, err := pager.NextPage(ctx) @@ -280,7 +359,7 @@ func (s *azureScraper) getResources(ctx context.Context) { if resource.Location != nil { attributes[attributeLocation] = resource.Location } - s.resources[*resource.ID] = &azureResource{ + s.resources[subscriptionID][*resource.ID] = &azureResource{ attributes: attributes, tags: resource.Tags, } @@ -294,7 +373,8 @@ func (s *azureScraper) getResources(ctx context.Context) { } } - s.resourcesUpdated = time.Now() + s.subscriptions[subscriptionID].resourcesUpdated = time.Now() + return } func getResourceGroupFromID(id string) string { @@ -321,14 +401,20 @@ func (s *azureScraper) getResourcesFilter() string { return fmt.Sprintf("(resourceType eq '%s')%s", resourcesTypeFilter, resourcesGroupFilterString) } -func (s *azureScraper) getResourceMetricsDefinitions(ctx context.Context, resourceID string) { - if time.Since(s.resources[resourceID].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { +func (s *azureScraper) getResourceMetricsDefinitions(ctx context.Context, subscriptionID, resourceID string) { + if time.Since(s.resources[subscriptionID][resourceID].metricsDefinitionsUpdated).Seconds() < s.cfg.CacheResourcesDefinitions { + return + } + + clientMetricsDefinitions, clientErr := s.getMetricsDefinitionsClient(subscriptionID) + if clientErr != nil { + s.settings.Logger.Error("failed to initialize the client to get Azure Metrics definitions", zap.Error(clientErr)) return } - s.resources[resourceID].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} + s.resources[subscriptionID][resourceID].metricsByCompositeKey = map[metricsCompositeKey]*azureResourceMetrics{} - pager := s.clientMetricsDefinitions.NewListPager(resourceID, nil) + pager := clientMetricsDefinitions.NewListPager(resourceID, nil) for pager.More() { nextResult, err := pager.NextPage(ctx) if err != nil { @@ -351,24 +437,30 @@ func (s *azureScraper) getResourceMetricsDefinitions(ctx context.Context, resour sort.Strings(dimensionsSlice) compositeKey.dimensions = strings.Join(dimensionsSlice, ",") } - s.storeMetricsDefinition(resourceID, name, compositeKey) + s.storeMetricsDefinition(subscriptionID, resourceID, name, compositeKey) } } - s.resources[resourceID].metricsDefinitionsUpdated = time.Now() + s.resources[subscriptionID][resourceID].metricsDefinitionsUpdated = time.Now() } -func (s *azureScraper) storeMetricsDefinition(resourceID, name string, compositeKey metricsCompositeKey) { - if _, ok := s.resources[resourceID].metricsByCompositeKey[compositeKey]; ok { - s.resources[resourceID].metricsByCompositeKey[compositeKey].metrics = append( - s.resources[resourceID].metricsByCompositeKey[compositeKey].metrics, name, +func (s *azureScraper) storeMetricsDefinition(subscriptionID, resourceID, name string, compositeKey metricsCompositeKey) { + if _, ok := s.resources[subscriptionID][resourceID].metricsByCompositeKey[compositeKey]; ok { + s.resources[subscriptionID][resourceID].metricsByCompositeKey[compositeKey].metrics = append( + s.resources[subscriptionID][resourceID].metricsByCompositeKey[compositeKey].metrics, name, ) } else { - s.resources[resourceID].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} + s.resources[subscriptionID][resourceID].metricsByCompositeKey[compositeKey] = &azureResourceMetrics{metrics: []string{name}} } } -func (s *azureScraper) getResourceMetricsValues(ctx context.Context, resourceID string) { - res := *s.resources[resourceID] +func (s *azureScraper) getResourceMetricsValues(ctx context.Context, subscriptionID, resourceID string) { + res := *s.resources[subscriptionID][resourceID] + + clientMetricsValues, clientErr := s.GetMetricsValuesClient(subscriptionID) + if clientErr != nil { + s.settings.Logger.Error("failed to initialize client for Azure Metrics values", zap.Error(clientErr)) + return + } for compositeKey, metricsByGrain := range res.metricsByCompositeKey { if time.Since(metricsByGrain.metricsValuesUpdated).Seconds() < float64(timeGrains[compositeKey.timeGrain]) { @@ -394,7 +486,7 @@ func (s *azureScraper) getResourceMetricsValues(ctx context.Context, resourceID ) start = end - result, err := s.clientMetricsValues.List( + result, err := clientMetricsValues.List( ctx, resourceID, &opts, @@ -422,7 +514,7 @@ func (s *azureScraper) getResourceMetricsValues(ctx context.Context, resourceID } } for _, metricValue := range timeseriesElement.Data { - s.processTimeseriesData(resourceID, metric, metricValue, attributes) + s.processTimeseriesData(subscriptionID, resourceID, metric, metricValue, attributes) } } } @@ -466,7 +558,7 @@ func getResourceMetricsValuesRequestOptions( } func (s *azureScraper) processTimeseriesData( - resourceID string, + subscriptionID, resourceID string, metric *armmonitor.Metric, metricValue *armmonitor.MetricValue, attributes map[string]*string, @@ -489,6 +581,7 @@ func (s *azureScraper) processTimeseriesData( for _, aggregation := range aggregationsData { if aggregation.value != nil { s.mb.AddDataPoint( + subscriptionID, resourceID, *metric.Name.Value, aggregation.name,