Skip to content

Commit

Permalink
feat: extract decision span from full span (#1338)
Browse files Browse the repository at this point in the history
Extract only necessary information that's needed for trace decision from
a full span so that we can forward only the key fields to peers later
part of #1318

- add a method on `Span` to extract only necessary information into a
new types.Event
- add a method to differentiate a full span from a decision span
- store `IsRoot` information on `Span`
- simplify naming for span annotation types
  • Loading branch information
VinozzZ authored and TylerHelmuth committed Oct 16, 2024
1 parent 4e49ccb commit 19986e4
Show file tree
Hide file tree
Showing 4 changed files with 142 additions and 88 deletions.
24 changes: 4 additions & 20 deletions collect/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -547,7 +547,7 @@ func (i *InMemCollector) processSpan(sp *types.Span) {
}

// if this is a root span, say so and send the trace
if i.isRootSpan(sp) {
if sp.IsRoot {
markTraceForSending = true
trace.RootSpan = sp
}
Expand Down Expand Up @@ -673,8 +673,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
i.Logger.Debug().WithField("trace_id", sp.TraceID).Logf("Sending span because of previous decision to send trace")
mergeTraceAndSpanSampleRates(sp, tr.Rate(), isDryRun)
// if this span is a late root span, possibly update it with our current span count
isRootSpan := i.isRootSpan(sp)
if isRootSpan {
if sp.IsRoot {
if i.Config.GetAddCountsToRoot() {
sp.Data["meta.span_event_count"] = int64(tr.SpanEventCount())
sp.Data["meta.span_link_count"] = int64(tr.SpanLinkCount())
Expand All @@ -684,7 +683,7 @@ func (i *InMemCollector) dealWithSentTrace(ctx context.Context, tr cache.TraceSe
sp.Data["meta.span_count"] = int64(tr.DescendantCount())
}
}
otelutil.AddSpanField(span, "is_root_span", isRootSpan)
otelutil.AddSpanField(span, "is_root_span", sp.IsRoot)
i.Metrics.Increment(TraceSendLateSpan)
i.addAdditionalAttributes(sp)
i.Transmission.EnqueueSpan(sp)
Expand Down Expand Up @@ -720,21 +719,6 @@ func mergeTraceAndSpanSampleRates(sp *types.Span, traceSampleRate uint, dryRunMo
}
}

func (i *InMemCollector) isRootSpan(sp *types.Span) bool {
// log event should never be considered a root span, check for that first
if signalType := sp.Data["meta.signal_type"]; signalType == "log" {
return false
}
// check if the event has a parent id using the configured parent id field names
for _, parentIdFieldName := range i.Config.GetParentIdFieldNames() {
parentId := sp.Data[parentIdFieldName]
if _, ok := parentId.(string); ok && parentId != "" {
return false
}
}
return true
}

func (i *InMemCollector) send(trace *types.Trace, sendReason string) {
if trace.Sent {
// someone else already sent this so we shouldn't also send it.
Expand Down Expand Up @@ -830,7 +814,7 @@ func (i *InMemCollector) send(trace *types.Trace, sendReason string) {

// update the root span (if we have one, which we might not if the trace timed out)
// with the final total as of our send time
if i.isRootSpan(sp) {
if sp.IsRoot {
if i.Config.GetAddCountsToRoot() {
sp.Data["meta.span_event_count"] = int64(trace.SpanEventCount())
sp.Data["meta.span_link_count"] = int64(trace.SpanLinkCount())
Expand Down
86 changes: 20 additions & 66 deletions collect/collect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ func TestAddRootSpan(t *testing.T) {
Dataset: "aoeu",
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand All @@ -141,6 +142,7 @@ func TestAddRootSpan(t *testing.T) {
Dataset: "aoeu",
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpanFromPeer(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -233,6 +235,7 @@ func TestOriginalSampleRateIsNotedInMetaField(t *testing.T) {
SampleRate: 0, // no upstream sampling
Data: make(map[string]interface{}),
},
IsRoot: true,
})
require.NoError(t, err, "must be able to add the span")

Expand Down Expand Up @@ -293,6 +296,7 @@ func TestTransmittedSpansShouldHaveASampleRateOfAtLeastOne(t *testing.T) {
SampleRate: 0, // This should get lifted to 1
Data: make(map[string]interface{}),
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -378,6 +382,7 @@ func TestAddSpan(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 5)
Expand Down Expand Up @@ -452,6 +457,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -488,6 +494,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpanFromPeer(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand All @@ -511,6 +518,7 @@ func TestDryRunMode(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(span)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -642,6 +650,7 @@ func TestSampleConfigReload(t *testing.T) {
Dataset: dataset,
APIKey: legacyAPIKey,
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -669,6 +678,7 @@ func TestSampleConfigReload(t *testing.T) {
Dataset: dataset,
APIKey: legacyAPIKey,
},
IsRoot: true,
}

coll.AddSpan(span)
Expand Down Expand Up @@ -930,6 +940,7 @@ func TestAddCountsToRoot(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1021,6 +1032,7 @@ func TestLateRootGetsCounts(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1101,6 +1113,7 @@ func TestAddSpanCount(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1176,6 +1189,7 @@ func TestLateRootGetsSpanCount(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1247,6 +1261,7 @@ func TestLateSpanNotDecorated(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)

Expand Down Expand Up @@ -1317,6 +1332,7 @@ func TestAddAdditionalAttributes(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 5)
Expand Down Expand Up @@ -1393,6 +1409,7 @@ func TestStressReliefSampleRate(t *testing.T) {
APIKey: legacyAPIKey,
SampleRate: 10,
},
IsRoot: true,
}

processed2, kept2 := coll.ProcessSpanImmediately(rootSpan)
Expand Down Expand Up @@ -1473,6 +1490,7 @@ func TestStressReliefDecorateHostname(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)
time.Sleep(conf.GetTracesConfig().GetSendTickerValue() * 2)
Expand Down Expand Up @@ -1591,6 +1609,7 @@ func TestSpanWithRuleReasons(t *testing.T) {
},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
if i == 0 {
rootSpan.Data["test"] = int64(1)
Expand Down Expand Up @@ -1624,72 +1643,6 @@ func TestSpanWithRuleReasons(t *testing.T) {
transmission.Mux.RUnlock()
}

func TestIsRootSpan(t *testing.T) {
tesCases := []struct {
name string
span *types.Span
expected bool
}{
{
name: "root span - no parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{},
},
},
expected: true,
},
{
name: "root span - empty parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"trace.parent_id": "",
},
},
},
expected: true,
},
{
name: "non-root span - parent id",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"trace.parent_id": "some-id",
},
},
},
expected: false,
},
{
name: "non-root span - no parent id but has signal_type of log",
span: &types.Span{
Event: types.Event{
Data: map[string]interface{}{
"meta.signal_type": "log",
},
},
},
expected: false,
},
}

collector := &InMemCollector{
Config: &config.MockConfig{
ParentIdFieldNames: []string{"trace.parent_id", "parentId"},
GetCollectionConfigVal: config.CollectionConfig{
ShutdownDelay: config.Duration(1 * time.Millisecond),
},
},
}

for _, tc := range tesCases {
t.Run(tc.name, func(t *testing.T) {
assert.Equal(t, tc.expected, collector.isRootSpan(tc.span))
})
}
}

func TestRedistributeTraces(t *testing.T) {
conf := &config.MockConfig{
GetTracesConfigVal: config.TracesConfig{
Expand Down Expand Up @@ -1942,6 +1895,7 @@ func TestBigTracesGoEarly(t *testing.T) {
Data: map[string]interface{}{},
APIKey: legacyAPIKey,
},
IsRoot: true,
}
coll.AddSpan(rootSpan)

Expand Down
54 changes: 52 additions & 2 deletions types/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package types

import (
"context"
"slices"
"time"

huskyotlp "github.com/honeycombio/husky/otlp"
Expand Down Expand Up @@ -102,6 +103,12 @@ func (t *Trace) GetSpans() []*Span {
return t.spans
}

func (t *Trace) RemoveDecisionSpans() {
t.spans = slices.DeleteFunc(t.spans, func(sp *Span) bool {
return sp.IsDecisionSpan()
})
}

func (t *Trace) ID() string {
return t.TraceID
}
Expand Down Expand Up @@ -172,8 +179,8 @@ func (t *Trace) GetSamplerKey() (string, bool) {

env := ""
for _, sp := range t.GetSpans() {
if sp.Event.Environment != "" {
env = sp.Event.Environment
if sp.Environment != "" {
env = sp.Environment
break
}
}
Expand All @@ -187,13 +194,56 @@ type Span struct {
TraceID string
DataSize int
ArrivalTime time.Time
IsRoot bool
}

// IsDecicionSpan returns true if the span is a decision span based on
// a flag set in the span's metadata.
func (sp *Span) IsDecisionSpan() bool {
if sp.Data == nil {
return false
}
v, ok := sp.Data["meta.refinery.min_span"]
if !ok {
return false
}
isDecisionSpan, ok := v.(bool)
if !ok {
return false
}

return isDecisionSpan
}

// ExtractDecisionContext returns a new Event that contains only the data that is
// relevant to the decision-making process.
func (sp *Span) ExtractDecisionContext() *Event {
decisionCtx := sp.Event
dataSize := sp.DataSize
if dataSize == 0 {
dataSize = sp.GetDataSize()
}
decisionCtx.Data = map[string]interface{}{
"trace_id": sp.TraceID,
"meta.refinery.root": sp.IsRoot,
"meta.refinery.min_span": true,
"meta.annotation_type": sp.AnnotationType(),
"meta.refinery.span_data_size": dataSize,
}
return &decisionCtx
}

// GetDataSize computes the size of the Data element of the Span.
// Note that it's not the full size of the span, but we're mainly using this for
// relative ordering, not absolute calculations.
func (sp *Span) GetDataSize() int {
total := 0

if sp.IsDecisionSpan() {
if v, ok := sp.Data["meta.refinery.span_data_size"]; ok {
return v.(int)
}
}
// the data types we should be getting from JSON are:
// float64, int64, bool, string, []byte
for _, v := range sp.Data {
Expand Down
Loading

0 comments on commit 19986e4

Please sign in to comment.