diff --git a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go index 469935865..a042fd9da 100644 --- a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go +++ b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go @@ -18,8 +18,12 @@ import ( ) type Client struct { - // The client API for the OTel Collector metrics service. + // The client API for the OTel Collector metrics service msc colmetricspb.MetricsServiceClient + // Context passed to gRPC + ctx context.Context + // Cancel func invoked on shutdown + cancel func() // The logger to use for errors l *log.Logger } @@ -32,7 +36,13 @@ func New(addr string, tlsConfig *tls.Config, l *log.Logger) (*Client, error) { return nil, err } - return &Client{msc: colmetricspb.NewMetricsServiceClient(cc), l: l}, nil + ctx, cancel := context.WithCancel(context.Background()) + return &Client{ + msc: colmetricspb.NewMetricsServiceClient(cc), + ctx: ctx, + cancel: cancel, + l: l, + }, nil } // Write translates an envelope to OTLP and forwards it to the connected OTel @@ -55,15 +65,16 @@ func (c *Client) Write(e *loggregator_v2.Envelope) error { return err } -// Close flushes any buffers and closes any connections. +// Close cancels the underlying context. func (c *Client) Close() error { + c.cancel() return nil } // writeCounter translates a loggregator v2 Counter to OTLP and forwards it. func (c *Client) writeCounter(e *loggregator_v2.Envelope) error { atts := attributes(e) - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ @@ -123,7 +134,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error { }) } - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ @@ -143,7 +154,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error { // writeTimer translates a loggregator v2 Timer to OTLP and forwards it. func (c *Client) writeTimer(e *loggregator_v2.Envelope) error { atts := attributes(e) - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ diff --git a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go index 9aa774753..9463c2700 100644 --- a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go +++ b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go @@ -33,9 +33,12 @@ var _ = Describe("Client", func() { response: &colmetricspb.ExportMetricsServiceResponse{}, responseErr: nil, } + ctx, cancel := context.WithCancel(context.Background()) c = Client{ - msc: spyMSC, - l: log.New(GinkgoWriter, "", 0), + msc: spyMSC, + ctx: ctx, + cancel: cancel, + l: log.New(GinkgoWriter, "", 0), } }) @@ -566,15 +569,27 @@ var _ = Describe("Client", func() { }) }) }) + + Describe("Close", func() { + It("cancels the context", func() { + envelope := &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}} + Expect(c.Write(envelope)).ToNot(HaveOccurred()) + + Expect(c.Close()).ToNot(HaveOccurred()) + Eventually(spyMSC.ctx.Done()).Should(BeClosed()) + }) + }) }) type spyMetricsServiceClient struct { requests chan *colmetricspb.ExportMetricsServiceRequest response *colmetricspb.ExportMetricsServiceResponse responseErr error + ctx context.Context } func (c *spyMetricsServiceClient) Export(ctx context.Context, in *colmetricspb.ExportMetricsServiceRequest, opts ...grpc.CallOption) (*colmetricspb.ExportMetricsServiceResponse, error) { c.requests <- in + c.ctx = ctx return c.response, c.responseErr }