Skip to content

Commit

Permalink
put endobs at every error
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Nov 10, 2024
1 parent dd7e754 commit d179b8c
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,12 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop")
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err))
Expand All @@ -112,14 +114,15 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {
traces, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err))
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}
err = traceConsumer.ConsumeTraces(context.Background(), traces)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.obsrecv.EndTracesOp(obsCtx, c.unmarshaler.Encoding(), traces.SpanCount(), err)
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), traces.SpanCount(), err)
_ = c.consumer.Ack(message)
}
}
Expand Down Expand Up @@ -210,10 +213,12 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume metrics loop")
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}

Expand All @@ -225,14 +230,15 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
metrics, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err))
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}
err = metricsConsumer.ConsumeMetrics(context.Background(), metrics)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.obsrecv.EndMetricsOp(obsCtx, c.unmarshaler.Encoding(), metrics.DataPointCount(), err)
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), metrics.DataPointCount(), err)

_ = c.consumer.Ack(message)
}
Expand Down Expand Up @@ -323,10 +329,12 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
message, err := c.consumer.Receive(ctx)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop canceled")
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err))
Expand All @@ -337,14 +345,15 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
logs, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err))
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}
err = logsConsumer.ConsumeLogs(context.Background(), logs)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.obsrecv.EndLogsOp(obsCtx, c.unmarshaler.Encoding(), logs.LogRecordCount(), err)
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), logs.LogRecordCount(), err)
_ = c.consumer.Ack(message)
}
}
Expand Down

0 comments on commit d179b8c

Please sign in to comment.