Skip to content

Commit

Permalink
OpenTelemetry Metrics for Middlewares (#42)
Browse files Browse the repository at this point in the history
* implement otel metrics for middlewares

* allow transforming topic name before converting to attribute

* use raw topic in traces and transformed topic in metrics

* add courier.client.connected gauge

* use info handler instead of emitter to scrape connection metrics

* add bucket boundaries acc to seconds instrumentation

* add bucket boundaries option to override defaults
  • Loading branch information
ajatprabha authored Apr 23, 2024
1 parent 698c3a9 commit f0fcd5e
Show file tree
Hide file tree
Showing 20 changed files with 1,360 additions and 394 deletions.
280 changes: 175 additions & 105 deletions docs/docs/sdk/otelcourier.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ Package otelcourier instruments the github.com/gojek/courier\-go package.
## Index

- [Constants](#constants)
- [func DisableCallbackTracing\(opts \*traceOptions\)](#DisableCallbackTracing)
- [func DisablePublisherTracing\(opts \*traceOptions\)](#DisablePublisherTracing)
- [func DisableSubscriberTracing\(opts \*traceOptions\)](#DisableSubscriberTracing)
- [func DisableUnsubscriberTracing\(opts \*traceOptions\)](#DisableUnsubscriberTracing)
- [Variables](#variables)
- [func DefaultTopicAttributeTransformer\(\_ context.Context, topic string\) string](#DefaultTopicAttributeTransformer)
- [type BucketBoundaries](#BucketBoundaries)
- [type OTel](#OTel)
- [func New\(service string, opts ...Option\) \*OTel](#New)
- [func \(t \*OTel\) ApplyMiddlewares\(c UseMiddleware\)](#OTel.ApplyMiddlewares)
- [func \(t \*OTel\) PublisherMiddleware\(next courier.Publisher\) courier.Publisher](#OTel.PublisherMiddleware)
- [func \(t \*OTel\) SubscriberMiddleware\(next courier.Subscriber\) courier.Subscriber](#OTel.SubscriberMiddleware)
- [func \(t \*OTel\) UnsubscriberMiddleware\(next courier.Unsubscriber\) courier.Unsubscriber](#OTel.UnsubscriberMiddleware)
- [type Option](#Option)
- [func WithInfoHandlerFrom\(c interface\{ InfoHandler\(\) http.Handler \}\) Option](#WithInfoHandlerFrom)
- [func WithMeterProvider\(provider metric.MeterProvider\) Option](#WithMeterProvider)
- [func WithTextMapCarrierExtractFunc\(fn func\(context.Context\) propagation.TextMapCarrier\) Option](#WithTextMapCarrierExtractFunc)
- [func WithTextMapPropagator\(propagator propagation.TextMapPropagator\) Option](#WithTextMapPropagator)
- [func WithTracerProvider\(provider oteltrace.TracerProvider\) Option](#WithTracerProvider)
- [type Tracer](#Tracer)
- [func NewTracer\(service string, opts ...Option\) \*Tracer](#NewTracer)
- [func \(t \*Tracer\) ApplyTraceMiddlewares\(c \*courier.Client\)](#Tracer.ApplyTraceMiddlewares)
- [func \(t \*Tracer\) PublisherMiddleware\(next courier.Publisher\) courier.Publisher](#Tracer.PublisherMiddleware)
- [func \(t \*Tracer\) SubscriberMiddleware\(next courier.Subscriber\) courier.Subscriber](#Tracer.SubscriberMiddleware)
- [func \(t \*Tracer\) UnsubscriberMiddleware\(next courier.Unsubscriber\) courier.Unsubscriber](#Tracer.UnsubscriberMiddleware)
- [type TopicAttributeTransformer](#TopicAttributeTransformer)
- [type UseMiddleware](#UseMiddleware)


## Constants
Expand All @@ -41,185 +44,252 @@ const (
MQTTTopicWithQoS = attribute.Key("mqtt.topicwithqos")
// MQTTRetained is the attribute key for tracing message retained flag
MQTTRetained = attribute.Key("mqtt.retained")
// MQTTClientID is the attribute key for tracing mqtt client id
MQTTClientID = attribute.Key("mqtt.clientid")
)
```

<a name="DisableCallbackTracing"></a>
## func [DisableCallbackTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L59)
## Variables

```go
func DisableCallbackTracing(opts *traceOptions)
```

DisableCallbackTracing disables implicit tracing on subscription callbacks.

<a name="DisablePublisherTracing"></a>
## func [DisablePublisherTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L62)
<a name="DisableCallbackTracing"></a>DisableCallbackTracing disables implicit tracing on subscription callbacks.

```go
func DisablePublisherTracing(opts *traceOptions)
var DisableCallbackTracing = &disableTracePathOpt{traceCallback}
```

DisablePublisherTracing disables courier.Publisher tracing.

<a name="DisableSubscriberTracing"></a>
## func [DisableSubscriberTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L65)
<a name="DisablePublisherTracing"></a>DisablePublisherTracing disables courier.Publisher tracing.

```go
func DisableSubscriberTracing(opts *traceOptions)
var DisablePublisherTracing = &disableTracePathOpt{tracePublisher}
```

DisableSubscriberTracing disables courier.Subscriber tracing.

<a name="DisableUnsubscriberTracing"></a>
## func [DisableUnsubscriberTracing](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L68)
<a name="DisableSubscriberTracing"></a>DisableSubscriberTracing disables courier.Subscriber tracing.

```go
func DisableUnsubscriberTracing(opts *traceOptions)
var DisableSubscriberTracing = &disableTracePathOpt{traceSubscriber}
```

DisableUnsubscriberTracing disables courier.Unsubscriber tracing.

<a name="Option"></a>
## type [Option](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L30)

Option helps configure trace options.
<a name="DisableUnsubscriberTracing"></a>DisableUnsubscriberTracing disables courier.Unsubscriber tracing.

```go
type Option func(*traceOptions)
var DisableUnsubscriberTracing = &disableTracePathOpt{traceUnsubscriber}
```

<a name="WithTextMapCarrierExtractFunc"></a>
### func [WithTextMapCarrierExtractFunc](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L54)
<a name="DefaultTopicAttributeTransformer"></a>
## func [DefaultTopicAttributeTransformer](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L63)

```go
func WithTextMapCarrierExtractFunc(fn func(context.Context) propagation.TextMapCarrier) Option
func DefaultTopicAttributeTransformer(_ context.Context, topic string) string
```

WithTextMapCarrierExtractFunc is used to specify the function which should be used to extract propagation.TextMapCarrier from the ongoing context.Context.
DefaultTopicAttributeTransformer is the default transformer for topic attribute.

<a name="WithTextMapPropagator"></a>
### func [WithTextMapPropagator](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L48)
<a name="BucketBoundaries"></a>
## type [BucketBoundaries](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L66-L68)

```go
func WithTextMapPropagator(propagator propagation.TextMapPropagator) Option
```

WithTextMapPropagator specifies the propagator to use for extracting/injecting key\-value texts. If none is specified, the global provider is used.

<a name="WithTracerProvider"></a>
### func [WithTracerProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L42)
BucketBoundaries helps override default histogram bucket boundaries for metrics.

```go
func WithTracerProvider(provider oteltrace.TracerProvider) Option
type BucketBoundaries struct {
Publisher, Subscriber, Unsubscriber, Callback []float64
}
```

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.

<a name="Tracer"></a>
## type [Tracer](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L17-L23)
<a name="OTel"></a>
## type [OTel](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L29-L41)

Tracer implements tracing abilities using OpenTelemetry SDK.
OTel implements tracing & metric abilities using OpenTelemetry SDK.

```go
type Tracer struct {
type OTel struct {
// contains filtered or unexported fields
}
```

<a name="NewTracer"></a>
### func [NewTracer](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L26)
<a name="New"></a>
### func [New](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L44)

```go
func NewTracer(service string, opts ...Option) *Tracer
func New(service string, opts ...Option) *OTel
```

NewTracer creates a new Tracer with Option\(s\).
New creates a new OTel with Option\(s\).

<details><summary>Example</summary>
<p>



```go
package main

import (
"context"
"os"
"os/signal"
"syscall"
tp := trace.NewTracerProvider()
defer tp.Shutdown(context.Background())

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/sdk/trace"

courier "github.com/gojek/courier-go"
"github.com/gojek/courier-go/otelcourier"
exporter, err := prometheus.New(
/* Add a non-default prometheus registry here with `prometheus.WithRegisterer` option, if needed. */
)
if err != nil {
panic(err)
}
mp := metric.NewMeterProvider(metric.WithReader(exporter))

func main() {
tp := trace.NewTracerProvider()
defer tp.Shutdown(context.Background())
otel.SetTracerProvider(tp)
otel.SetMeterProvider(mp)
otel.SetTextMapPropagator(&propagation.TraceContext{})

otel.SetTracerProvider(tp)
metricLabelMapper := otelcourier.TopicAttributeTransformer(func(ctx context.Context, topic string) string {
if strings.HasPrefix(topic, "test") {
return "test"
}

c, _ := courier.NewClient()
otelcourier.NewTracer("service-name").ApplyTraceMiddlewares(c)
return "other"
})

if err := c.Start(); err != nil {
panic(err)
}
c, _ := courier.NewClient()
otelcourier.New(
"service-name",
// Use this to also track active connections.
otelcourier.WithInfoHandlerFrom(c),
metricLabelMapper,
).ApplyMiddlewares(c)

stopCh := make(chan os.Signal, 1)
signal.Notify(stopCh, []os.Signal{os.Interrupt, syscall.SIGTERM}...)
if err := c.Start(); err != nil {
panic(err)
}

if err := c.Publish(
context.Background(), "test-topic", "message", courier.QOSOne); err != nil {
panic(err)
}
<-stopCh
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill)

if err := c.Publish(
context.Background(), "test-topic", "message", courier.QOSOne); err != nil {
panic(err)
}

c.Stop()
if err := c.Publish(
context.Background(), "other-topic", "message", courier.QOSOne); err != nil {
panic(err)
}

// Here, you can expose the metrics at /metrics endpoint for prometheus.DefaultRegisterer.

<-ctx.Done()

c.Stop()
```

</p>
</details>

<a name="Tracer.ApplyTraceMiddlewares"></a>
### func \(\*Tracer\) [ApplyTraceMiddlewares](https://github.com/gojek/courier-go/blob/main/otelcourier/trace.go#L49)
<a name="OTel.ApplyMiddlewares"></a>
### func \(\*OTel\) [ApplyMiddlewares](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L81)

```go
func (t *Tracer) ApplyTraceMiddlewares(c *courier.Client)
func (t *OTel) ApplyMiddlewares(c UseMiddleware)
```

ApplyTraceMiddlewares will instrument all the operations of a courier.Client instance according to Option\(s\) used.
ApplyMiddlewares will instrument all the operations of a UseMiddleware instance according to Option\(s\) used.

<a name="Tracer.PublisherMiddleware"></a>
### func \(\*Tracer\) [PublisherMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/publish.go#L20)
<a name="OTel.PublisherMiddleware"></a>
### func \(\*OTel\) [PublisherMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/publish.go#L23)

```go
func (t *Tracer) PublisherMiddleware(next courier.Publisher) courier.Publisher
func (t *OTel) PublisherMiddleware(next courier.Publisher) courier.Publisher
```

PublisherMiddleware is a courier.PublisherMiddlewareFunc for tracing publish calls.

<a name="Tracer.SubscriberMiddleware"></a>
### func \(\*Tracer\) [SubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/subscribe.go#L25)
<a name="OTel.SubscriberMiddleware"></a>
### func \(\*OTel\) [SubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/subscribe.go#L43)

```go
func (t *Tracer) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber
func (t *OTel) SubscriberMiddleware(next courier.Subscriber) courier.Subscriber
```

SubscriberMiddleware is a courier.SubscriberMiddlewareFunc for tracing subscribe calls.

<a name="Tracer.UnsubscriberMiddleware"></a>
### func \(\*Tracer\) [UnsubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/unsubscribe.go#L19)
<a name="OTel.UnsubscriberMiddleware"></a>
### func \(\*OTel\) [UnsubscriberMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/unsubscribe.go#L21)

```go
func (t *Tracer) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber
func (t *OTel) UnsubscriberMiddleware(next courier.Unsubscriber) courier.Unsubscriber
```

UnsubscriberMiddleware is a courier.UnsubscriberMiddlewareFunc for tracing unsubscribe calls.

<a name="Option"></a>
## type [Option](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L14)

Option helps configure trace options.

```go
type Option interface {
// contains filtered or unexported methods
}
```

<a name="WithInfoHandlerFrom"></a>
### func [WithInfoHandlerFrom](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L46)

```go
func WithInfoHandlerFrom(c interface{ InfoHandler() http.Handler }) Option
```

WithInfoHandlerFrom is used to specify the handler which should be used to extract client information from the courier.Client instance.

<a name="WithMeterProvider"></a>
### func [WithMeterProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L28)

```go
func WithMeterProvider(provider metric.MeterProvider) Option
```

WithMeterProvider specifies a meter provider to use for creating a meter. If none is specified, the global provider is used.

<a name="WithTextMapCarrierExtractFunc"></a>
### func [WithTextMapCarrierExtractFunc](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L40)

```go
func WithTextMapCarrierExtractFunc(fn func(context.Context) propagation.TextMapCarrier) Option
```

WithTextMapCarrierExtractFunc is used to specify the function which should be used to extract propagation.TextMapCarrier from the ongoing context.Context.

<a name="WithTextMapPropagator"></a>
### func [WithTextMapPropagator](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L34)

```go
func WithTextMapPropagator(propagator propagation.TextMapPropagator) Option
```

WithTextMapPropagator specifies the propagator to use for extracting/injecting key\-value texts. If none is specified, the global provider is used.

<a name="WithTracerProvider"></a>
### func [WithTracerProvider](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L22)

```go
func WithTracerProvider(provider oteltrace.TracerProvider) Option
```

WithTracerProvider specifies a tracer provider to use for creating a tracer. If none is specified, the global provider is used.

<a name="TopicAttributeTransformer"></a>
## type [TopicAttributeTransformer](https://github.com/gojek/courier-go/blob/main/otelcourier/options.go#L18)

TopicAttributeTransformer helps transform topic before making an attribute for it. It is used in metric recording only. Traces use the original topic.

```go
type TopicAttributeTransformer func(context.Context, string) string
```

<a name="UseMiddleware"></a>
## type [UseMiddleware](https://github.com/gojek/courier-go/blob/main/otelcourier/otel.go#L22-L26)

UseMiddleware is an interface that defines the methods to apply middlewares to a courier.Client or similar instance.

```go
type UseMiddleware interface {
UsePublisherMiddleware(mwf ...courier.PublisherMiddlewareFunc)
UseSubscriberMiddleware(mwf ...courier.SubscriberMiddlewareFunc)
UseUnsubscriberMiddleware(mwf ...courier.UnsubscriberMiddlewareFunc)
}
```

Generated by [gomarkdoc](https://github.com/princjef/gomarkdoc)
Loading

0 comments on commit f0fcd5e

Please sign in to comment.