Skip to content

Commit

Permalink
[chore][receiver/pulsar]: add observability to pulsar receiver (open-…
Browse files Browse the repository at this point in the history
  • Loading branch information
VihasMakwana authored and shivanthzen committed Dec 4, 2024
1 parent 26adde4 commit 0e1ecf5
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions receiver/pulsarreceiver/pulsar_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,16 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.uber.org/zap"
)

var errUnrecognizedEncoding = errors.New("unrecognized encoding")

const alreadyClosedError = "AlreadyClosedError"

const transport = "pulsar"

type pulsarTracesConsumer struct {
tracesConsumer consumer.Traces
topic string
Expand All @@ -29,9 +32,18 @@ type pulsarTracesConsumer struct {
unmarshaler TracesUnmarshaler
settings receiver.Settings
consumerOptions pulsar.ConsumerOptions
obsrecv *receiverhelper.ObsReport
}

func newTracesReceiver(config Config, set receiver.Settings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*pulsarTracesConsumer, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
unmarshaler := unmarshalers[config.Encoding]
if nil == unmarshaler {
return nil, errUnrecognizedEncoding
Expand All @@ -49,6 +61,7 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshalers map[st
}

return &pulsarTracesConsumer{
obsrecv: obsrecv,
tracesConsumer: nextConsumer,
topic: config.Topic,
unmarshaler: unmarshaler,
Expand Down Expand Up @@ -81,8 +94,10 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {

// TODO: Ensure returned errors are handled
for {
obsCtx := c.obsrecv.StartTracesOp(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
if strings.Contains(err.Error(), alreadyClosedError) {
return err
}
Expand All @@ -98,13 +113,15 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error {
traces, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err))
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}

if err := traceConsumer.ConsumeTraces(context.Background(), traces); err != nil {
err = traceConsumer.ConsumeTraces(context.Background(), traces)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), traces.SpanCount(), err)
_ = c.consumer.Ack(message)
}
}
Expand All @@ -128,9 +145,18 @@ type pulsarMetricsConsumer struct {
cancel context.CancelFunc
settings receiver.Settings
consumerOptions pulsar.ConsumerOptions
obsrecv *receiverhelper.ObsReport
}

func newMetricsReceiver(config Config, set receiver.Settings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*pulsarMetricsConsumer, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
unmarshaler := unmarshalers[config.Encoding]
if nil == unmarshaler {
return nil, errUnrecognizedEncoding
Expand All @@ -148,6 +174,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshalers map[s
}

return &pulsarMetricsConsumer{
obsrecv: obsrecv,
metricsConsumer: nextConsumer,
topic: config.Topic,
unmarshaler: unmarshaler,
Expand Down Expand Up @@ -181,8 +208,10 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {

// TODO: Ensure returned errors are handled
for {
obsCtx := c.obsrecv.StartMetricsOp(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
if strings.Contains(err.Error(), alreadyClosedError) {
return err
}
Expand All @@ -199,13 +228,15 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error {
metrics, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err))
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}

if err := metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil {
err = metricsConsumer.ConsumeMetrics(context.Background(), metrics)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}
c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), metrics.DataPointCount(), err)

_ = c.consumer.Ack(message)
}
Expand All @@ -230,9 +261,18 @@ type pulsarLogsConsumer struct {
cancel context.CancelFunc
settings receiver.Settings
consumerOptions pulsar.ConsumerOptions
obsrecv *receiverhelper.ObsReport
}

func newLogsReceiver(config Config, set receiver.Settings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*pulsarLogsConsumer, error) {
obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{
ReceiverID: set.ID,
Transport: transport,
ReceiverCreateSettings: set,
})
if err != nil {
return nil, err
}
unmarshaler := unmarshalers[config.Encoding]
if nil == unmarshaler {
return nil, errUnrecognizedEncoding
Expand All @@ -250,6 +290,7 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshalers map[stri
}

return &pulsarLogsConsumer{
obsrecv: obsrecv,
logsConsumer: nextConsumer,
topic: config.Topic,
cancel: nil,
Expand Down Expand Up @@ -282,8 +323,10 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
logsConsumer := c.logsConsumer

for {
obsCtx := c.obsrecv.StartLogsOp(ctx)
message, err := c.consumer.Receive(ctx)
if err != nil {
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
if strings.Contains(err.Error(), alreadyClosedError) {
return err
}
Expand All @@ -299,14 +342,15 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error {
logs, err := unmarshaler.Unmarshal(message.Payload())
if err != nil {
c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err))
c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err)
_ = c.consumer.Ack(message)
return err
}

if err := logsConsumer.ConsumeLogs(context.Background(), logs); err != nil {
err = logsConsumer.ConsumeLogs(context.Background(), logs)
if err != nil {
c.settings.Logger.Error("consume traces failed", zap.Error(err))
}

c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), logs.LogRecordCount(), err)
_ = c.consumer.Ack(message)
}
}
Expand Down

0 comments on commit 0e1ecf5

Please sign in to comment.