diff --git a/receiver/pulsarreceiver/pulsar_receiver.go b/receiver/pulsarreceiver/pulsar_receiver.go index fe779c06c4e6..4130a10b3a4b 100644 --- a/receiver/pulsarreceiver/pulsar_receiver.go +++ b/receiver/pulsarreceiver/pulsar_receiver.go @@ -13,6 +13,7 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.uber.org/zap" ) @@ -20,6 +21,8 @@ var errUnrecognizedEncoding = errors.New("unrecognized encoding") const alreadyClosedError = "AlreadyClosedError" +const transport = "pulsar" + type pulsarTracesConsumer struct { tracesConsumer consumer.Traces topic string @@ -29,9 +32,18 @@ type pulsarTracesConsumer struct { unmarshaler TracesUnmarshaler settings receiver.Settings consumerOptions pulsar.ConsumerOptions + obsrecv *receiverhelper.ObsReport } func newTracesReceiver(config Config, set receiver.Settings, unmarshalers map[string]TracesUnmarshaler, nextConsumer consumer.Traces) (*pulsarTracesConsumer, error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: transport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } unmarshaler := unmarshalers[config.Encoding] if nil == unmarshaler { return nil, errUnrecognizedEncoding @@ -49,6 +61,7 @@ func newTracesReceiver(config Config, set receiver.Settings, unmarshalers map[st } return &pulsarTracesConsumer{ + obsrecv: obsrecv, tracesConsumer: nextConsumer, topic: config.Topic, unmarshaler: unmarshaler, @@ -81,8 +94,10 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { // TODO: Ensure returned errors are handled for { + obsCtx := c.obsrecv.StartTracesOp(ctx) message, err := c.consumer.Receive(ctx) if err != nil { + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) if strings.Contains(err.Error(), alreadyClosedError) { return err } @@ -98,13 +113,15 @@ func consumerTracesLoop(ctx context.Context, c *pulsarTracesConsumer) error { traces, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler traces message", zap.Error(err)) + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } - - if err := traceConsumer.ConsumeTraces(context.Background(), traces); err != nil { + err = traceConsumer.ConsumeTraces(context.Background(), traces) + if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } + c.obsrecv.EndTracesOp(obsCtx, unmarshaler.Encoding(), traces.SpanCount(), err) _ = c.consumer.Ack(message) } } @@ -128,9 +145,18 @@ type pulsarMetricsConsumer struct { cancel context.CancelFunc settings receiver.Settings consumerOptions pulsar.ConsumerOptions + obsrecv *receiverhelper.ObsReport } func newMetricsReceiver(config Config, set receiver.Settings, unmarshalers map[string]MetricsUnmarshaler, nextConsumer consumer.Metrics) (*pulsarMetricsConsumer, error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: transport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } unmarshaler := unmarshalers[config.Encoding] if nil == unmarshaler { return nil, errUnrecognizedEncoding @@ -148,6 +174,7 @@ func newMetricsReceiver(config Config, set receiver.Settings, unmarshalers map[s } return &pulsarMetricsConsumer{ + obsrecv: obsrecv, metricsConsumer: nextConsumer, topic: config.Topic, unmarshaler: unmarshaler, @@ -181,8 +208,10 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { // TODO: Ensure returned errors are handled for { + obsCtx := c.obsrecv.StartMetricsOp(ctx) message, err := c.consumer.Receive(ctx) if err != nil { + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) if strings.Contains(err.Error(), alreadyClosedError) { return err } @@ -199,13 +228,15 @@ func consumeMetricsLoop(ctx context.Context, c *pulsarMetricsConsumer) error { metrics, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler metrics message", zap.Error(err)) + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } - - if err := metricsConsumer.ConsumeMetrics(context.Background(), metrics); err != nil { + err = metricsConsumer.ConsumeMetrics(context.Background(), metrics) + if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } + c.obsrecv.EndMetricsOp(obsCtx, unmarshaler.Encoding(), metrics.DataPointCount(), err) _ = c.consumer.Ack(message) } @@ -230,9 +261,18 @@ type pulsarLogsConsumer struct { cancel context.CancelFunc settings receiver.Settings consumerOptions pulsar.ConsumerOptions + obsrecv *receiverhelper.ObsReport } func newLogsReceiver(config Config, set receiver.Settings, unmarshalers map[string]LogsUnmarshaler, nextConsumer consumer.Logs) (*pulsarLogsConsumer, error) { + obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ + ReceiverID: set.ID, + Transport: transport, + ReceiverCreateSettings: set, + }) + if err != nil { + return nil, err + } unmarshaler := unmarshalers[config.Encoding] if nil == unmarshaler { return nil, errUnrecognizedEncoding @@ -250,6 +290,7 @@ func newLogsReceiver(config Config, set receiver.Settings, unmarshalers map[stri } return &pulsarLogsConsumer{ + obsrecv: obsrecv, logsConsumer: nextConsumer, topic: config.Topic, cancel: nil, @@ -282,8 +323,10 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { logsConsumer := c.logsConsumer for { + obsCtx := c.obsrecv.StartLogsOp(ctx) message, err := c.consumer.Receive(ctx) if err != nil { + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) if strings.Contains(err.Error(), alreadyClosedError) { return err } @@ -299,14 +342,15 @@ func consumeLogsLoop(ctx context.Context, c *pulsarLogsConsumer) error { logs, err := unmarshaler.Unmarshal(message.Payload()) if err != nil { c.settings.Logger.Error("failed to unmarshaler logs message", zap.Error(err)) + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), 0, err) _ = c.consumer.Ack(message) return err } - - if err := logsConsumer.ConsumeLogs(context.Background(), logs); err != nil { + err = logsConsumer.ConsumeLogs(context.Background(), logs) + if err != nil { c.settings.Logger.Error("consume traces failed", zap.Error(err)) } - + c.obsrecv.EndLogsOp(obsCtx, unmarshaler.Encoding(), logs.LogRecordCount(), err) _ = c.consumer.Ack(message) } }