From 0de19fddb73f3514c80d1538b2237d5427e31518 Mon Sep 17 00:00:00 2001 From: Vihas Makwana Date: Mon, 11 Nov 2024 11:57:45 +0530 Subject: [PATCH] simplify --- receiver/pulsarreceiver/pulsar_receiver.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/receiver/pulsarreceiver/pulsar_receiver.go b/receiver/pulsarreceiver/pulsar_receiver.go index 45ad6805e812..5d15d1e3ce80 100644 --- a/receiver/pulsarreceiver/pulsar_receiver.go +++ b/receiver/pulsarreceiver/pulsar_receiver.go @@ -96,14 +96,13 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { for { obsCtx := c.obsrecv.StartTracesOp(ctx) message, err := c.consumer.Receive(ctx) + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) if err != nil { if strings.Contains(err.Error(), alreadyClosedError) { - c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume traces loop") - c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err)) @@ -211,14 +210,13 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { for { obsCtx := c.obsrecv.StartMetricsOp(ctx) message, err := c.consumer.Receive(ctx) + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) if err != nil { if strings.Contains(err.Error(), alreadyClosedError) { - c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume metrics loop") - c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } @@ -328,13 +326,12 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { obsCtx := c.obsrecv.StartLogsOp(ctx) message, err := c.consumer.Receive(ctx) if err != nil { + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) if strings.Contains(err.Error(), alreadyClosedError) { - c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume traces loop canceled") - c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err))