Skip to content

Commit

Permalink
chore(cmd/forwarder-agent): do not forward timers (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
ctlong authored Aug 21, 2023
1 parent 46503c9 commit b92b732
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 192 deletions.
13 changes: 1 addition & 12 deletions src/cmd/forwarder-agent/app/forwarder_agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -343,6 +343,7 @@ var _ = Describe("App", func() {
},
Entry("drops logs", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Log{}}),
Entry("drops events", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Event{}}),
Entry("drops timers", &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}),
)

It("forwards counters", func() {
Expand All @@ -366,17 +367,5 @@ var _ = Describe("App", func() {
metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
})

It("forwards timers", func() {
name := "test-timer-name"
ingressClient.EmitTimer(name, time.Time{}, time.Time{})

var req *colmetricspb.ExportMetricsServiceRequest
Eventually(otelServer.requests).Should(Receive(&req))

metric := req.ResourceMetrics[0].ScopeMetrics[0].Metrics[0]
Expect(metric.GetName()).To(Equal(name))
})

})
})
40 changes: 0 additions & 40 deletions src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/protobuf/proto"
)

type Client struct {
Expand Down Expand Up @@ -54,8 +53,6 @@ func (c *Client) Write(e *loggregator_v2.Envelope) error {
err = c.writeCounter(e)
case *loggregator_v2.Envelope_Gauge:
err = c.writeGauge(e)
case *loggregator_v2.Envelope_Timer:
err = c.writeTimer(e)
}
// Need to log the error right now because the Forwarder Agent drops
// returned errors. If that changes we can remove this conditional.
Expand Down Expand Up @@ -151,43 +148,6 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error {
return errorOnRejection(resp)
}

// writeTimer translates a loggregator v2 Timer to OTLP and forwards it.
func (c *Client) writeTimer(e *loggregator_v2.Envelope) error {
atts := attributes(e)
resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: []*metricspb.Metric{
{
Name: e.GetTimer().GetName(),
Data: &metricspb.Metric_Histogram{
Histogram: &metricspb.Histogram{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
DataPoints: []*metricspb.HistogramDataPoint{
{
TimeUnixNano: uint64(e.GetTimestamp()),
Attributes: atts,
Count: 1,
Sum: proto.Float64(float64(e.GetTimer().GetStop() - e.GetTimer().GetStart())),
},
},
},
},
},
},
},
},
},
},
})
if err != nil {
return err
}
return errorOnRejection(resp)
}

func errorOnRejection(r *colmetricspb.ExportMetricsServiceResponse) error {
if ps := r.GetPartialSuccess(); ps != nil && ps.GetRejectedDataPoints() > 0 {
return errors.New(ps.GetErrorMessage())
Expand Down
148 changes: 8 additions & 140 deletions src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
commonpb "go.opentelemetry.io/proto/otlp/common/v1"
metricspb "go.opentelemetry.io/proto/otlp/metrics/v1"
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/testing/protocmp"
)

Expand Down Expand Up @@ -393,9 +392,9 @@ var _ = Describe("Client", func() {
})
})

Context("when given a log", func() {
Context("when given a timer", func() {
BeforeEach(func() {
envelope = &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Log{}}
envelope = &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}
})

It("returns nil", func() {
Expand All @@ -408,149 +407,18 @@ var _ = Describe("Client", func() {
})
})

Context("when given a timer", func() {
Context("when given a log", func() {
BeforeEach(func() {
envelope = &loggregator_v2.Envelope{
Timestamp: 1690405373300570810,
SourceId: "fake-source-id",
InstanceId: "fake-instance-id",
Tags: map[string]string{
"app_name": "dora",
"peer_type": "Server",
},
Message: &loggregator_v2.Envelope_Timer{
Timer: &loggregator_v2.Timer{
Name: "http",
Start: 1690405373296932993,
Stop: 1690405373300561198,
},
},
}
envelope = &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Log{}}
})

It("returns nil", func() {
Expect(returnedErr).NotTo(HaveOccurred())
})

It("succeeds", func() {
var msr *colmetricspb.ExportMetricsServiceRequest
Expect(spyMSC.requests).To(Receive(&msr))

expectedReq := &colmetricspb.ExportMetricsServiceRequest{
ResourceMetrics: []*metricspb.ResourceMetrics{
{
ScopeMetrics: []*metricspb.ScopeMetrics{
{
Metrics: []*metricspb.Metric{
{
Name: "http",
Data: &metricspb.Metric_Histogram{
Histogram: &metricspb.Histogram{
AggregationTemporality: metricspb.AggregationTemporality_AGGREGATION_TEMPORALITY_CUMULATIVE,
DataPoints: []*metricspb.HistogramDataPoint{
{
TimeUnixNano: 1690405373300570810,
Attributes: []*commonpb.KeyValue{
{
Key: "app_name",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "dora"}},
},
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
},
{
Key: "peer_type",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "Server"}},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
},
},
Count: 1,
Sum: proto.Float64(3628205),
},
},
},
},
},
},
},
},
},
},
}

s1 := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
s2 := protocmp.SortRepeated(func(x *metricspb.Metric, y *metricspb.Metric) bool {
return x.Name < y.Name
})
Expect(cmp.Diff(msr, expectedReq, protocmp.Transform(), s1, s2)).To(BeEmpty())
})

Context("when Metric Service Client returns an error", func() {
BeforeEach(func() {
spyMSC.responseErr = errors.New("test-error")
})

It("returns it", func() {
Expect(returnedErr).To(MatchError("test-error"))
})

It("logs it", func() {
Eventually(buf).Should(gbytes.Say("Write error: test-error"))
})
})

Context("when Metric Service Client indicates data points were rejected", func() {
BeforeEach(func() {
spyMSC.response = &colmetricspb.ExportMetricsServiceResponse{
PartialSuccess: &colmetricspb.ExportMetricsPartialSuccess{
RejectedDataPoints: 1,
ErrorMessage: "test-error-message",
},
}
})

It("returns it", func() {
Expect(returnedErr).To(MatchError("test-error-message"))
})

It("logs it", func() {
Eventually(buf).Should(gbytes.Say("Write error: test-error-message"))
})
})

Context("when the instance id or source id are provided as tags", func() {
BeforeEach(func() {
envelope.Tags = map[string]string{}
envelope.Tags["source_id"] = "some-other-source-id"
envelope.Tags["instance_id"] = "some-other-instance-id"
})

It("ignores them and uses the envelope fields instead", func() {
var msr *colmetricspb.ExportMetricsServiceRequest
Expect(spyMSC.requests).To(Receive(&msr))

expectedAtts := []*commonpb.KeyValue{
{
Key: "instance_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-instance-id"}},
},
{
Key: "source_id",
Value: &commonpb.AnyValue{Value: &commonpb.AnyValue_StringValue{StringValue: "fake-source-id"}},
},
}
sortFunc := protocmp.SortRepeated(func(x *commonpb.KeyValue, y *commonpb.KeyValue) bool {
return x.Key < y.Key
})
actualAtts := msr.GetResourceMetrics()[0].GetScopeMetrics()[0].GetMetrics()[0].GetHistogram().GetDataPoints()[0].GetAttributes()
Expect(cmp.Diff(actualAtts, expectedAtts, protocmp.Transform(), sortFunc)).To(BeEmpty())
})
It("does nothing", func() {
Expect(spyMSC.requests).NotTo(Receive())
Consistently(buf.Contents()).Should(HaveLen(0))
})
})

Expand All @@ -572,7 +440,7 @@ var _ = Describe("Client", func() {

Describe("Close", func() {
It("cancels the context", func() {
envelope := &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}}
envelope := &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Gauge{}}
Expect(c.Write(envelope)).ToNot(HaveOccurred())

Expect(c.Close()).ToNot(HaveOccurred())
Expand Down

0 comments on commit b92b732

Please sign in to comment.