Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Documentation improvements for the aggregate processor. #5035

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
"Then, the processor performs an action on each group, helping reduce unnecessary log volume and " +
"creating aggregated logs over time.")
public class AggregateProcessorConfig {

static int DEFAULT_GROUP_DURATION_SECONDS = 180;

@JsonPropertyDescription("An unordered list by which to group events. Events with the same values as these keys are put into the same group. " +
Expand All @@ -33,16 +32,16 @@ public class AggregateProcessorConfig {
@NotEmpty
private List<String> identificationKeys;

@JsonPropertyDescription("The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s.")
@JsonProperty("group_duration")
private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS);

@JsonPropertyDescription("The action to be performed on each group. One of the available aggregate actions must be provided.")
@JsonProperty("action")
@NotNull
@UsesDataPrepperPlugin(pluginType = AggregateAction.class)
private PluginModel aggregateAction;

@JsonPropertyDescription("The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s.")
@JsonProperty("group_duration")
private Duration groupDuration = Duration.ofSeconds(DEFAULT_GROUP_DURATION_SECONDS);

@JsonPropertyDescription("When <code>local_mode<code> is set to true, the aggregation is performed locally on each node instead of forwarding events to a specific node based on the <code>identification_keys</code> using a hash function. Default is false.")
@JsonProperty("local_mode")
@NotNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,13 @@

import java.util.List;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonPropertyOrder
@JsonClassDescription("Appends multiple events into a single event.")
public class AppendAggregateActionConfig {

@JsonProperty("keys_to_append")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public class CountAggregateAction implements AggregateAction {
public final String countKey;
public final String startTimeKey;
public final String endTimeKey;
public final String outputFormat;
public final OutputFormat outputFormat;
private long startTimeNanos;
private final String metricName;
private final IdentificationKeysHasher uniqueKeysHasher;
Expand Down Expand Up @@ -141,7 +141,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
Instant endTime = (Instant)groupState.get(endTimeKey);
groupState.remove(endTimeKey);
groupState.remove(UNIQUE_KEYS_SETKEY);
if (outputFormat.equals(OutputFormat.RAW.toString())) {
if (outputFormat == OutputFormat.RAW) {
groupState.put(startTimeKey, startTime.atZone(ZoneId.of(ZoneId.systemDefault().toString())).format(DateTimeFormatter.ofPattern(DATE_FORMAT)));
event = JacksonEvent.builder()
.withEventType(EVENT_TYPE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,42 +5,49 @@

package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import java.util.HashSet;
import java.util.List;
import java.util.Set;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;

@JsonPropertyOrder
@JsonClassDescription("The <code>count</code> action counts events that belong to the same group and " +
"generates a new event with values of the <code>identification_keys</code> and the count, which indicates the number of new events.")
public class CountAggregateActionConfig {
static final String SUM_METRIC_NAME = "count";
public static final String DEFAULT_COUNT_KEY = "aggr._count";
public static final String DEFAULT_START_TIME_KEY = "aggr._start_time";
public static final String DEFAULT_END_TIME_KEY = "aggr._end_time";
public static final Set<String> validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString()));

@JsonPropertyDescription("Key used for storing the count. Default name is aggr._count.")
@JsonProperty("count_key")
String countKey = DEFAULT_COUNT_KEY;
@JsonPropertyDescription("Format of the aggregated event. Specifying <code>otel_metrics</code> outputs aggregate events in OTel metrics SUM type with count as value. " +
"Specifying <code>raw</code> outputs aggregate events as with the <code>count_key</code> field as a count value and includes the <code>start_time_key</code> and <code>end_time_key</code> keys.")
@JsonProperty("output_format")
OutputFormat outputFormat = OutputFormat.OTEL_METRICS;

@JsonPropertyDescription("Metric name to be used when otel format is used.")
@JsonPropertyDescription("Metric name to be used when the OTel metrics format is used. The default value is <code>count</code>.")
@JsonProperty("metric_name")
String metricName = SUM_METRIC_NAME;

@JsonPropertyDescription("List of unique keys to count.")
@JsonProperty("unique_keys")
List<String> uniqueKeys = null;
@JsonPropertyDescription("The key in the aggregate event that will have the count value. " +
"This is the count of events in the aggregation. Default name is <code>aggr._count</code>.")
@JsonProperty("count_key")
String countKey = DEFAULT_COUNT_KEY;

@JsonPropertyDescription("Key used for storing the start time. Default name is aggr._start_time.")
@JsonPropertyDescription("The key in the aggregate event that will have the start time of the aggregation. " +
"Default name is <code>aggr._start_time</code>.")
@JsonProperty("start_time_key")
String startTimeKey = DEFAULT_START_TIME_KEY;

@JsonPropertyDescription("Key used for storing the end time. Default name is aggr._end_time.")
@JsonPropertyDescription("The key in the aggregate event that will have the end time of the aggregation. " +
"Default name is <code>aggr._end_time</code>.")
@JsonProperty("end_time_key")
String endTimeKey = DEFAULT_END_TIME_KEY;

@JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.")
@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();
@JsonPropertyDescription("List of unique keys to count.")
@JsonProperty("unique_keys")
List<String> uniqueKeys = null;

public String getMetricName() {
return metricName;
Expand All @@ -62,10 +69,7 @@ public String getStartTimeKey() {
return startTimeKey;
}

public String getOutputFormat() {
if (!validOutputFormats.contains(outputFormat)) {
throw new IllegalArgumentException("Unknown output format " + outputFormat);
}
public OutputFormat getOutputFormat() {
return outputFormat;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class HistogramAggregateAction implements AggregateAction {
private final String bucketsKey;
private final String startTimeKey;
private final String endTimeKey;
private final String outputFormat;
private final OutputFormat outputFormat;
private final String sumKey;
private final String maxKey;
private final String minKey;
Expand Down Expand Up @@ -217,7 +217,7 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA
List<Exemplar> exemplarList = new ArrayList<>();
exemplarList.add(createExemplar("min", minEvent, minValue));
exemplarList.add(createExemplar("max", maxEvent, maxValue));
if (outputFormat.equals(OutputFormat.RAW.toString())) {
if (outputFormat == OutputFormat.RAW) {
groupState.put(histogramKey, key);
groupState.put(durationKey, endTimeNanos-startTimeNanos);
groupState.put(bucketsKey, Arrays.copyOfRange(this.buckets, 1, this.buckets.length-1));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,21 @@

package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import java.util.Set;
import java.util.List;
import java.util.HashSet;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotNull;

@JsonPropertyOrder
@JsonClassDescription("The <code>histogram</code> action aggregates events belonging to the same " +
"group and generates a new event with values of the <code>identification_keys</code> " +
"and histogram of the aggregated events based on a configured <code>key</code>. " +
"The histogram contains the number of events, sum, buckets, bucket counts, and optionally " +
"min and max of the values corresponding to the <code>key</code>. The action drops all events " +
"that make up the combined event.")
public class HistogramAggregateActionConfig {
public static final String HISTOGRAM_METRIC_NAME = "histogram";
public static final String DEFAULT_GENERATED_KEY_PREFIX = "aggr._";
Expand All @@ -24,13 +32,16 @@ public class HistogramAggregateActionConfig {
public static final String START_TIME_KEY = "startTime";
public static final String END_TIME_KEY = "endTime";
public static final String DURATION_KEY = "duration";
public static final Set<String> validOutputFormats = new HashSet<>(Set.of(OutputFormat.OTEL_METRICS.toString(), OutputFormat.RAW.toString()));

@JsonPropertyDescription("Name of the field in the events the histogram generates.")
@JsonProperty("key")
@NotNull
String key;

@JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.")
@JsonProperty("output_format")
OutputFormat outputFormat = OutputFormat.OTEL_METRICS;

@JsonPropertyDescription("The name of units for the values in the key. For example, bytes, traces etc")
@JsonProperty("units")
@NotNull
Expand All @@ -49,10 +60,6 @@ public class HistogramAggregateActionConfig {
@NotNull
List<Number> buckets;

@JsonPropertyDescription("Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value.")
@JsonProperty("output_format")
String outputFormat = OutputFormat.OTEL_METRICS.toString();

@JsonPropertyDescription("A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation.")
@JsonProperty("record_minmax")
boolean recordMinMax = false;
Expand Down Expand Up @@ -120,10 +127,7 @@ public List<Number> getBuckets() {
return buckets;
}

public String getOutputFormat() {
if (!validOutputFormats.contains(outputFormat)) {
throw new IllegalArgumentException("Unknown output format " + outputFormat);
}
public OutputFormat getOutputFormat() {
return outputFormat;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -37,4 +38,8 @@ static OutputFormat fromOptionValue(final String option) {
return ACTIONS_MAP.get(option.toLowerCase());
}

@JsonValue
public String getOptionValue() {
return name;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@

package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotNull;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import jakarta.validation.constraints.AssertTrue;

@JsonPropertyOrder
@JsonClassDescription("The <code>percent_sampler</code> action controls the number of events aggregated based " +
"on a percentage of events. The action drops any events not included in the percentage.")
public class PercentSamplerAggregateActionConfig {
@JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0")
@JsonPropertyDescription("The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0.")
@JsonProperty("percent")
@NotNull
private double percent;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.JacksonEvent;
Expand All @@ -22,7 +24,7 @@
* most recently handled Event.
* @since 1.3
*/
@DataPrepperPlugin(name = "put_all", pluginType = AggregateAction.class)
@DataPrepperPlugin(name = "put_all", pluginType = AggregateAction.class, pluginConfigurationType = PutAllAggregateAction.PutAllAggregateActionConfig.class)
public class PutAllAggregateAction implements AggregateAction {
static final String EVENT_TYPE = "event";

Expand All @@ -43,4 +45,10 @@ public AggregateActionOutput concludeGroup(final AggregateActionInput aggregateA

return new AggregateActionOutput(List.of(event));
}

@JsonPropertyOrder
@JsonClassDescription("The <code>put_all</code> action combines events belonging to the same group by overwriting existing keys and adding new keys, similarly to the Java `Map.putAll`. " +
"The action drops all events that make up the combined event.")
static class PutAllAggregateActionConfig {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
@DataPrepperPlugin(name = "rate_limiter", pluginType = AggregateAction.class, pluginConfigurationType = RateLimiterAggregateActionConfig.class)
public class RateLimiterAggregateAction implements AggregateAction {
private final RateLimiter rateLimiter;
private final String rateLimiterMode;
private final RateLimiterMode rateLimiterMode;

@DataPrepperPluginConstructor
public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelimiterAggregateActionConfig) {
Expand All @@ -33,7 +33,7 @@ public RateLimiterAggregateAction(final RateLimiterAggregateActionConfig ratelim

@Override
public AggregateActionResponse handleEvent(final Event event, final AggregateActionInput aggregateActionInput) {
if (rateLimiterMode.equals(RateLimiterMode.DROP.toString())) {
if (rateLimiterMode == RateLimiterMode.DROP) {
if (!rateLimiter.tryAcquire()) {
return AggregateActionResponse.nullEventResponse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,32 +5,32 @@

package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import java.util.Set;
import java.util.HashSet;
import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import jakarta.validation.constraints.NotNull;

@JsonPropertyOrder
@JsonClassDescription("The <code>rate_limiter</code> action controls the number of events aggregated per second. " +
"By default, <code>rate_limiter</code> blocks the <code>aggregate</code> processor from running if it receives more events than the configured number allowed. " +
"You can overwrite the number events that triggers the <code>rate_limited</code> by using the <code>when_exceeds</code> configuration option.")
public class RateLimiterAggregateActionConfig {
public static final Set<String> validRateLimiterModes = new HashSet<>(Set.of(RateLimiterMode.BLOCK.toString(), RateLimiterMode.DROP.toString()));

@JsonPropertyDescription("The number of events allowed per second.")
@JsonProperty("events_per_second")
@NotNull
int eventsPerSecond;

@JsonPropertyDescription("Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block")
@JsonPropertyDescription("Indicates what action the <code>rate_limiter</code> takes when the number of events received is greater than the number of events allowed per second. " +
"Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block")
@JsonProperty("when_exceeds")
String whenExceedsMode = RateLimiterMode.BLOCK.toString();
RateLimiterMode whenExceedsMode = RateLimiterMode.BLOCK;

public int getEventsPerSecond() {
return eventsPerSecond;
}

public String getWhenExceeds() {
if (!validRateLimiterModes.contains(whenExceedsMode)) {
throw new IllegalArgumentException("Unknown rate limiter mode " + whenExceedsMode);
}
public RateLimiterMode getWhenExceeds() {
return whenExceedsMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.dataprepper.plugins.processor.aggregate.actions;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

import java.util.Arrays;
import java.util.Map;
Expand Down Expand Up @@ -37,4 +38,9 @@ static RateLimiterMode fromOptionValue(final String option) {
return ACTIONS_MAP.get(option.toLowerCase());
}

@JsonValue
public String getOptionValue() {
return name;
}

}
Loading
Loading