Skip to content

Commit

Permalink
[processor/tailsampling] Decision cache for non-sampled trace IDs (op…
Browse files Browse the repository at this point in the history
…en-telemetry#36040)

Re-opening
open-telemetry#33722
so it can be finished.
I've addressed @jpkrohling's open comments from that PR.

Description:

Adds a decision cache for non sampled IDs. This is off-by-default.

It's the same as the sampled IDs cache but is separate and has a
separate size.

**Link to tracking Issue:
open-telemetry#31583

Testing:
Unit test added. Cache type itself already tested.

Documentation:

Updated the readme with config options and descriptions.

---------

Co-authored-by: Juraci Paixão Kröhling <[email protected]>
  • Loading branch information
2 people authored and shivanthzen committed Dec 4, 2024
1 parent 2ff09d0 commit eb6ed8d
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 28 deletions.
6 changes: 6 additions & 0 deletions .chloggen/jmoe_decision-cache-non-sampled.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
change_type: enhancement
component: tailsamplingprocessor
note: Adds decision cache for non-sampled trace IDs
issues: [31583]
subtext:
change_logs: []
18 changes: 13 additions & 5 deletions processor/tailsamplingprocessor/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,18 @@ The following configuration options can also be modified:
- `decision_wait` (default = 30s): Wait time since the first span of a trace before making a sampling decision
- `num_traces` (default = 50000): Number of traces kept in memory.
- `expected_new_traces_per_sec` (default = 0): Expected number of new traces (helps in allocating data structures)
- `decision_cache` (default = `sampled_cache_size: 0`): Configures amount of trace IDs to be kept in an LRU cache,
persisting the "keep" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.
If using, configure this as much higher than `num_traces` so decisions for trace IDs are kept
- `decision_cache`: Options for configuring caches for sampling decisions. You may want to vary the size of these caches
depending on how many "keep" vs "drop" decisions you expect from your policies. For example, you may allocate a
larger `non_sampled_cache_size` if you expect most traces to be dropped.
Additionally, if using, configure this as much higher than `num_traces` so decisions for trace IDs are kept
longer than the span data for the trace.
- `sampled_cache_size` (default = 0): Configures amount of trace IDs to be kept in an LRU cache,
persisting the "keep" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.
- `non_sampled_cache_size` (default = 0) Configures amount of trace IDs to be kept in an LRU cache,
persisting the "drop" decisions for traces that may have already been released from memory.
By default, the size is 0 and the cache is inactive.


Each policy will result in a decision, and the processor will evaluate them to make a final decision:

Expand All @@ -70,7 +77,8 @@ processors:
num_traces: 100
expected_new_traces_per_sec: 10
decision_cache:
sampled_cache_size: 100000
sampled_cache_size: 100_000
non_sampled_cache_size: 100_000
policies:
[
{
Expand Down
7 changes: 6 additions & 1 deletion processor/tailsamplingprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,16 @@ type OTTLConditionCfg struct {
}

type DecisionCacheConfig struct {
// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs
// SampledCacheSize specifies the size of the cache that holds the sampled trace IDs.
// This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs.
// For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
// If left as default 0, a no-op DecisionCache will be used.
SampledCacheSize int `mapstructure:"sampled_cache_size"`
// NonSampledCacheSize specifies the size of the cache that holds the non-sampled trace IDs.
// This value will be the maximum amount of trace IDs that the cache can hold before overwriting previous IDs.
// For effective use, this value should be at least an order of magnitude higher than Config.NumTraces.
// If left as default 0, a no-op DecisionCache will be used.
NonSampledCacheSize int `mapstructure:"non_sampled_cache_size"`
}

// Config holds the configuration for tail-based sampling.
Expand Down
2 changes: 1 addition & 1 deletion processor/tailsamplingprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestLoadConfig(t *testing.T) {
DecisionWait: 10 * time.Second,
NumTraces: 100,
ExpectedNewTracesPerSec: 10,
DecisionCache: DecisionCacheConfig{SampledCacheSize: 500},
DecisionCache: DecisionCacheConfig{SampledCacheSize: 1_000, NonSampledCacheSize: 10_000},
PolicyCfgs: []PolicyCfg{
{
sharedPolicyCfg: sharedPolicyCfg{
Expand Down
66 changes: 46 additions & 20 deletions processor/tailsamplingprocessor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,16 +48,17 @@ type tailSamplingSpanProcessor struct {
telemetry *metadata.TelemetryBuilder
logger *zap.Logger

nextConsumer consumer.Traces
maxNumTraces uint64
policies []*policy
idToTrace sync.Map
policyTicker timeutils.TTicker
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
nextConsumer consumer.Traces
maxNumTraces uint64
policies []*policy
idToTrace sync.Map
policyTicker timeutils.TTicker
tickerFrequency time.Duration
decisionBatcher idbatcher.Batcher
sampledIDCache cache.Cache[bool]
nonSampledIDCache cache.Cache[bool]
deleteChan chan pcommon.TraceID
numTracesOnMap *atomic.Uint64
}

// spanAndScope a structure for holding information about span and its instrumentation scope.
Expand Down Expand Up @@ -89,23 +90,32 @@ func newTracesProcessor(ctx context.Context, set processor.Settings, nextConsume
if err != nil {
return nil, err
}
sampledDecisions := cache.NewNopDecisionCache[bool]()
nopCache := cache.NewNopDecisionCache[bool]()
sampledDecisions := nopCache
nonSampledDecisions := nopCache
if cfg.DecisionCache.SampledCacheSize > 0 {
sampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.SampledCacheSize)
if err != nil {
return nil, err
}
}
if cfg.DecisionCache.NonSampledCacheSize > 0 {
nonSampledDecisions, err = cache.NewLRUDecisionCache[bool](cfg.DecisionCache.NonSampledCacheSize)
if err != nil {
return nil, err
}
}

tsp := &tailSamplingSpanProcessor{
ctx: ctx,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
logger: telemetrySettings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
ctx: ctx,
telemetry: telemetry,
nextConsumer: nextConsumer,
maxNumTraces: cfg.NumTraces,
sampledIDCache: sampledDecisions,
nonSampledIDCache: nonSampledDecisions,
logger: telemetrySettings.Logger,
numTracesOnMap: &atomic.Uint64{},
deleteChan: make(chan pcommon.TraceID, cfg.NumTraces),
}
tsp.policyTicker = &timeutils.PolicyTicker{OnTickFunc: tsp.samplingPolicyOnTick}

Expand Down Expand Up @@ -188,6 +198,13 @@ func withSampledDecisionCache(c cache.Cache[bool]) Option {
}
}

// withSampledDecisionCache sets the cache which the processor uses to store recently sampled trace IDs.
func withNonSampledDecisionCache(c cache.Cache[bool]) Option {
return func(tsp *tailSamplingSpanProcessor) {
tsp.nonSampledIDCache = c
}
}

func getPolicyEvaluator(settings component.TelemetrySettings, cfg *PolicyCfg) (sampling.PolicyEvaluator, error) {
switch cfg.Type {
case Composite:
Expand Down Expand Up @@ -371,7 +388,15 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
traceTd := ptrace.NewTraces()
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.Add(tsp.ctx, int64(len(spans)))
metric.WithAttributeSet(attribute.NewSet())
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
Add(tsp.ctx, int64(len(spans)), attrSampledTrue)
continue
}
// If the trace ID is in the non-sampled cache, short circuit the decision
if _, ok := tsp.nonSampledIDCache.Get(id); ok {
tsp.telemetry.ProcessorTailSamplingEarlyReleasesFromCacheDecision.
Add(tsp.ctx, int64(len(spans)), attrSampledFalse)
continue
}

Expand Down Expand Up @@ -429,6 +454,7 @@ func (tsp *tailSamplingSpanProcessor) processTraces(resourceSpans ptrace.Resourc
appendToTraces(traceTd, resourceSpans, spans)
tsp.releaseSampledTrace(tsp.ctx, id, traceTd)
case sampling.NotSampled:
tsp.nonSampledIDCache.Put(id, true)
tsp.telemetry.ProcessorTailSamplingSamplingLateSpanAge.Record(tsp.ctx, int64(time.Since(actualData.DecisionTime)/time.Second))
default:
tsp.logger.Warn("Encountered unexpected sampling decision",
Expand Down
74 changes: 74 additions & 0 deletions processor/tailsamplingprocessor/processor_decisions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,3 +400,77 @@ func TestLateArrivingSpanUsesDecisionCache(t *testing.T) {
require.EqualValues(t, 2, nextConsumer.SpanCount(), "original final decision not honored")
require.NoError(t, tel.Shutdown(context.Background()))
}

func TestLateSpanUsesNonSampledDecisionCache(t *testing.T) {
cfg := Config{
DecisionWait: defaultTestDecisionWait * 10,
NumTraces: defaultNumTraces,
}
nextConsumer := new(consumertest.TracesSink)
s := setupTestTelemetry()
ct := s.NewSettings()
idb := newSyncIDBatcher()

mpe := &mockPolicyEvaluator{}
policies := []*policy{
{name: "mock-policy-1", evaluator: mpe, attribute: metric.WithAttributes(attribute.String("policy", "mock-policy-1"))},
}

// Use this instead of the default no-op cache
c, err := cache.NewLRUDecisionCache[bool](200)
require.NoError(t, err)
p, err := newTracesProcessor(context.Background(), ct, nextConsumer, cfg, withDecisionBatcher(idb), withPolicies(policies), withNonSampledDecisionCache(c))
require.NoError(t, err)

require.NoError(t, p.Start(context.Background(), componenttest.NewNopHost()))
defer func() {
require.NoError(t, p.Shutdown(context.Background()))
}()

// We are going to create 2 spans belonging to the same trace
traceID := uInt64ToTraceID(1)

// The first span will be NOT sampled, this will later be set to sampled, but the sampling decision will be cached
mpe.NextDecision = sampling.NotSampled

// A function that return a ptrace.Traces containing a single span for the single trace we are using.
spanIndexToTraces := func(spanIndex uint64) ptrace.Traces {
traces := ptrace.NewTraces()
span := traces.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty()
span.SetTraceID(traceID)
span.SetSpanID(uInt64ToSpanID(spanIndex))
return traces
}

// Generate and deliver first span
require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(1)))

tsp := p.(*tailSamplingSpanProcessor)

// The first tick won't do anything
tsp.policyTicker.OnTick()
require.EqualValues(t, 0, mpe.EvaluationCount)

// This will cause policy evaluations on the first span
tsp.policyTicker.OnTick()

// Policy should have been evaluated once
require.EqualValues(t, 1, mpe.EvaluationCount)

// The final decision SHOULD be NOT Sampled.
require.EqualValues(t, 0, nextConsumer.SpanCount())

// Drop the trace to force cache to make decision
tsp.dropTrace(traceID, time.Now())
_, ok := tsp.idToTrace.Load(traceID)
require.False(t, ok)

// Set next decision to sampled, ensuring the next decision is determined by the decision cache, not the policy
mpe.NextDecision = sampling.Sampled

// Generate and deliver final span for the trace which SHOULD get the same sampling decision as the first span.
// The policies should NOT be evaluated again.
require.NoError(t, p.ConsumeTraces(context.Background(), spanIndexToTraces(2)))
require.EqualValues(t, 1, mpe.EvaluationCount)
require.EqualValues(t, 0, nextConsumer.SpanCount(), "original final decision not honored")
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ tail_sampling:
num_traces: 100
expected_new_traces_per_sec: 10
decision_cache:
sampled_cache_size: 500
sampled_cache_size: 1000
non_sampled_cache_size: 10000
policies:
[
{
Expand Down

0 comments on commit eb6ed8d

Please sign in to comment.