Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(otel): add opentelemety utility functions #272

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
22 changes: 15 additions & 7 deletions channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1492,7 +1492,7 @@ func (ch *Channel) Publish(exchange, key string, mandatory, immediate bool, msg
/*
PublishWithContext sends a Publishing from the client to an exchange on the server.

NOTE: this function is equivalent to [Channel.Publish]. Context is not honoured.
NOTE: Context termination is not honoured.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we're now using the context for span propagation in-process.


When you want a single message to be delivered to a single queue, you can
publish to the default exchange with the routingKey of the queue name. This is
Expand Down Expand Up @@ -1523,8 +1523,9 @@ confirmations start at 1. Exit when all publishings are confirmed.
When Publish does not return an error and the channel is in confirm mode, the
internal counter for DeliveryTags with the first confirmation starts at 1.
*/
func (ch *Channel) PublishWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
return ch.Publish(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) error {
_, err := ch.PublishWithDeferredConfirmWithContext(ctx, exchange, key, mandatory, immediate, msg)
return err
}

/*
Expand Down Expand Up @@ -1583,11 +1584,18 @@ DeferredConfirmation, allowing the caller to wait on the publisher confirmation
for this message. If the channel has not been put into confirm mode,
the DeferredConfirmation will be nil.

NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context variant. The context passed
to this function is not honoured.
NOTE: PublishWithDeferredConfirmWithContext is equivalent to its non-context
variant. The termination of the context passed to this function is not
honoured.
*/
func (ch *Channel) PublishWithDeferredConfirmWithContext(_ context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
return ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
func (ch *Channel) PublishWithDeferredConfirmWithContext(ctx context.Context, exchange, key string, mandatory, immediate bool, msg Publishing) (*DeferredConfirmation, error) {
_, msg, errFn := spanForPublication(ctx, msg, exchange, key, immediate)
dc, err := ch.PublishWithDeferredConfirm(exchange, key, mandatory, immediate, msg)
if err != nil {
errFn(err)
Comment on lines +1594 to +1595
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errFn needs to be called regardless of there is an err or not, to properly ends the Span.

Suggested change
if err != nil {
errFn(err)
errFn(err)
if err != nil {

Maybe also rename errFn to endFn to make the intention clearer.

return nil, err
}
return dc, nil
}

/*
Expand Down
54 changes: 54 additions & 0 deletions delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,12 @@
package amqp091

import (
"context"
"errors"
"fmt"
"time"

"go.opentelemetry.io/otel/trace"
)

var errDeliveryNotInitialized = errors.New("delivery not initialized")
Expand Down Expand Up @@ -58,6 +62,21 @@ type Delivery struct {
Body []byte
}

// Span returns context and a span that for the delivery
// the resulting span is linked to the publication that created it, if it has
// the appropraite headers set. See [context-propagation] for more details
//
// [context-propagation]: https://opentelemetry.io/docs/concepts/context-propagation/
func (d *Delivery) Span(ctx context.Context, options ...trace.SpanStartOption) (context.Context, trace.Span) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is an okay route-- clients can use their own context + span to indicate boundaries of a batch, and then get child spans for each delivery, with each span linked to the publication.

I also provide access to the Link for a delivery in case they want to combine multiple links into one span for their batch (that's what I would prefer in my use case, but that's because I'm defining a naturally batching consumer).

The tradeoff is that without storing additional state, we're relying on the client to tell us the context when we go to ack nack, which could lead to errors.

If we instead store the span on the delivery, we can close it when we ack, after inserting a child settle span for the ack itself. This has the implication that every consumer needs to settle the delivery even if their in autoack mode in order for them to see spans in their telemetry info (spans are usually not sent until they are closed). In autoack mode the settle method would just close the span, with no further implications at the wire level.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we instead store the span on the delivery, we can close it when we ack, after inserting a child settle span for the ack itself

I prefer this approach, TBH. Relying on the user would make this implementation brittle. I'm ok with having a limitation handling autoack, because that's really not a recommended practice. autoack is synonym of YOLO I don't care about my data, just GO!

return spanForDelivery(ctx, d.ConsumerTag, d, options...)
}

// Link returns a link for the delivery. The link points to the publication, if
// the appropriate headers are set.
func (d *Delivery) Link(ctx context.Context) trace.Link {
return extractLinkFromDelivery(ctx, d)
}

func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()

Expand Down Expand Up @@ -171,3 +190,38 @@ func (d Delivery) Nack(multiple, requeue bool) error {
}
return d.Acknowledger.Nack(d.DeliveryTag, multiple, requeue)
}

type DeliveryResponse uint8

const (
Ack DeliveryResponse = iota
Reject
Nack
)

func (r DeliveryResponse) Name() string {
switch r {
case Ack:
return "ack"
case Nack:
return "nack"
case Reject:
return "reject"
default:
return "unknown"
}
}

func (d *Delivery) Settle(ctx context.Context, response DeliveryResponse, multiple, requeue bool) error {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel deeply ambivalent about this approach, but the alternative would seem to be providing (Ack,Nack,Rject)Ctx methods. TBH that's probably better.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that (Ack,Nack,Reject)Ctx methods are probably a better alternative, and settle the delivery automagically in those functions.

defer settleDelivery(ctx, d, response, multiple, requeue)
switch response {
case Ack:
return d.Ack(multiple)
case Nack:
return d.Nack(multiple, requeue)
case Reject:
return d.Reject(requeue)
default:
return fmt.Errorf("unknown operation %s", response.Name())
}
}
17 changes: 15 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,5 +1,18 @@
module github.com/rabbitmq/amqp091-go

go 1.20
go 1.21

require go.uber.org/goleak v1.3.0
toolchain go1.22.0

require (
github.com/getoutreach/gobox v1.92.1
go.opentelemetry.io/otel v1.27.0
go.opentelemetry.io/otel/trace v1.27.0
go.uber.org/goleak v1.3.0
)

require (
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
go.opentelemetry.io/otel/metric v1.27.0 // indirect
)
23 changes: 22 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
@@ -1,6 +1,27 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/getoutreach/gobox v1.92.1 h1:MBDedZCUN+ef/ljBHAOSyVisqvR5dPlSwso1JdMPbXw=
github.com/getoutreach/gobox v1.92.1/go.mod h1:IPy+RNuOYRMTizH6iTr33myGKcRhjEIIHS2VMqzZL0A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.opentelemetry.io/otel v1.27.0 h1:9BZoF3yMK/O1AafMiQTVu0YDj5Ea4hPhxCs7sGva+cg=
go.opentelemetry.io/otel v1.27.0/go.mod h1:DMpAK8fzYRzs+bi3rS5REupisuqTheUlSZJ1WnZaPAQ=
go.opentelemetry.io/otel/metric v1.27.0 h1:hvj3vdEKyeCi4YaYfNjv2NUje8FqKqUY8IlF0FxV/ik=
go.opentelemetry.io/otel/metric v1.27.0/go.mod h1:mVFgmRlhljgBiuk/MP/oKylr4hs85GZAylncepAX/ak=
go.opentelemetry.io/otel/trace v1.27.0 h1:IqYb813p7cmbHk0a5y6pD5JPakbVfftRXABGt5/Rscw=
go.opentelemetry.io/otel/trace v1.27.0/go.mod h1:6RiD1hkAprV4/q+yd2ln1HG9GoPx39SuvvstaLBl+l4=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gotest.tools/v3 v3.5.1 h1:EENdUnS3pdur5nybKYIh2Vfgc8IUNBjxDPSjtiJcOzU=
gotest.tools/v3 v3.5.1/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU=
203 changes: 203 additions & 0 deletions opentelemetry.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
package amqp091

import (
"context"
"errors"
"fmt"

"github.com/getoutreach/gobox/pkg/app"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/propagation"
semconv "go.opentelemetry.io/otel/semconv/v1.25.0"
"go.opentelemetry.io/otel/trace"
)

// tracer is the tracer used by the package
var tracer = otel.Tracer("amqp091")

// amqpHeaderCarrier is a carrier for AMQP headers.
type amqpHeaderCarrier Table

// Get returns the value associated with the passed key.
func (c amqpHeaderCarrier) Get(key string) string {
v, ok := c[key]
if !ok {
return ""
}
s, ok := v.(string)
if ok {
return s
}
return ""
}

// Set stores the key-value pair.
func (c amqpHeaderCarrier) Set(key, value string) {
c[key] = value
}

// Keys lists the keys stored in this carrier.
func (c amqpHeaderCarrier) Keys() []string {
keys := []string{}
for k, v := range c {
if _, ok := v.(string); !ok {
continue
}
keys = append(keys, k)
}
return keys
}

// ensure amqpHeaderCarrier implements the TextMapCarrier interface
var _ propagation.TextMapCarrier = amqpHeaderCarrier{}

// InjectSpan injects the span context into the AMQP headers.
// It returns the input headers with the span headers added.
func injectSpanFromContext(ctx context.Context, headers Table) Table {
carrier := amqpHeaderCarrier(headers)
if carrier == nil {
carrier = amqpHeaderCarrier{}
}
otel.GetTextMapPropagator().Inject(ctx, carrier)
return Table(carrier)
}

// ExtractSpanContext extracts the span context from the AMQP headers.
func ExtractSpanContext(ctx context.Context, headers Table) context.Context {
carrier := amqpHeaderCarrier(headers)
if carrier == nil {
carrier = amqpHeaderCarrier{}
}
return otel.GetTextMapPropagator().Extract(ctx, carrier)
}

// extractSpanFromReturn creates a span for a returned message
func extractSpanFromReturn(

Check failure on line 77 in opentelemetry.go

View workflow job for this annotation

GitHub Actions / lint

func `extractSpanFromReturn` is unused (unused)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh I haven't wired the return up yet. Probably gets a similar treatment to Delivery, if that works.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rabbitmq semconv specs does not mention how message returns should be instrumented, maybe we should open an issue in https://github.com/open-telemetry/semantic-conventions/issues asking for clarification.

ctx context.Context,
ret Return,
) (context.Context, trace.Span) {
spctx := ExtractSpanContext(ctx, ret.Headers)
spanName := fmt.Sprintf("%s return", ret.RoutingKey)
return tracer.Start(ctx, spanName,
trace.WithLinks(trace.LinkFromContext(spctx, semconv.MessagingMessageID(ret.MessageId))),
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.MessagingRabbitmqDestinationRoutingKey(ret.RoutingKey),
semconv.MessagingDestinationPublishName(ret.Exchange),
semconv.MessagingOperationKey.String("return"),
semconv.MessagingMessageID(ret.MessageId),
semconv.MessagingMessageConversationID(ret.CorrelationId),
semconv.MessagingSystemRabbitmq,
semconv.MessagingClientIDKey.String(app.Info().Name),
semconv.ErrorTypeKey.String(ret.ReplyText),
// semconv.NetPeerPort(5672
// semconv.NetPeerIP("localhost")
// semconv.ServerAddress("localhost")
),
trace.WithNewRoot(),
)
}

// settleDelivery creates a span for the acking of a delivery
func settleDelivery(
ctx context.Context,
delivery *Delivery,
response DeliveryResponse,
multiple, requeue bool,
) (context.Context, trace.Span) {
return tracer.Start(ctx,
fmt.Sprintf("%s settle", delivery.RoutingKey),
trace.WithAttributes(
attribute.String("messaging.operation.name", response.Name()),
attribute.Bool("multiple", multiple),
attribute.Bool("requeue", requeue)))
}

// extractLinkFromDelivery creates a link for a delivered message
//
// The recommend way to link a consumer to the publisher is with a link, since
// the two operations can be quit far apart in time. If you have a usecase
// where you would like the spans to have a parent child relationship instead, use
// ExtractSpanContext
//
// The consumer span may containe 1 or more messages, which is why we don't
// manufacture the span in its entirety here.
func extractLinkFromDelivery(ctx context.Context, del *Delivery) trace.Link {
spctx := ExtractSpanContext(ctx, del.Headers)
return trace.LinkFromContext(spctx,
semconv.MessagingMessageConversationID(del.CorrelationId),
semconv.MessagingMessageID(del.MessageId),
semconv.MessagingRabbitmqMessageDeliveryTag(int(del.DeliveryTag)))
}

// spanForDelivery creates a span for the delivered messages
// returns a new context with the span headers and the span.
func spanForDelivery(
ctx context.Context,
consumerTag string,
delivery *Delivery,
options ...trace.SpanStartOption,
) (context.Context, trace.Span) {
spanName := fmt.Sprintf("%s consume", consumerTag)
links := []trace.Link{}
links = append(links, extractLinkFromDelivery(ctx, delivery))
return tracer.Start(
ctx,
spanName,
append(
options,
trace.WithLinks(links...),
trace.WithSpanKind(trace.SpanKindConsumer),
)...,
)
}

// Publish creates a span for a publishing message returns a new context with
// the span headers, the mssage that was being published with span headers
// injected, and a function to be called with the result of the publish
func spanForPublication(
ctx context.Context,
publishing Publishing,
exchange, routinKey string,
immediate bool,
) (context.Context, Publishing, func(err error)) {
spanName := fmt.Sprintf("%s publish", routinKey)
ctx, span := tracer.Start(ctx, spanName,
trace.WithSpanKind(trace.SpanKindProducer),
trace.WithAttributes(
semconv.MessagingRabbitmqDestinationRoutingKey(routinKey),
semconv.MessagingDestinationPublishName(exchange),
semconv.MessagingOperationPublish,
semconv.MessagingMessageID(publishing.MessageId),
semconv.MessagingMessageConversationID(publishing.CorrelationId),
semconv.MessagingSystemRabbitmq,
semconv.MessagingClientIDKey.String(publishing.AppId),
semconv.MessagingMessageBodySize(len(publishing.Body)),
semconv.MessageTypeSent,
attribute.Bool("messaging.immediate", immediate),

// TODO(AWinterman): Add these attributes
// semconv.NetPeerPort(5672) // nolint:gocritic // Why: see to do
// semconv.NetworkPeerAddress() // nolint:gocritic // Why: see to do
// semconv.NetPeerPort() // nolint:gocritic // Why: see to do
),
)
headers := injectSpanFromContext(ctx, publishing.Headers)
publishing.Headers = Table(headers)

return ctx, publishing, func(err error) {
if err != nil {
span.RecordError(err)
amqpErr := &Error{}
if errors.As(err, &amqpErr) {
span.SetAttributes(
semconv.ErrorTypeKey.String(amqpErr.Reason),
)
}
span.SetStatus(codes.Error, err.Error())
}
span.End()
}
}
Loading