Skip to content

Commit

Permalink
improve tests
Browse files Browse the repository at this point in the history
  • Loading branch information
njiang747 committed Jun 12, 2024
1 parent c48b82d commit 5cd57b5
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 35 deletions.
12 changes: 8 additions & 4 deletions interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,10 @@ func (i *Interceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
bytes.WithLabelValues(callType, callPackage, callMethod).Add(float64(proto.Size(req.Any().(proto.Message))))
}
reporter.ReportStarted(callType, callPackage, callMethod)
defer reporter.ReportHandled(callType, callPackage, callMethod, code)
defer reporter.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
defer func() { reporter.ReportHandled(callType, callPackage, callMethod, code) }()
defer func() {
reporter.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
}()
}

resp, err := next(ctx, req)
Expand Down Expand Up @@ -122,8 +124,10 @@ func (i *Interceptor) WrapStreamingHandler(next connect.StreamingHandlerFunc) co

var code string
i.server.ReportStarted(callType, callPackage, callMethod)
defer i.server.ReportHandled(callType, callPackage, callMethod, code)
defer i.server.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
defer func() { i.server.ReportHandled(callType, callPackage, callMethod, code) }()
defer func() {
i.server.ReportHandledSeconds(callType, callPackage, callMethod, code, time.Since(now).Seconds())
}()

shc = newStreamingHandlerConn(shc, i)
err := next(ctx, shc)
Expand Down
89 changes: 87 additions & 2 deletions interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package connect_go_prometheus

import (
"context"
"fmt"
"net/http"
"net/http/httptest"
"testing"
Expand All @@ -17,6 +18,8 @@ import (
var (
testMetricOptions = []MetricsOption{
WithHistogram(true),
WithByteMetrics(true),
WithInflightMetrics(true),
WithNamespace("namespace"),
WithSubsystem("subsystem"),
WithConstLabels(prom.Labels{"component": "foo"}),
Expand All @@ -41,6 +44,7 @@ func createClientAndStreamRequest(t *testing.T, srv *httptest.Server, intercepto
Name: "eliza",
}))
require.NoError(t, err)
defer stream.Close()
require.False(t, stream.Receive())
err = stream.Err()
require.Error(t, err)
Expand All @@ -66,16 +70,47 @@ func TestInterceptor_WithClient_WithServer_Histogram(t *testing.T) {
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
count, err := testutil.GatherAndCount(reg, expectedMetrics...)
require.NoError(t, err)
require.Equal(t, len(expectedMetrics), count)

clientMetrics.Reset()
serverMetrics.Reset()

createClientAndStreamRequest(t, srv, interceptor)

expectedMetrics = []string{
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_msg_sent_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_msg_received_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
metrics, err := reg.Gather()
require.NoError(t, err)
for _, metric := range metrics {
fmt.Printf("%v\n", *metric.Name)
}
count, err = testutil.GatherAndCount(reg, expectedMetrics...)
require.NoError(t, err)
require.Equal(t, len(expectedMetrics), count)
}

func TestInterceptor_Default(t *testing.T) {
Expand Down Expand Up @@ -116,16 +151,41 @@ func TestInterceptor_WithClientMetrics(t *testing.T) {
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
count, err := testutil.GatherAndCount(reg, possibleMetrics...)
require.NoError(t, err)
require.Equal(t, 3, count, "must report only 3 metrics, as server side is disabled")
require.Equal(t, 5, count, "must report only client-side metrics, as server-side is disabled")

clientMetrics.Reset()

createClientAndStreamRequest(t, srv, interceptor)

possibleMetrics = []string{
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_msg_sent_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_msg_received_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
count, err = testutil.GatherAndCount(reg, possibleMetrics...)
require.NoError(t, err)
require.Equal(t, 6, count, "must report only client-side metrics, as server-side is disabled")
}

func TestInterceptor_WithServerMetrics(t *testing.T) {
Expand All @@ -144,14 +204,39 @@ func TestInterceptor_WithServerMetrics(t *testing.T) {
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
count, err := testutil.GatherAndCount(reg, possibleMetrics...)
require.NoError(t, err)
require.Equal(t, 3, count, "must report only server side metrics, client-side is disabled")
require.Equal(t, 5, count, "must report only server-side metrics, client-side is disabled")

serverMetrics.Reset()

createClientAndStreamRequest(t, srv, interceptor)

possibleMetrics = []string{
"namespace_subsystem_connect_client_handled_seconds",
"namespace_subsystem_connect_client_handled_total",
"namespace_subsystem_connect_client_started_total",
"namespace_subsystem_connect_client_msg_sent_total",
"namespace_subsystem_connect_client_bytes_sent_total",
"namespace_subsystem_connect_client_inflight_requests",

"namespace_subsystem_connect_server_handled_seconds",
"namespace_subsystem_connect_server_handled_total",
"namespace_subsystem_connect_server_started_total",
"namespace_subsystem_connect_server_msg_received_total",
"namespace_subsystem_connect_server_bytes_received_total",
"namespace_subsystem_connect_server_inflight_requests",
}
count, err = testutil.GatherAndCount(reg, possibleMetrics...)
require.NoError(t, err)
require.Equal(t, 6, count, "must report only server-side metrics, client-side is disabled")
}
38 changes: 35 additions & 3 deletions metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func NewServerMetrics(opts ...MetricsOption) *Metrics {
streamMsgReceivedName: "connect_server_msg_received_total",
bytesSentName: "connect_server_bytes_sent_total",
bytesReceivedName: "connect_server_bytes_received_total",
inflightRequestsName: "connect_server_inflight_requests",
}, opts...)

m := &Metrics{
Expand Down Expand Up @@ -88,6 +89,16 @@ func NewServerMetrics(opts ...MetricsOption) *Metrics {
}, []string{"type", "service", "method"})
}

if config.withInflightMetrics {
m.inflightRequests = prom.NewGaugeVec(prom.GaugeOpts{
Namespace: config.namespace,
Subsystem: config.subsystem,
ConstLabels: config.constLabels,
Name: config.inflightRequestsName,
Help: "Current number of inflight RPCs server-side",
}, []string{"type", "service", "method"})
}

return m
}

Expand All @@ -98,9 +109,10 @@ func NewClientMetrics(opts ...MetricsOption) *Metrics {
requestHandledName: "connect_client_handled_total",
requestHandledSecondsName: "connect_client_handled_seconds",
streamMsgSentName: "connect_client_msg_sent_total",
streamMsgReceivedName: "connect_client_msg_recieved_total",
streamMsgReceivedName: "connect_client_msg_received_total",
bytesSentName: "connect_client_bytes_sent_total",
bytesReceivedName: "connect_client_bytes_received_total",
inflightRequestsName: "connect_client_inflight_requests",
}, opts...)

m := &Metrics{
Expand Down Expand Up @@ -168,8 +180,8 @@ func NewClientMetrics(opts ...MetricsOption) *Metrics {
Namespace: config.namespace,
Subsystem: config.subsystem,
ConstLabels: config.constLabels,
Name: "inflight_requests",
Help: "Current number of inflight RPCs",
Name: config.inflightRequestsName,
Help: "Current number of inflight RPCs client-side",
}, []string{"type", "service", "method"})
}

Expand All @@ -190,6 +202,25 @@ type Metrics struct {
inflightRequests *prom.GaugeVec
}

func (m *Metrics) Reset() {
m.requestStarted.Reset()
m.requestHandled.Reset()
m.streamMsgSent.Reset()
m.streamMsgReceived.Reset()
if m.requestHandledSeconds != nil {
m.requestHandledSeconds.Reset()
}
if m.bytesSent != nil {
m.bytesSent.Reset()
}
if m.bytesReceived != nil {
m.bytesReceived.Reset()
}
if m.inflightRequests != nil {
m.inflightRequests.Reset()
}
}

// Describe implements Describe as required by prom.Collector
func (m *Metrics) Describe(c chan<- *prom.Desc) {
m.requestStarted.Describe(c)
Expand Down Expand Up @@ -264,6 +295,7 @@ type metricsOptions struct {
streamMsgReceivedName string
bytesSentName string
bytesReceivedName string
inflightRequestsName string

constLabels prom.Labels

Expand Down
82 changes: 56 additions & 26 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ func TestServerMetrics(t *testing.T) {
reg := prom.NewRegistry()
sm := NewServerMetrics(
WithHistogram(true),
WithByteMetrics(true),
WithInflightMetrics(true),
WithNamespace("namespace"),
WithSubsystem("subsystem"),
WithConstLabels(prom.Labels{"component": "foo"}),
Expand All @@ -38,11 +40,25 @@ func TestServerMetrics(t *testing.T) {
msgReceived.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(msgReceived))

if sm.requestHandledSeconds != nil {
handledHist := sm.requestHandledSeconds.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet", connect.CodeAborted.String())
handledHist.Observe(1)

err := testutil.CollectAndCompare(sm.requestHandledSeconds, strings.NewReader(`
require.NotNil(t, sm.bytesSent)
bytesSent := sm.bytesSent.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
bytesSent.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(bytesSent))

require.NotNil(t, sm.bytesReceived)
bytesReceived := sm.bytesReceived.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
bytesReceived.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(bytesReceived))

require.NotNil(t, sm.inflightRequests)
inflight := sm.inflightRequests.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
inflight.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(inflight))

require.NotNil(t, sm.requestHandledSeconds)
handledHist := sm.requestHandledSeconds.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet", connect.CodeAborted.String())
handledHist.Observe(1)
err := testutil.CollectAndCompare(sm.requestHandledSeconds, strings.NewReader(`
# HELP namespace_subsystem_connect_server_handled_seconds Histogram of RPCs handled server-side
# TYPE namespace_subsystem_connect_server_handled_seconds histogram
namespace_subsystem_connect_server_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="0.5"} 0
Expand All @@ -52,14 +68,15 @@ func TestServerMetrics(t *testing.T) {
namespace_subsystem_connect_server_handled_seconds_sum{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
namespace_subsystem_connect_server_handled_seconds_count{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
`))
require.NoError(t, err)
}
require.NoError(t, err)
}

func TestClientMetrics(t *testing.T) {
reg := prom.NewRegistry()
cm := NewClientMetrics(
WithHistogram(true),
WithByteMetrics(true),
WithInflightMetrics(true),
WithNamespace("namespace"),
WithSubsystem("subsystem"),
WithConstLabels(prom.Labels{"component": "foo"}),
Expand All @@ -75,28 +92,41 @@ func TestClientMetrics(t *testing.T) {
msgSent.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(msgSent))

msgRecieved := cm.streamMsgReceived.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
msgRecieved.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(msgRecieved))
msgreceived := cm.streamMsgReceived.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
msgreceived.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(msgreceived))

handled := cm.requestHandled.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet", connect.CodeAborted.String())
handled.Inc()
require.EqualValues(t, 1, testutil.ToFloat64(handled))

if cm.requestHandledSeconds != nil {
handledHist := cm.requestHandledSeconds.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet", connect.CodeAborted.String())
handledHist.Observe(1)

err := testutil.CollectAndCompare(cm.requestHandledSeconds, strings.NewReader(`
# HELP namespace_subsystem_connect_client_handled_seconds Histogram of RPCs handled client-side
# TYPE namespace_subsystem_connect_client_handled_seconds histogram
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="0.5"} 0
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="1"} 1
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="1.5"} 1
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="Inf"} 1
namespace_subsystem_connect_client_handled_seconds_sum{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
namespace_subsystem_connect_client_handled_seconds_count{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
`))
require.NoError(t, err)
}
require.NotNil(t, cm.bytesSent)
bytesSent := cm.bytesSent.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
bytesSent.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(bytesSent))

require.NotNil(t, cm.bytesReceived)
bytesReceived := cm.bytesReceived.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
bytesReceived.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(bytesReceived))

require.NotNil(t, cm.inflightRequests)
inflight := cm.inflightRequests.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet")
inflight.Inc()
require.EqualValues(t, float64(1), testutil.ToFloat64(inflight))

require.NotNil(t, cm.requestHandledSeconds)
handledHist := cm.requestHandledSeconds.WithLabelValues("unary", greetconnect.GreetServiceName, "Greet", connect.CodeAborted.String())
handledHist.Observe(1)
err := testutil.CollectAndCompare(cm.requestHandledSeconds, strings.NewReader(`
# HELP namespace_subsystem_connect_client_handled_seconds Histogram of RPCs handled client-side
# TYPE namespace_subsystem_connect_client_handled_seconds histogram
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="0.5"} 0
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="1"} 1
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="1.5"} 1
namespace_subsystem_connect_client_handled_seconds_bucket{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary",le="Inf"} 1
namespace_subsystem_connect_client_handled_seconds_sum{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
namespace_subsystem_connect_client_handled_seconds_count{code="aborted",component="foo",method="Greet",service="greet.v1.GreetService",type="unary"} 1
`))
require.NoError(t, err)
}

0 comments on commit 5cd57b5

Please sign in to comment.