Skip to content

Commit

Permalink
add inflight metrics and fix bytes metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
njiang747 committed Jun 8, 2024
1 parent 189cc68 commit c48b82d
Show file tree
Hide file tree
Showing 5 changed files with 112 additions and 71 deletions.
14 changes: 5 additions & 9 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newStreamingConn(spec connect.Spec, reporter *Metrics) streamingConn {
callPackage, callMethod := procedureToPackageAndMethod(spec.Procedure)
conn := streamingConn{
startTime: time.Now(),
callType: steamTypeString(spec.StreamType),
callType: streamTypeString(spec.StreamType),
service: callPackage,
method: callMethod,
reporter: reporter,
Expand All @@ -41,21 +41,17 @@ func (conn *streamingConn) reportReceive(message any) {
}
}

func (conn *streamingConn) reportHandled(err error) {
errCode := codeOf(err)
conn.reporter.requestHandled.WithLabelValues(conn.callType, conn.service, conn.method, errCode).Inc()
conn.reporter.ReportHandledSeconds(conn.callType, conn.service, conn.method, errCode, time.Since(conn.startTime).Seconds())
}

type streamingClientConn struct {
connect.StreamingClientConn
streamingConn
onClose func(error)
}

func newStreamingClientConn(conn connect.StreamingClientConn, i *Interceptor) *streamingClientConn {
func newStreamingClientConn(conn connect.StreamingClientConn, i *Interceptor, onClose func(error)) *streamingClientConn {
return &streamingClientConn{
StreamingClientConn: conn,
streamingConn: newStreamingConn(conn.Spec(), i.client),
onClose: onClose,
}
}

Expand All @@ -74,7 +70,7 @@ func (conn *streamingClientConn) Receive(msg any) error {

func (conn *streamingClientConn) CloseResponse() error {
err := conn.StreamingClientConn.CloseResponse()
conn.reportHandled(err)
conn.onClose(err)
return err
}

Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,5 @@ require (
github.com/rogpeppe/go-internal v1.9.0 // indirect
golang.org/x/sys v0.5.0 // indirect
golang.org/x/text v0.7.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 3 additions & 7 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1m
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/getsentry/sentry-go v0.18.0 h1:MtBW5H9QgdcJabtZcuJG80BMOwaBpkRDZkxRkNC1sN0=
github.com/getsentry/sentry-go v0.18.0/go.mod h1:Kgon4Mby+FJ7ZWHFUAZgVaIa8sxHtnRJRLTXZr51aKQ=
github.com/go-errors/errors v1.4.2 h1:J6MZopCL4uSllY1OfXM374weqZFFItUbrImctkmUxIA=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
Expand Down Expand Up @@ -154,12 +155,10 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
Expand All @@ -172,6 +171,7 @@ github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3Rllmb
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
github.com/pingcap/errors v0.11.4 h1:lFuQV/oaUMGcD2tqt+01ROSmJs75VG1ToEOkZIZ4nE4=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -216,7 +216,6 @@ github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSS
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
Expand Down Expand Up @@ -355,8 +354,6 @@ golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU=
golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
Expand Down Expand Up @@ -499,15 +496,14 @@ google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqw
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Expand Down
85 changes: 61 additions & 24 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ import (
"strings"
"time"

"github.com/cockroachdb/errors"
"connectrpc.com/connect"
"github.com/cockroachdb/errors"
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/proto"
)

const (
Expand Down Expand Up @@ -40,7 +42,7 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
}

now := time.Now()
callType := steamTypeString(req.Spec().StreamType)
callType := streamTypeString(req.Spec().StreamType)
callPackage, callMethod := procedureToPackageAndMethod(req.Spec().Procedure)

var reporter *Metrics
Expand All @@ -50,47 +52,82 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
reporter = i.server
}

var code string
if reporter != nil {
reporter.ReportStarted(callType, callPackage, callMethod, req.Any())
var bytes *prom.CounterVec
if reporter.isClient {
bytes = reporter.bytesSent
} else {
bytes = reporter.bytesReceived
}
if bytes != nil {
bytes.WithLabelValues(callType, callPackage, callMethod).Add(float64(proto.Size(req.Any().(proto.Message))))
}
reporter.ReportStarted(callType, callPackage, callMethod)
defer reporter.ReportHandled(callType, callPackage, callMethod, code)
defer reporter.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
}

resp, err := next(ctx, req)
code := codeOf(err)
var respMsg any
code = codeOf(err)
if err == nil {
respMsg = resp.Any()
}

if reporter != nil {
reporter.ReportHandled(callType, callPackage, callMethod, code, respMsg)
reporter.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
var bytes *prom.CounterVec
if reporter.isClient {
bytes = reporter.bytesSent
} else {
bytes = reporter.bytesReceived
}
if bytes != nil {
bytes.WithLabelValues(callType, callPackage, callMethod).Add(float64(proto.Size(resp.Any().(proto.Message))))
}
}

return resp, err
})
}

func (i *Interceptor) WrapStreamingClient(next connect.StreamingClientFunc) connect.StreamingClientFunc {
return connect.StreamingClientFunc(func(ctx context.Context, s connect.Spec) connect.StreamingClientConn {
conn := next(ctx, s)
if i.client != nil {
conn = newStreamingClientConn(conn, i)
return connect.StreamingClientFunc(func(ctx context.Context, spec connect.Spec) connect.StreamingClientConn {
// Short-circuit, not configured to report for client.
if i.client == nil {
return next(ctx, spec)
}

now := time.Now()
callType := streamTypeString(spec.StreamType)
callPackage, callMethod := procedureToPackageAndMethod(spec.Procedure)

i.client.ReportStarted(callType, callPackage, callMethod)
onClose := func(err error) {
code := codeOf(err)
i.client.ReportHandled(callType, callPackage, callMethod, code)
i.client.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
}
return conn

conn := next(ctx, spec)
return newStreamingClientConn(conn, i, onClose)
})
}

func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) connect.StreamingHandlerFunc {
return connect.StreamingHandlerFunc(func(ctx context.Context, shc connect.StreamingHandlerConn) error {
var newShc *streamingHandlerConn
if i.server != nil {
newShc = newStreamingHandlerConn(shc, i)
shc = newShc
// Short-circuit, not configured to report for server.
if i.server == nil {
return next(ctx, shc)
}

now := time.Now()
callType := streamTypeString(shc.Spec().StreamType)
callPackage, callMethod := procedureToPackageAndMethod(shc.Spec().Procedure)

var code string
i.server.ReportStarted(callType, callPackage, callMethod)
defer i.server.ReportHandled(callType, callPackage, callMethod, code)
defer i.server.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())

shc = newStreamingHandlerConn(shc, i)
err := next(ctx, shc)
if newShc != nil {
newShc.reportHandled(err)
}
code = codeOf(err)
return err
})
}
Expand All @@ -104,7 +141,7 @@ func procedureToPackageAndMethod(procedure string) (string, string) {
return "unknown", "unknown"
}

func steamTypeString(st connect.StreamType) string {
func streamTypeString(st connect.StreamType) string {
switch st {
case connect.StreamTypeUnary:
return "unary"
Expand Down
73 changes: 43 additions & 30 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package connect_go_prometheus

import (
prom "github.com/prometheus/client_golang/prometheus"
"google.golang.org/protobuf/proto"
)

var (
Expand Down Expand Up @@ -164,6 +163,16 @@ func NewClientMetrics(opts ...MetricsOption) *Metrics {
}, []string{"type", "service", "method"})
}

if config.withInflightMetrics {
m.inflightRequests = prom.NewGaugeVec(prom.GaugeOpts{
Namespace: config.namespace,
Subsystem: config.subsystem,
ConstLabels: config.constLabels,
Name: "inflight_requests",
Help: "Current number of inflight RPCs",
}, []string{"type", "service", "method"})
}

return m
}

Expand All @@ -178,6 +187,7 @@ type Metrics struct {
streamMsgReceived *prom.CounterVec
bytesSent *prom.CounterVec
bytesReceived *prom.CounterVec
inflightRequests *prom.GaugeVec
}

// Describe implements Describe as required by prom.Collector
Expand All @@ -189,6 +199,15 @@ func (m *Metrics) Describe(c chan<- *prom.Desc) {
}
m.streamMsgSent.Describe(c)
m.streamMsgReceived.Describe(c)
if m.bytesSent != nil {
m.bytesSent.Describe(c)
}
if m.bytesReceived != nil {
m.bytesReceived.Describe(c)
}
if m.inflightRequests != nil {
m.inflightRequests.Describe(c)
}
}

// Collect implements collect as required by prom.Collector
Expand All @@ -200,41 +219,28 @@ func (m *Metrics) Collect(c chan<- prom.Metric) {
}
m.streamMsgSent.Collect(c)
m.streamMsgReceived.Collect(c)
if m.bytesSent != nil {
m.bytesSent.Collect(c)
}
if m.bytesReceived != nil {
m.bytesReceived.Collect(c)
}
if m.inflightRequests != nil {
m.inflightRequests.Collect(c)
}
}

func (m *Metrics) ReportStarted(callType, service, method string, message any) {
func (m *Metrics) ReportStarted(callType, service, method string) {
m.requestStarted.WithLabelValues(callType, service, method).Inc()
var streamMsg *prom.CounterVec
var bytes *prom.CounterVec
if m.isClient {
streamMsg = m.streamMsgSent
bytes = m.bytesSent
} else {
streamMsg = m.streamMsgReceived
bytes = m.bytesReceived
}
streamMsg.WithLabelValues(callType, service, method).Inc()
if bytes != nil {
bytes.WithLabelValues(callType, service, method).Add(float64(proto.Size(message.(proto.Message))))
if m.inflightRequests != nil {
m.inflightRequests.WithLabelValues(callType, service, method).Inc()
}
}

func (m *Metrics) ReportHandled(callType, service, method, code string, message any) {
func (m *Metrics) ReportHandled(callType, service, method, code string) {
m.requestHandled.WithLabelValues(callType, service, method, code).Inc()
if code == CodeOk {
var streamMsg *prom.CounterVec
var bytes *prom.CounterVec
if m.isClient {
streamMsg = m.streamMsgReceived
bytes = m.bytesReceived
} else {
streamMsg = m.streamMsgSent
bytes = m.bytesSent
}
streamMsg.WithLabelValues(callType, service, method).Inc()
if bytes != nil {
bytes.WithLabelValues(callType, service, method).Add(float64(proto.Size(message.(proto.Message))))
}
if m.inflightRequests != nil {
m.inflightRequests.WithLabelValues(callType, service, method).Dec()
}
}

Expand All @@ -261,7 +267,8 @@ type metricsOptions struct {

constLabels prom.Labels

withByteMetrics bool
withByteMetrics bool
withInflightMetrics bool
}

type MetricsOption func(opts *metricsOptions)
Expand Down Expand Up @@ -302,6 +309,12 @@ func WithByteMetrics(enabled bool) MetricsOption {
}
}

func WithInflightMetrics(enabled bool) MetricsOption {
return func(opts *metricsOptions) {
opts.withInflightMetrics = enabled
}
}

func evaluateMetricsOptions(defaults *metricsOptions, opts ...MetricsOption) *metricsOptions {
for _, opt := range opts {
opt(defaults)
Expand Down

0 comments on commit c48b82d

Please sign in to comment.