Skip to content

Commit

Permalink
simplify
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana committed Nov 11, 2024
1 parent d179b8c commit 0de19fd
Showing 1 changed file with 3 additions and 6 deletions.
9 changes: 3 additions & 6 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,13 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {
for {
obsCtx := c.obsrecv.StartTracesOp(ctx)
message, err := c.consumer.Receive(ctx)
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop")
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
c.settings.Logger.Error("failed to receive traces message from Pulsar, waiting for one second before retrying", zap.Error(err))
Expand Down Expand Up @@ -211,14 +210,13 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
for {
obsCtx := c.obsrecv.StartMetricsOp(ctx)
message, err := c.consumer.Receive(ctx)
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
if err != nil {
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume metrics loop")
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}

Expand Down Expand Up @@ -328,13 +326,12 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
obsCtx := c.obsrecv.StartLogsOp(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
if strings.Contains(err.Error(), alreadyClosedError) {
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
if errors.Is(err, context.Canceled) {
c.settings.Logger.Info("exiting consume traces loop canceled")
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
return err
}
c.settings.Logger.Error("failed to receive logs message from Pulsar, waiting for one second before retrying", zap.Error(err))
Expand Down

0 comments on commit 0de19fd

Please sign in to comment.