add reduce processor (#137)
MikeGoldsmith authored Aug 22, 2024
1 parent d74349c commit 1b4c83d
Showing 36 changed files with 2,132 additions and 0 deletions.
65 changes: 65 additions & 0 deletions reduceprocessor/README.md
@@ -0,0 +1,65 @@
# Reduce Processor

<!-- status autogenerated section -->
| Status | |
| ------------- |-----------|
| Stability | [development]: logs |
| Distributions | [] |
| Issues | [![Open issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aopen%20label%3Aprocessor%2Freduce%20&label=open&color=orange&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aopen+is%3Aissue+label%3Aprocessor%2Freduce) [![Closed issues](https://img.shields.io/github/issues-search/open-telemetry/opentelemetry-collector-contrib?query=is%3Aissue%20is%3Aclosed%20label%3Aprocessor%2Freduce%20&label=closed&color=blue&logo=opentelemetry)](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues?q=is%3Aclosed+is%3Aissue+label%3Aprocessor%2Freduce) |
| [Code Owners](https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/CONTRIBUTING.md#becoming-a-code-owner) | [@MikeGoldsmith](https://www.github.com/MikeGoldsmith), [@codeboten](https://www.github.com/codeboten) |

[development]: https://github.com/open-telemetry/opentelemetry-collector#development
<!-- end autogenerated section -->

This processor combines related log events into a single aggregated log record based on a set of shared attributes.

## Configuration Options

| Name | Description | Required | Default Value |
| - | - | - | - |
| group_by | The list of attribute names used to group and aggregate log records. At least one attribute name is required. | Yes | `none` |
| reduce_timeout | The amount of time to wait after the last log record was received before the aggregated log record is considered complete. | No | `10s` |
| max_reduce_timeout | The maximum amount of time an aggregated log record can be stored in the cache before it is considered complete. | No | `60s` |
| max_reduce_count | The maximum number of log records that can be aggregated together. If the maximum is reached, the current aggregated log record is considered complete and a new aggregated log record is created. | No | `100` |
| cache_size | The maximum number of entries that can be stored in the cache. | No | `10000` |
| merge_strategies | A map of attribute names to custom merge strategies. If an attribute is not found in the map, the default merge strategy of `First` is used. | No | `none` |
| reduce_count_attribute | The attribute name used to store the count of log records on the aggregated log record. If empty, the count is not stored. | No | `none` |
| first_seen_attribute | The attribute name used to store the timestamp of the first log record in the aggregated log record. If empty, the first seen time is not stored. | No | `none` |
| last_seen_attribute | The attribute name used to store the timestamp of the last log record in the aggregated log record. If empty, the last seen time is not stored. | No | `none` |

### Merge Strategies

| Name | Description |
| - | - |
| First | Keeps the first non-empty value. |
| Last | Keeps the last non-empty value. |
| Array | Combines multiple values into an array. |
| Concat | Concatenates non-empty values together, separated by a comma (`,`). |

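For example, if the stored aggregated record holds the value `a` for an attribute and an incoming record carries `b` for the same attribute, `First` keeps `a`, `Last` keeps `b`, `Array` produces `["a", "b"]`, and `Concat` produces `a,b`.
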
### Example configuration

The following is the minimal configuration of the processor:

```yaml
reduce:
  group_by:
    - "host.name"
```
The following is a complete configuration of the processor:
```yaml
reduce:
  group_by:
    - "host.name"
  reduce_timeout: 10s
  max_reduce_timeout: 60s
  max_reduce_count: 100
  cache_size: 10000
  merge_strategies:
    "some-attribute": first
    "another-attribute": last
  reduce_count_attribute: reduce_count
  first_seen_attribute: first_timestamp
  last_seen_attribute: last_timestamp
```
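
To take effect, the processor must also be wired into a logs pipeline. The following is a minimal sketch of the collector service wiring; the `otlp` receiver and `debug` exporter are placeholders for whichever components your build provides:

```yaml
service:
  pipelines:
    logs:
      receivers: [otlp]
      processors: [reduce]
      exporters: [debug]
```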
183 changes: 183 additions & 0 deletions reduceprocessor/cache.go
@@ -0,0 +1,183 @@
package reduceprocessor

import (
    "strings"
    "time"

    "github.com/cespare/xxhash/v2"
    "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil"
    "go.opentelemetry.io/collector/pdata/pcommon"
    "go.opentelemetry.io/collector/pdata/plog"
)

type cacheKey [16]byte

func newCacheKey(groupBy []string, resource pcommon.Resource, scope pcommon.InstrumentationScope, lr plog.LogRecord) (cacheKey, bool) {
    // create a map to hold group by attributes
    groupByAttrs := pcommon.NewMap()

    // look for each group by attribute in the log record, scope and resource,
    // in that order, so that log record attributes take precedence over scope
    // attributes and scope attributes take precedence over resource attributes
    for _, attrName := range groupBy {
        attr, ok := lr.Attributes().Get(attrName)
        if ok {
            attr.CopyTo(groupByAttrs.PutEmpty(attrName))
            continue
        }
        if attr, ok = scope.Attributes().Get(attrName); ok {
            attr.CopyTo(groupByAttrs.PutEmpty(attrName))
            continue
        }
        if attr, ok = resource.Attributes().Get(attrName); ok {
            attr.CopyTo(groupByAttrs.PutEmpty(attrName))
        }
    }

    var key cacheKey
    if groupByAttrs.Len() == 0 {
        // no group by attributes found so we can't aggregate
        return key, false
    }

    // generate hashes for group by attrs, body and severity
    groupByAttrsHash := pdatautil.MapHash(groupByAttrs)
    bodyHash := pdatautil.ValueHash(lr.Body())
    severityHash := pdatautil.ValueHash(pcommon.NewValueStr(lr.SeverityText()))

    // combine the hashes into a single cache key for the log record
    hash := xxhash.New()
    hash.Write(groupByAttrsHash[:])
    hash.Write(bodyHash[:])
    hash.Write(severityHash[:])

    copy(key[:], hash.Sum(nil))
    return key, true
}
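
// Illustrative sketch (not part of this commit; it would live in a _test.go
// file in this package with "testing" and testify's require imported): two
// records that share the configured group-by attribute, body and severity
// text hash to the same cache key, so they are aggregated together.
func TestSameGroupProducesSameKey(t *testing.T) {
    resource := pcommon.NewResource()
    scope := pcommon.NewInstrumentationScope()

    lr1 := plog.NewLogRecord()
    lr1.Attributes().PutStr("host.name", "web-1")
    lr1.Body().SetStr("disk full")

    lr2 := plog.NewLogRecord()
    lr2.Attributes().PutStr("host.name", "web-1")
    lr2.Body().SetStr("disk full")

    key1, ok1 := newCacheKey([]string{"host.name"}, resource, scope, lr1)
    key2, ok2 := newCacheKey([]string{"host.name"}, resource, scope, lr2)
    require.True(t, ok1)
    require.True(t, ok2)
    require.Equal(t, key1, key2)
}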

type cacheEntry struct {
    createdAt time.Time
    resource  pcommon.Resource
    scope     pcommon.InstrumentationScope
    log       plog.LogRecord
    count     int
    firstSeen pcommon.Timestamp
    lastSeen  pcommon.Timestamp
}

func newCacheEntry(resource pcommon.Resource, scope pcommon.InstrumentationScope, log plog.LogRecord) *cacheEntry {
    return &cacheEntry{
        createdAt: time.Now().UTC(),
        resource:  resource,
        scope:     scope,
        log:       log,
        firstSeen: log.Timestamp(),
        lastSeen:  log.Timestamp(),
    }
}

func (entry *cacheEntry) merge(mergeStrategies map[string]MergeStrategy, resource pcommon.Resource, scope pcommon.InstrumentationScope, logRecord plog.LogRecord) {
    // use the incoming record's timestamp so the last seen time advances
    entry.lastSeen = logRecord.Timestamp()
    mergeAttributes(mergeStrategies, entry.resource.Attributes(), resource.Attributes())
    mergeAttributes(mergeStrategies, entry.scope.Attributes(), scope.Attributes())
    mergeAttributes(mergeStrategies, entry.log.Attributes(), logRecord.Attributes())
}

func (entry *cacheEntry) IncrementCount(mergeCount int) {
    entry.count += mergeCount
}

func mergeAttributes(mergeStrategies map[string]MergeStrategy, existingAttrs pcommon.Map, additionalAttrs pcommon.Map) {
    // loop over new attributes and apply the configured merge strategy
    additionalAttrs.Range(func(attrName string, attrValue pcommon.Value) bool {
        // get merge strategy using attribute name
        mergeStrategy, ok := mergeStrategies[attrName]
        if !ok {
            // use the default merge strategy if none is defined for the attribute
            mergeStrategy = First
        }

        switch mergeStrategy {
        case First:
            // add the attribute only if it doesn't exist yet
            if _, ok := existingAttrs.Get(attrName); !ok {
                attrValue.CopyTo(existingAttrs.PutEmpty(attrName))
            }
        case Last:
            // overwrite any existing value, adding the attribute if absent
            attrValue.CopyTo(existingAttrs.PutEmpty(attrName))
        case Array:
            // append the value to the existing value if it exists
            if existingValue, ok := existingAttrs.Get(attrName); ok {
                // if the existing value is a slice, append to it;
                // otherwise create a new slice and append both values
                // NOTE: unclear how this behaves with mixed data types
                var slice pcommon.Slice
                if existingValue.Type() == pcommon.ValueTypeSlice {
                    slice = existingValue.Slice()
                } else {
                    slice = pcommon.NewSlice()
                    existingValue.CopyTo(slice.AppendEmpty())
                }
                attrValue.CopyTo(slice.AppendEmpty())

                // update the existing attribute with the new slice
                slice.CopyTo(existingAttrs.PutEmptySlice(attrName))
            } else {
                // add the new attribute as it doesn't exist yet
                attrValue.CopyTo(existingAttrs.PutEmpty(attrName))
            }
        case Concat:
            // concatenate the value with the existing value if it exists
            if existingValue, ok := existingAttrs.Get(attrName); ok {
                // join the existing and new values with a comma
                strValue := strings.Join([]string{existingValue.AsString(), attrValue.AsString()}, ",")
                existingAttrs.PutStr(attrName, strValue)
            } else {
                // add the new attribute as it doesn't exist yet
                attrValue.CopyTo(existingAttrs.PutEmpty(attrName))
            }
        }
        return true
    })
}
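
// Illustrative sketch (not part of this commit; belongs in a _test.go file
// with testify's require imported): the Array strategy promotes a scalar
// value to a slice and appends the incoming value to it.
func TestMergeAttributesArray(t *testing.T) {
    existing := pcommon.NewMap()
    existing.PutStr("tag", "a")

    incoming := pcommon.NewMap()
    incoming.PutStr("tag", "b")

    mergeAttributes(map[string]MergeStrategy{"tag": Array}, existing, incoming)

    merged, ok := existing.Get("tag")
    require.True(t, ok)
    require.Equal(t, []any{"a", "b"}, merged.Slice().AsRaw())
}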

func (entry *cacheEntry) isInvalid(maxCount int, maxAge time.Duration) bool {
    if entry.count >= maxCount {
        return true
    }
    if maxAge > 0 && time.Since(entry.createdAt) >= maxAge {
        return true
    }
    return false
}
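
// Illustrative sketch (not part of this commit; belongs in a _test.go file):
// an entry whose merge count has reached max_reduce_count is flagged invalid
// and will be flushed as a completed aggregate.
func TestEntryInvalidWhenMaxCountReached(t *testing.T) {
    entry := newCacheEntry(pcommon.NewResource(), pcommon.NewInstrumentationScope(), plog.NewLogRecord())
    entry.IncrementCount(100)
    require.True(t, entry.isInvalid(100, time.Minute))
}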

func (entry *cacheEntry) toLogs(config *Config) plog.Logs {
    logs := plog.NewLogs()

    rl := logs.ResourceLogs().AppendEmpty()
    entry.resource.CopyTo(rl.Resource())

    sl := rl.ScopeLogs().AppendEmpty()
    entry.scope.CopyTo(sl.Scope())

    lr := sl.LogRecords().AppendEmpty()
    entry.log.CopyTo(lr)

    // add merge count, first seen and last seen attributes if configured
    if config.ReduceCountAttribute != "" {
        lr.Attributes().PutInt(config.ReduceCountAttribute, int64(entry.count))
    }
    if config.FirstSeenAttribute != "" {
        lr.Attributes().PutStr(config.FirstSeenAttribute, entry.firstSeen.String())
    }
    if config.LastSeenAttribute != "" {
        lr.Attributes().PutStr(config.LastSeenAttribute, entry.lastSeen.String())
    }

    return logs
}
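
// Illustrative sketch (not part of this commit; belongs in a _test.go file):
// when reduce_count_attribute is configured, the flushed record carries the
// number of merged log records as an attribute.
func TestToLogsAddsReduceCount(t *testing.T) {
    cfg := &Config{ReduceCountAttribute: "reduce_count"}
    entry := newCacheEntry(pcommon.NewResource(), pcommon.NewInstrumentationScope(), plog.NewLogRecord())
    entry.IncrementCount(3)

    logs := entry.toLogs(cfg)
    lr := logs.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0)
    count, ok := lr.Attributes().Get("reduce_count")
    require.True(t, ok)
    require.Equal(t, int64(3), count.Int())
}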
53 changes: 53 additions & 0 deletions reduceprocessor/config.go
@@ -0,0 +1,53 @@
package reduceprocessor

import (
    "errors"
    "time"

    "go.opentelemetry.io/collector/component"
)

type MergeStrategy int

const (
    First MergeStrategy = iota
    Last
    Array
    Concat
)

type Config struct {
    // GroupBy is the list of attribute names used to group and aggregate log records. At least one attribute name is required.
    GroupBy []string `mapstructure:"group_by"`

    // MaxReduceTimeout is the maximum amount of time an aggregated log record can be stored in the cache before it is considered complete. Default is 60s.
    MaxReduceTimeout time.Duration `mapstructure:"max_reduce_timeout"`

    // MaxReduceCount is the maximum number of log records that can be aggregated together. If the maximum is reached, the current aggregated log record is considered complete and a new aggregated log record is created. Default is 100.
    MaxReduceCount int `mapstructure:"max_reduce_count"`

    // CacheSize is the maximum number of entries that can be stored in the cache. Default is 10000.
    CacheSize int `mapstructure:"cache_size"`

    // MergeStrategies is a map of attribute names to custom merge strategies. If an attribute is not found in the map, the default merge strategy of `First` is used.
    MergeStrategies map[string]MergeStrategy `mapstructure:"merge_strategies"`

    // ReduceCountAttribute is the attribute name used to store the count of log records on the aggregated log record. If empty, the count is not stored. Default is "".
    ReduceCountAttribute string `mapstructure:"reduce_count_attribute"`

    // FirstSeenAttribute is the attribute name used to store the timestamp of the first log record in the aggregated log record. If empty, the first seen time is not stored. Default is "".
    FirstSeenAttribute string `mapstructure:"first_seen_attribute"`

    // LastSeenAttribute is the attribute name used to store the timestamp of the last log record in the aggregated log record. If empty, the last seen time is not stored. Default is "".
    LastSeenAttribute string `mapstructure:"last_seen_attribute"`
}

var _ component.Config = (*Config)(nil)

// Validate checks if the processor configuration is valid
func (cfg *Config) Validate() error {
    if len(cfg.GroupBy) == 0 {
        return errors.New("group_by must contain at least one attribute name")
    }
    return nil
}
16 changes: 16 additions & 0 deletions reduceprocessor/config_test.go
@@ -0,0 +1,16 @@
package reduceprocessor

import (
    "testing"

    "github.com/stretchr/testify/require"
)

func TestEmptyGroupByReturnsError(t *testing.T) {
    cfg := &Config{
        GroupBy: []string{},
    }
    err := cfg.Validate()
    require.Error(t, err)
    require.Equal(t, "group_by must contain at least one attribute name", err.Error())
}
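
// A complementary happy-path check (a sketch, not part of this commit): a
// config with at least one group_by attribute validates cleanly.
func TestNonEmptyGroupByReturnsNoError(t *testing.T) {
    cfg := &Config{
        GroupBy: []string{"host.name"},
    }
    require.NoError(t, cfg.Validate())
}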
4 changes: 4 additions & 0 deletions reduceprocessor/doc.go
@@ -0,0 +1,4 @@
//go:generate mdatagen metadata.yaml

// Package reduceprocessor contains a processor that deduplicates log records using an LRU cache.
package reduceprocessor // import
31 changes: 31 additions & 0 deletions reduceprocessor/documentation.md
@@ -0,0 +1,31 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)

# reduce

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_reduce_processor_combined

Number of log events that were combined

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {records} | Histogram | Int |

### otelcol_reduce_processor_output

Number of aggregated log events output

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |

### otelcol_reduce_processor_received

Number of log events received

| Unit | Metric Type | Value Type | Monotonic |
| ---- | ----------- | ---------- | --------- |
| {records} | Sum | Int | true |
