diff --git a/receiver/pulsarreceiver/pulsar_receiver.go b/receiver/pulsarreceiver/pulsar_receiver.go index ada1fa1f6265..45ad6805e812 100644 --- a/receiver/pulsarreceiver/pulsar_receiver.go +++ b/receiver/pulsarreceiver/pulsar_receiver.go @@ -98,10 +98,12 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { message, err := c.consumer.Receive(ctx) if err != nil { if strings.Contains(err.Error(), alreadyClosedError) { + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume traces loop") + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err)) @@ -112,6 +114,7 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { traces, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err)) + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } @@ -119,7 +122,7 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } - c.obsrecv.EndTracesOp(obsCtx, c.unmarshaler.Encoding(), traces.SpanCount(), err) + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), traces.SpanCount(), err) _ = c.consumer.Ack(message) } } @@ -210,10 +213,12 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { message, err := c.consumer.Receive(ctx) if err != nil { if strings.Contains(err.Error(), alreadyClosedError) { + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume metrics loop") + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } @@ -225,6 +230,7 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { metrics, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err)) + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } @@ -232,7 +238,7 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } - c.obsrecv.EndMetricsOp(obsCtx, c.unmarshaler.Encoding(), metrics.DataPointCount(), err) + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), metrics.DataPointCount(), err) _ = c.consumer.Ack(message) } @@ -323,10 +329,12 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { message, err := c.consumer.Receive(ctx) if err != nil { if strings.Contains(err.Error(), alreadyClosedError) { + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } if errors.Is(err, context.Canceled) { c.settings.Logger.Info("exiting consume traces loop canceled") + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) return err } c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err)) @@ -337,6 +345,7 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { logs, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err)) + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } @@ -344,7 +353,7 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } - c.obsrecv.EndLogsOp(obsCtx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err) + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), logs.LogRecordCount(), err) _ = c.consumer.Ack(message) } }