Skip to content

Commit

Permalink
feat: forward decision span through peer endpoint (#1342)
Browse files Browse the repository at this point in the history
## Which problem is this PR solving?

implements: #1318 #1326 

## Short description of the changes

- Add a new config option `ForceTraceLocality` to turn off trace
distribution feature
- forward decision spans when peer membership changes
  • Loading branch information
VinozzZ authored Oct 2, 2024
1 parent 5e4c90d commit 0eac5f0
Show file tree
Hide file tree
Showing 7 changed files with 351 additions and 63 deletions.
165 changes: 126 additions & 39 deletions app/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/honeycombio/refinery/sample"
"github.com/honeycombio/refinery/sharder"
"github.com/honeycombio/refinery/transmit"
"github.com/honeycombio/refinery/types"
)

const legacyAPIKey = "c9945edf5d245834089a1bd6cc9ad01e"
Expand Down Expand Up @@ -88,14 +89,8 @@ func (w *countingWriterSender) waitForCount(t testing.TB, target int) {
}
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
basePort int,
peers peer.Peers,
enableHostMetadata bool,
) (*App, inject.Graph) {
c := &config.MockConfig{
func defaultConfig(basePort int) *config.MockConfig {
return &config.MockConfig{
GetTracesConfigVal: config.TracesConfig{
SendTicker: config.Duration(2 * time.Millisecond),
SendDelay: config.Duration(1 * time.Millisecond),
Expand All @@ -109,8 +104,7 @@ func newStartedApp(
GetListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort),
GetPeerListenAddrVal: "127.0.0.1:" + strconv.Itoa(basePort+1),
GetHoneycombAPIVal: "http://api.honeycomb.io",
GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second)},
AddHostMetadataToTrace: enableHostMetadata,
GetCollectionConfigVal: config.CollectionConfig{CacheCapacity: 10000, ShutdownDelay: config.Duration(1 * time.Second), EnableTraceLocality: true},
TraceIdFieldNames: []string{"trace.trace_id"},
ParentIdFieldNames: []string{"trace.parent_id"},
SampleCache: config.SampleCacheConfig{KeptSize: 10000, DroppedSize: 100000, SizeCheckInterval: config.Duration(10 * time.Second)},
Expand All @@ -119,7 +113,16 @@ func newStartedApp(
AcceptOnlyListedKeys: true,
},
}
}

func newStartedApp(
t testing.TB,
libhoneyT transmission.Sender,
peerTransmission transmission.Sender,
peers peer.Peers,
cfg *config.MockConfig,
) (*App, inject.Graph) {
c := cfg
var err error
if peers == nil {
peers = &peer.FilePeers{Cfg: c, Metrics: &metrics.NullMetrics{}}
Expand Down Expand Up @@ -158,13 +161,13 @@ func newStartedApp(
})
assert.NoError(t, err)

sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: &transmission.Honeycomb{
MaxBatchSize: c.GetTracesConfigVal.MaxBatchSize,
if peerTransmission == nil {
sdPeer, _ := statsd.New(statsd.Prefix("refinery.peer"))
peerTransmission = &transmission.Honeycomb{
MaxBatchSize: cfg.GetTracesConfigVal.MaxBatchSize,
BatchTimeout: libhoney.DefaultBatchTimeout,
MaxConcurrentBatches: libhoney.DefaultMaxConcurrentBatches,
PendingWorkCapacity: uint(c.GetPeerBufferSize()),
PendingWorkCapacity: uint(cfg.GetPeerBufferSize()),
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
Dial: (&net.Dialer{
Expand All @@ -175,7 +178,10 @@ func newStartedApp(
DisableGzipCompression: true,
EnableMsgpackEncoding: true,
Metrics: sdPeer,
},
}
}
peerClient, err := libhoney.NewClient(libhoney.ClientConfig{
Transmission: peerTransmission,
})
assert.NoError(t, err)

Expand Down Expand Up @@ -210,7 +216,7 @@ func newStartedApp(
assert.NoError(t, err)

// Racy: wait just a moment for ListenAndServe to start up.
time.Sleep(10 * time.Millisecond)
time.Sleep(15 * time.Millisecond)
return &a, g
}

Expand All @@ -227,7 +233,8 @@ func TestAppIntegration(t *testing.T) {
port := 10500

sender := &transmission.MockSender{}
app, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
app, graph := newStartedApp(t, sender, nil, nil, cfg)

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
Expand Down Expand Up @@ -264,7 +271,8 @@ func TestAppIntegrationWithNonLegacyKey(t *testing.T) {
port := 10600

sender := &transmission.MockSender{}
a, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })

Expand Down Expand Up @@ -304,7 +312,8 @@ func TestAppIntegrationWithUnauthorizedKey(t *testing.T) {
port := 10700

sender := &transmission.MockSender{}
a, graph := newStartedApp(t, sender, port, nil, false)
cfg := defaultConfig(port)
a, graph := newStartedApp(t, sender, nil, nil, cfg)
a.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
a.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })

Expand Down Expand Up @@ -345,7 +354,9 @@ func TestPeerRouting(t *testing.T) {
Peers: peerList,
ID: peerList[i],
}
apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)

apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

Expand Down Expand Up @@ -402,7 +413,6 @@ func TestPeerRouting(t *testing.T) {
},
}
assert.Equal(t, expectedEvent, senders[0].Events()[0])

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's who the trace belongs to.
req, err = http.NewRequest(
Expand All @@ -416,18 +426,26 @@ func TestPeerRouting(t *testing.T) {

req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
require.Eventually(t, func() bool {
return len(senders[0].Events()) == 2
}, 2*time.Second, 2*time.Millisecond)
assert.Equal(t, expectedEvent, senders[0].Events()[0])
expectedEvent.Metadata = map[string]any{
"api_host": "http://api.honeycomb.io",
"dataset": "dataset",
"environment": "",
"enqueued_at": senders[0].Events()[1].Metadata.(map[string]any)["enqueued_at"],
}
assert.Equal(t, expectedEvent, senders[0].Events()[1])
}

func TestHostMetadataSpanAdditions(t *testing.T) {
t.Parallel()
port := 14000

sender := &transmission.MockSender{}
app, graph := newStartedApp(t, sender, port, nil, true)
cfg := defaultConfig(port)
cfg.AddHostMetadataToTrace = true
app, graph := newStartedApp(t, sender, nil, nil, cfg)

// Send a root span, it should be sent in short order.
req := httptest.NewRequest(
Expand Down Expand Up @@ -481,11 +499,12 @@ func TestEventsEndpoint(t *testing.T) {
ID: peerList[i],
}

apps[i], graph = newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)
apps[i], graph = newStartedApp(t, senders[i], nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
// Deliver to host 1, it should be passed to host 0
zEnc, _ := zstd.NewWriter(nil)
blob := zEnc.EncodeAll([]byte(`{"foo":"bar","trace.trace_id":"1"}`), nil)
req, err := http.NewRequest(
Expand All @@ -504,7 +523,6 @@ func TestEventsEndpoint(t *testing.T) {
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
Expand All @@ -527,7 +545,6 @@ func TestEventsEndpoint(t *testing.T) {
},
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0 since that's the host this trace belongs to.

Expand All @@ -553,7 +570,6 @@ func TestEventsEndpoint(t *testing.T) {
assert.Eventually(t, func() bool {
return len(senders[0].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

assert.Equal(
t,
&transmission.Event{
Expand All @@ -577,14 +593,16 @@ func TestEventsEndpoint(t *testing.T) {
senders[0].Events()[0],
)
}

func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
t.Parallel()

peerList := []string{
"http://localhost:15001",
"http://localhost:15003",
}
// this traceID was chosen because it hashes to the appropriate shard for this
// test. You can't change it or the number of peers and still expect the test to pass.
traceID := "4"

var apps [2]*App
var senders [2]*transmission.MockSender
Expand All @@ -596,16 +614,15 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
ID: peerList[i],
}

app, graph := newStartedApp(t, senders[i], basePort, peers, false)
cfg := defaultConfig(basePort)

app, graph := newStartedApp(t, senders[i], nil, peers, cfg)
app.IncomingRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
app.PeerRouter.SetEnvironmentCache(time.Second, func(s string) (string, error) { return "test", nil })
apps[i] = app
defer startstop.Stop(graph.Objects(), nil)
}

// this traceID was chosen because it hashes to the appropriate shard for this
// test. You can't change it or the number of peers and still expect the test to pass.
traceID := "4"
traceData := []byte(fmt.Sprintf(`{"foo":"bar","trace.trace_id":"%s"}`, traceID))
// Deliver to host 1, it should be passed to host 0 and emitted there.
zEnc, _ := zstd.NewWriter(nil)
Expand Down Expand Up @@ -649,7 +666,6 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
},
senders[0].Events()[0],
)

// Repeat, but deliver to host 1 on the peer channel, it should be
// passed to host 0.

Expand Down Expand Up @@ -700,6 +716,75 @@ func TestEventsEndpointWithNonLegacyKey(t *testing.T) {
)
}

func TestPeerRouting_TraceLocalityDisabled(t *testing.T) {
// Parallel integration tests need different ports!
t.Parallel()

peerList := []string{"http://localhost:17001", "http://localhost:17003"}

var apps [2]*App
var senders [2]*transmission.MockSender
var peerSenders [2]*transmission.MockSender
for i := range apps {
var graph inject.Graph
basePort := 17000 + (i * 2)
senders[i] = &transmission.MockSender{}
peerSenders[i] = &transmission.MockSender{}
peers := &peer.MockPeers{
Peers: peerList,
ID: peerList[i],
}
cfg := defaultConfig(basePort)
collectionCfg := cfg.GetCollectionConfigVal
collectionCfg.EnableTraceLocality = false
cfg.GetCollectionConfigVal = collectionCfg

apps[i], graph = newStartedApp(t, senders[i], peerSenders[i], peers, cfg)
defer startstop.Stop(graph.Objects(), nil)
}

// Deliver to host 1, it should be passed to host 0 and emitted there.
req, err := http.NewRequest(
"POST",
"http://localhost:17002/1/batch/dataset",
nil,
)
assert.NoError(t, err)
req.Header.Set("X-Honeycomb-Team", legacyAPIKey)
req.Header.Set("Content-Type", "application/json")

// this span index was chosen because it hashes to the appropriate shard for this
// test. You can't change it and expect the test to pass.
blob := `[` + string(spans[10]) + `]`
req.Body = io.NopCloser(strings.NewReader(blob))
post(t, req)
require.Eventually(t, func() bool {
return len(peerSenders[1].Events()) == 1
}, 2*time.Second, 2*time.Millisecond)

expectedEvent := &transmission.Event{
APIKey: legacyAPIKey,
Dataset: "dataset",
SampleRate: 2,
APIHost: "http://localhost:17001",
Timestamp: now,
Data: map[string]interface{}{
"trace_id": "2",
"meta.refinery.min_span": true,
"meta.annotation_type": types.SpanAnnotationTypeUnknown,
"meta.refinery.root": false,
"meta.refinery.span_data_size": 157,
},
Metadata: map[string]any{
"api_host": "http://localhost:17001",
"dataset": "dataset",
"environment": "",
"enqueued_at": peerSenders[1].Events()[0].Metadata.(map[string]any)["enqueued_at"],
},
}
assert.Equal(t, expectedEvent, peerSenders[1].Events()[0])
}

var (
now = time.Now().UTC()
nowString = now.Format(time.RFC3339Nano)
Expand Down Expand Up @@ -760,7 +845,8 @@ func BenchmarkTraces(b *testing.B) {
W: io.Discard,
},
}
_, graph := newStartedApp(b, sender, 11000, nil, false)
cfg := defaultConfig(11000)
_, graph := newStartedApp(b, sender, nil, nil, cfg)

req, err := http.NewRequest(
"POST",
Expand Down Expand Up @@ -862,7 +948,8 @@ func BenchmarkDistributedTraces(b *testing.B) {
ID: peerList[i],
}

apps[i], graph = newStartedApp(b, sender, basePort, peers, false)
cfg := defaultConfig(basePort)
apps[i], graph = newStartedApp(b, sender, nil, peers, cfg)
defer startstop.Stop(graph.Objects(), nil)

addrs[i] = "localhost:" + strconv.Itoa(basePort)
Expand Down
Loading

0 comments on commit 0eac5f0

Please sign in to comment.