From 3e3da6500ee656417e49336a84efbe77cab4e40a Mon Sep 17 00:00:00 2001 From: Andrew Crump Date: Thu, 3 Aug 2023 21:26:41 +0000 Subject: [PATCH] otelcolclient cancels gRPC context when closed We don't expect this to actually be invoked currently, although we'd like it to be. The forwarder agent does not currently cancel contexts on shutdown as it is not setup to run shutdown code when a signal is received. Signed-off-by: Carson Long --- .../app/otelcolclient/otelcolclient.go | 23 ++++++++++++++----- .../app/otelcolclient/otelcolclient_test.go | 19 +++++++++++++-- 2 files changed, 34 insertions(+), 8 deletions(-) diff --git a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go index 469935865..a042fd9da 100644 --- a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go +++ b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient.go @@ -18,8 +18,12 @@ import ( ) type Client struct { - // The client API for the OTel Collector metrics service. + // The client API for the OTel Collector metrics service msc colmetricspb.MetricsServiceClient + // Context passed to gRPC + ctx context.Context + // Cancel func invoked on shutdown + cancel func() // The logger to use for errors l *log.Logger } @@ -32,7 +36,13 @@ func New(addr string, tlsConfig *tls.Config, l *log.Logger) (*Client, error) { return nil, err } - return &Client{msc: colmetricspb.NewMetricsServiceClient(cc), l: l}, nil + ctx, cancel := context.WithCancel(context.Background()) + return &Client{ + msc: colmetricspb.NewMetricsServiceClient(cc), + ctx: ctx, + cancel: cancel, + l: l, + }, nil } // Write translates an envelope to OTLP and forwards it to the connected OTel @@ -55,15 +65,16 @@ func (c *Client) Write(e *loggregator_v2.Envelope) error { return err } -// Close flushes any buffers and closes any connections. +// Close cancels the underlying context. func (c *Client) Close() error { + c.cancel() return nil } // writeCounter translates a loggregator v2 Counter to OTLP and forwards it. func (c *Client) writeCounter(e *loggregator_v2.Envelope) error { atts := attributes(e) - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ @@ -123,7 +134,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error { }) } - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ @@ -143,7 +154,7 @@ func (c *Client) writeGauge(e *loggregator_v2.Envelope) error { // writeTimer translates a loggregator v2 Timer to OTLP and forwards it. func (c *Client) writeTimer(e *loggregator_v2.Envelope) error { atts := attributes(e) - resp, err := c.msc.Export(context.TODO(), &colmetricspb.ExportMetricsServiceRequest{ + resp, err := c.msc.Export(c.ctx, &colmetricspb.ExportMetricsServiceRequest{ ResourceMetrics: []*metricspb.ResourceMetrics{ { ScopeMetrics: []*metricspb.ScopeMetrics{ diff --git a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go index 9aa774753..9463c2700 100644 --- a/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go +++ b/src/cmd/forwarder-agent/app/otelcolclient/otelcolclient_test.go @@ -33,9 +33,12 @@ var _ = Describe("Client", func() { response: &colmetricspb.ExportMetricsServiceResponse{}, responseErr: nil, } + ctx, cancel := context.WithCancel(context.Background()) c = Client{ - msc: spyMSC, - l: log.New(GinkgoWriter, "", 0), + msc: spyMSC, + ctx: ctx, + cancel: cancel, + l: log.New(GinkgoWriter, "", 0), } }) @@ -566,15 +569,27 @@ var _ = Describe("Client", func() { }) }) }) + + Describe("Close", func() { + It("cancels the context", func() { + envelope := &loggregator_v2.Envelope{Message: &loggregator_v2.Envelope_Timer{}} + Expect(c.Write(envelope)).ToNot(HaveOccurred()) + + Expect(c.Close()).ToNot(HaveOccurred()) + Eventually(spyMSC.ctx.Done()).Should(BeClosed()) + }) + }) }) type spyMetricsServiceClient struct { requests chan *colmetricspb.ExportMetricsServiceRequest response *colmetricspb.ExportMetricsServiceResponse responseErr error + ctx context.Context } func (c *spyMetricsServiceClient) Export(ctx context.Context, in *colmetricspb.ExportMetricsServiceRequest, opts ...grpc.CallOption) (*colmetricspb.ExportMetricsServiceResponse, error) { c.requests <- in + c.ctx = ctx return c.response, c.responseErr }