Skip to content

Commit

Permalink
feat(sdk-metrics): add aggregation cardinality limit (#5128)
Browse files Browse the repository at this point in the history
Co-authored-by: Marc Pichler <[email protected]>
  • Loading branch information
povilasv and pichlermarc authored Nov 20, 2024
1 parent 91b9abd commit a834861
Show file tree
Hide file tree
Showing 15 changed files with 349 additions and 13 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ For semantic convention package changes, see the [semconv CHANGELOG](packages/se

### :boom: Breaking Change

* feat(sdk-metrics): Add support for aggregation cardinality limit with a default limit of 2000. This limit can be customized via views [#5182](https://github.com/open-telemetry/opentelemetry-js/pull/5128)

### :rocket: (Enhancement)

### :bug: (Bug Fix)
Expand Down
21 changes: 21 additions & 0 deletions packages/sdk-metrics/src/export/CardinalitySelector.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { InstrumentType } from '../InstrumentDescriptor';
/**
* Cardinality Limit selector based on metric instrument types.
*/
export type CardinalitySelector = (instrumentType: InstrumentType) => number;
18 changes: 18 additions & 0 deletions packages/sdk-metrics/src/export/MetricReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
DEFAULT_AGGREGATION_SELECTOR,
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR,
} from './AggregationSelector';
import { CardinalitySelector } from './CardinalitySelector';

export interface MetricReaderOptions {
/**
Expand All @@ -45,6 +46,11 @@ export interface MetricReaderOptions {
* not configured, cumulative is used for all instruments.
*/
aggregationTemporalitySelector?: AggregationTemporalitySelector;
/**
* Cardinality selector based on metric instrument types. If not configured,
* a default value is used.
*/
cardinalitySelector?: CardinalitySelector;
/**
* **Note, this option is experimental**. Additional MetricProducers to use as a source of
* aggregated metric data in addition to the SDK's metric data. The resource returned by
Expand All @@ -68,6 +74,7 @@ export abstract class MetricReader {
private _sdkMetricProducer?: MetricProducer;
private readonly _aggregationTemporalitySelector: AggregationTemporalitySelector;
private readonly _aggregationSelector: AggregationSelector;
private readonly _cardinalitySelector?: CardinalitySelector;

constructor(options?: MetricReaderOptions) {
this._aggregationSelector =
Expand All @@ -76,6 +83,7 @@ export abstract class MetricReader {
options?.aggregationTemporalitySelector ??
DEFAULT_AGGREGATION_TEMPORALITY_SELECTOR;
this._metricProducers = options?.metricProducers ?? [];
this._cardinalitySelector = options?.cardinalitySelector;
}

/**
Expand Down Expand Up @@ -116,6 +124,16 @@ export abstract class MetricReader {
return this._aggregationTemporalitySelector(instrumentType);
}

/**
* Select the cardinality limit for the given {@link InstrumentType} for this
* reader.
*/
selectCardinalityLimit(instrumentType: InstrumentType): number {
return this._cardinalitySelector
? this._cardinalitySelector(instrumentType)
: 2000; // default value if no selector is provided
}

/**
* Handle once the SDK has initialized this {@link MetricReader}
* Overriding this method is optional.
Expand Down
8 changes: 6 additions & 2 deletions packages/sdk-metrics/src/state/AsyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,14 @@ export class AsyncMetricStorage<T extends Maybe<Accumulation>>
_instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
collectorHandles: MetricCollectorHandle[],
private _aggregationCardinalityLimit?: number
) {
super(_instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._deltaMetricStorage = new DeltaMetricProcessor(
aggregator,
this._aggregationCardinalityLimit
);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
Expand Down
47 changes: 41 additions & 6 deletions packages/sdk-metrics/src/state/DeltaMetricProcessor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/

import { Context, HrTime, Attributes } from '@opentelemetry/api';
import { Maybe } from '../utils';
import { Maybe, hashAttributes } from '../utils';
import { Accumulation, Aggregator } from '../aggregator/types';
import { AttributeHashMap } from './HashMap';

Expand All @@ -31,19 +31,40 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
// TODO: find a reasonable mean to clean the memo;
// https://github.com/open-telemetry/opentelemetry-specification/pull/2208
private _cumulativeMemoStorage = new AttributeHashMap<T>();
private _cardinalityLimit: number;
private _overflowAttributes = { 'otel.metric.overflow': true };
private _overflowHashCode: string;

constructor(private _aggregator: Aggregator<T>) {}
constructor(
private _aggregator: Aggregator<T>,
aggregationCardinalityLimit?: number
) {
this._cardinalityLimit = (aggregationCardinalityLimit ?? 2000) - 1;
this._overflowHashCode = hashAttributes(this._overflowAttributes);
}

record(
value: number,
attributes: Attributes,
_context: Context,
collectionTime: HrTime
) {
const accumulation = this._activeCollectionStorage.getOrDefault(
attributes,
() => this._aggregator.createAccumulation(collectionTime)
);
let accumulation = this._activeCollectionStorage.get(attributes);

if (!accumulation) {
if (this._activeCollectionStorage.size >= this._cardinalityLimit) {
const overflowAccumulation = this._activeCollectionStorage.getOrDefault(
this._overflowAttributes,
() => this._aggregator.createAccumulation(collectionTime)
);
overflowAccumulation?.record(value);
return;
}

accumulation = this._aggregator.createAccumulation(collectionTime);
this._activeCollectionStorage.set(attributes, accumulation);
}

accumulation?.record(value);
}

Expand All @@ -66,6 +87,19 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
hashCode
)!;
delta = this._aggregator.diff(previous, accumulation);
} else {
// If the cardinality limit is reached, we need to change the attributes
if (this._cumulativeMemoStorage.size >= this._cardinalityLimit) {
attributes = this._overflowAttributes;
hashCode = this._overflowHashCode;
if (this._cumulativeMemoStorage.has(attributes, hashCode)) {
const previous = this._cumulativeMemoStorage.get(
attributes,
hashCode
)!;
delta = this._aggregator.diff(previous, accumulation);
}
}
}
// Merge with uncollected active delta.
if (this._activeCollectionStorage.has(attributes, hashCode)) {
Expand All @@ -92,6 +126,7 @@ export class DeltaMetricProcessor<T extends Maybe<Accumulation>> {
collect() {
const unreportedDelta = this._activeCollectionStorage;
this._activeCollectionStorage = new AttributeHashMap();

return unreportedDelta;
}
}
13 changes: 10 additions & 3 deletions packages/sdk-metrics/src/state/MeterSharedState.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,8 @@ export class MeterSharedState {
viewDescriptor,
aggregator,
view.attributesProcessor,
this._meterProviderSharedState.metricCollectors
this._meterProviderSharedState.metricCollectors,
view.aggregationCardinalityLimit
) as R;
this.metricStorageRegistry.register(viewStorage);
return viewStorage;
Expand All @@ -162,12 +163,17 @@ export class MeterSharedState {
if (compatibleStorage != null) {
return compatibleStorage;
}

const aggregator = aggregation.createAggregator(descriptor);
const cardinalityLimit = collector.selectCardinalityLimit(
descriptor.type
);
const storage = new MetricStorageType(
descriptor,
aggregator,
AttributesProcessor.Noop(),
[collector]
[collector],
cardinalityLimit
) as R;
this.metricStorageRegistry.registerForCollector(collector, storage);
return storage;
Expand All @@ -190,6 +196,7 @@ interface MetricStorageConstructor {
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<Maybe<Accumulation>>,
attributesProcessor: AttributesProcessor,
collectors: MetricCollectorHandle[]
collectors: MetricCollectorHandle[],
aggregationCardinalityLimit?: number
): MetricStorage;
}
9 changes: 9 additions & 0 deletions packages/sdk-metrics/src/state/MetricCollector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,14 @@ export class MetricCollector implements MetricProducer {
selectAggregation(instrumentType: InstrumentType) {
return this._metricReader.selectAggregation(instrumentType);
}

/**
* Select the cardinality limit for the given {@link InstrumentType} for this
* collector.
*/
selectCardinalityLimit(instrumentType: InstrumentType): number {
return this._metricReader.selectCardinalityLimit?.(instrumentType) ?? 2000;
}
}

/**
Expand All @@ -98,4 +106,5 @@ export class MetricCollector implements MetricProducer {
*/
export interface MetricCollectorHandle {
selectAggregationTemporality: AggregationTemporalitySelector;
selectCardinalityLimit(instrumentType: InstrumentType): number;
}
8 changes: 6 additions & 2 deletions packages/sdk-metrics/src/state/SyncMetricStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,14 @@ export class SyncMetricStorage<T extends Maybe<Accumulation>>
instrumentDescriptor: InstrumentDescriptor,
aggregator: Aggregator<T>,
private _attributesProcessor: AttributesProcessor,
collectorHandles: MetricCollectorHandle[]
collectorHandles: MetricCollectorHandle[],
private _aggregationCardinalityLimit?: number
) {
super(instrumentDescriptor);
this._deltaMetricStorage = new DeltaMetricProcessor(aggregator);
this._deltaMetricStorage = new DeltaMetricProcessor(
aggregator,
this._aggregationCardinalityLimit
);
this._temporalMetricStorage = new TemporalMetricProcessor(
aggregator,
collectorHandles
Expand Down
15 changes: 15 additions & 0 deletions packages/sdk-metrics/src/view/View.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ export type ViewOptions = {
* aggregation: new LastValueAggregation()
*/
aggregation?: Aggregation;
/**
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit will be used.
*
* @example <caption>sets the cardinality limit to 1000</caption>
* aggregationCardinalityLimit: 1000
*/
aggregationCardinalityLimit?: number;
/**
* Instrument selection criteria:
* The original type of the Instrument(s).
Expand Down Expand Up @@ -138,6 +147,7 @@ export class View {
readonly attributesProcessor: AttributesProcessor;
readonly instrumentSelector: InstrumentSelector;
readonly meterSelector: MeterSelector;
readonly aggregationCardinalityLimit?: number;

/**
* Create a new {@link View} instance.
Expand All @@ -161,6 +171,10 @@ export class View {
* Alters the metric stream:
* If provided, the attributes that are not in the list will be ignored.
* If not provided, all attribute keys will be used by default.
* @param viewOptions.aggregationCardinalityLimit
* Alters the metric stream:
* Sets a limit on the number of unique attribute combinations (cardinality) that can be aggregated.
* If not provided, the default limit of 2000 will be used.
* @param viewOptions.aggregation
* Alters the metric stream:
* Alters the {@link Aggregation} of the metric stream.
Expand Down Expand Up @@ -232,5 +246,6 @@ export class View {
version: viewOptions.meterVersion,
schemaUrl: viewOptions.meterSchemaUrl,
});
this.aggregationCardinalityLimit = viewOptions.aggregationCardinalityLimit;
}
}
Loading

0 comments on commit a834861

Please sign in to comment.