Skip to content

Commit

Permalink
🌱 add latency metrics for resource resync. (#76)
Browse files Browse the repository at this point in the history
* add metrics for resource resync.

Signed-off-by: morvencao <[email protected]>

* add doc and test.

Signed-off-by: morvencao <[email protected]>

---------

Signed-off-by: morvencao <[email protected]>
  • Loading branch information
morvencao authored Sep 6, 2024
1 parent 6af5c44 commit 3e84658
Show file tree
Hide file tree
Showing 178 changed files with 31,706 additions and 1 deletion.
7 changes: 7 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ require (
github.com/onsi/gomega v1.32.0
github.com/openshift/build-machinery-go v0.0.0-20240419090851-af9c868bcf52
github.com/openshift/library-go v0.0.0-20240621150525-4bb4238aef81
github.com/prometheus/client_golang v1.18.0
github.com/prometheus/client_model v0.5.0
github.com/stretchr/testify v1.8.4
golang.org/x/oauth2 v0.16.0
google.golang.org/grpc v1.62.1
Expand All @@ -37,6 +39,8 @@ require (
require (
cloud.google.com/go/compute v1.23.3 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch/v5 v5.9.0 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
Expand All @@ -52,13 +56,16 @@ require (
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
github.com/moby/term v0.0.0-20221205130635-1aeaba878587 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/rs/xid v1.4.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
go.uber.org/multierr v1.11.0 // indirect
Expand Down
1 change: 0 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ github.com/magiconair/properties v1.8.6 h1:5ibWZ6iY0NctNGWo87LalDlEZ6R41TqbbDamh
github.com/magiconair/properties v1.8.6/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
github.com/mailru/easyjson v0.7.7 h1:UGYAvKxe3sBsEDzO8ZeWOSlIQfWFlxbzLZe7hwFURr0=
github.com/mailru/easyjson v0.7.7/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/moby/sys/mount v0.3.3 h1:fX1SVkXFJ47XWDoeFW4Sq7PdQJnV2QIDZAqjNqgEjUs=
Expand Down
3 changes: 3 additions & 0 deletions pkg/cloudevents/generic/agentclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

Expand Down Expand Up @@ -163,9 +164,11 @@ func (c *CloudEventAgentClient[T]) receive(ctx context.Context, evt cloudevents.
return
}

startTime := time.Now()
if err := c.respondResyncStatusRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync manifestsstatus, %v", err)
}
updateResourceStatusResyncDurationMetric(evt.Source(), c.clusterName, eventType.CloudEventsDataType.String(), startTime)

return
}
Expand Down
138 changes: 138 additions & 0 deletions pkg/cloudevents/generic/metrics_collector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package generic

import (
"time"

"github.com/prometheus/client_golang/prometheus"
)

// Naming building blocks shared by the resync duration metrics.
const (
	// metricsSubsystem is the subsystem set on every metric declared with these names.
	metricsSubsystem = "resources"

	// Keys of the labels attached to each metric.
	metricsSourceLabel  = "source"
	metricsClusterLabel = "cluster"
	// NOTE(review): "metrucs" is a misspelling of "metrics"; the identifier is
	// kept as-is because other declarations refer to it by this name.
	metrucsDataTypeLabel = "type"

	// Base names of the metrics.
	specResyncDurationMetric   = "spec_resync_duration_seconds"
	statusResyncDurationMetric = "status_resync_duration_seconds"
)

// metricsLabels holds the label keys added to metrics, in the order
// source, cluster, resource type.
var metricsLabels = []string{metricsSourceLabel, metricsClusterLabel, metrucsDataTypeLabel}

// resourceSpecResyncDurationMetric is a histogram of resource spec resync durations in seconds,
// labelled with source, cluster and type. The subsystem ("resources") and the metric name are
// joined with an underscore, so the base metric name is 'resources_spec_resync_duration_seconds'.
// It exposes multiple time series during a scrape:
//  1. cumulative counters for the observation buckets, exposed as
//     'resources_spec_resync_duration_seconds_bucket{le="<upper inclusive bound>"}'
//  2. the total sum of all observed values, exposed as 'resources_spec_resync_duration_seconds_sum'
//  3. the count of observed events, exposed as 'resources_spec_resync_duration_seconds_count'
//     (identical to 'resources_spec_resync_duration_seconds_bucket{le="+Inf"}' above)
//
// For example, two spec resyncs for the manifests type, one taking 0.5s and the other 0.7s,
// would result in the following metrics:
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="0.1"} 0
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="0.2"} 0
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="0.5"} 1
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="1.0"} 2
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="2.0"} 2
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="10.0"} 2
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="30.0"} 2
// resources_spec_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests",le="+Inf"} 2
// resources_spec_resync_duration_seconds_sum{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 1.2
// resources_spec_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifests"} 2
var resourceSpecResyncDurationMetric = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Subsystem: metricsSubsystem,
		Name:      specResyncDurationMetric,
		Help:      "The duration of the resource spec resync in seconds.",
		// Upper bounds of the buckets, in seconds.
		Buckets: []float64{0.1, 0.2, 0.5, 1.0, 2.0, 10.0, 30.0},
	},
	metricsLabels,
)

// resourceStatusResyncDurationMetric is a histogram of resource status resync durations in seconds,
// labelled with source, cluster and type. The subsystem ("resources") and the metric name are
// joined with an underscore, so the base metric name is 'resources_status_resync_duration_seconds'.
// It exposes multiple time series during a scrape:
//  1. cumulative counters for the observation buckets, exposed as
//     'resources_status_resync_duration_seconds_bucket{le="<upper inclusive bound>"}'
//  2. the total sum of all observed values, exposed as 'resources_status_resync_duration_seconds_sum'
//  3. the count of observed events, exposed as 'resources_status_resync_duration_seconds_count'
//     (identical to 'resources_status_resync_duration_seconds_bucket{le="+Inf"}' above)
//
// For example, two status resyncs for the manifestbundles type, one taking 0.5s and the other 1.1s,
// would result in the following metrics:
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="0.1"} 0
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="0.2"} 0
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="0.5"} 1
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="1.0"} 1
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="2.0"} 2
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="10.0"} 2
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="30.0"} 2
// resources_status_resync_duration_seconds_bucket{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles",le="+Inf"} 2
// resources_status_resync_duration_seconds_sum{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 1.6
// resources_status_resync_duration_seconds_count{source="source1",cluster="cluster1",type="io.open-cluster-management.works.v1alpha1.manifestbundles"} 2
var resourceStatusResyncDurationMetric = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Subsystem: metricsSubsystem,
		Name:      statusResyncDurationMetric,
		Help:      "The duration of the resource status resync in seconds.",
		// Upper bounds of the buckets, in seconds.
		Buckets: []float64{0.1, 0.2, 0.5, 1.0, 2.0, 10.0, 30.0},
	},
	metricsLabels,
)

// RegisterResourceResyncMetrics registers the spec and status resync duration
// histograms with the default Prometheus registerer, in that order. It panics
// if either registration fails.
func RegisterResourceResyncMetrics() {
	prometheus.MustRegister(
		resourceSpecResyncDurationMetric,
		resourceStatusResyncDurationMetric,
	)
}

// UnregisterResourceResyncMetrics removes both resync duration histograms (spec
// and status) from the default Prometheus registerer, mirroring
// RegisterResourceResyncMetrics. Unregistering a collector that is not
// registered is a no-op, so the boolean results are intentionally ignored.
func UnregisterResourceResyncMetrics() {
	// The spec metric must be unregistered too; otherwise a later call to
	// RegisterResourceResyncMetrics panics on the duplicate registration.
	prometheus.Unregister(resourceSpecResyncDurationMetric)
	prometheus.Unregister(resourceStatusResyncDurationMetric)
}

// ResetResourceResyncMetricsCollectors resets all collectors
func ResetResourceResyncMetricsCollectors() {
resourceSpecResyncDurationMetric.Reset()
resourceStatusResyncDurationMetric.Reset()
}

// updateResourceSpecResyncDurationMetric observes the time elapsed since
// startTime, in seconds, on the spec resync duration histogram for the series
// identified by the given source, cluster and data type label values.
func updateResourceSpecResyncDurationMetric(source, cluster, dataType string, startTime time.Time) {
	series := prometheus.Labels{
		metricsSourceLabel:   source,
		metricsClusterLabel:  cluster,
		metrucsDataTypeLabel: dataType,
	}
	elapsedSeconds := time.Since(startTime).Seconds()
	resourceSpecResyncDurationMetric.With(series).Observe(elapsedSeconds)
}

// updateResourceStatusResyncDurationMetric observes the time elapsed since
// startTime, in seconds, on the status resync duration histogram for the series
// identified by the given source, cluster and data type label values.
func updateResourceStatusResyncDurationMetric(source, cluster, dataType string, startTime time.Time) {
	series := prometheus.Labels{
		metricsSourceLabel:   source,
		metricsClusterLabel:  cluster,
		metrucsDataTypeLabel: dataType,
	}
	elapsedSeconds := time.Since(startTime).Seconds()
	resourceStatusResyncDurationMetric.With(series).Observe(elapsedSeconds)
}
166 changes: 166 additions & 0 deletions pkg/cloudevents/generic/metrics_collector_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
package generic

import (
"context"
"fmt"
"testing"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"
"github.com/cloudevents/sdk-go/v2/protocol/gochan"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/require"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/options/fake"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/payload"
"open-cluster-management.io/sdk-go/pkg/cloudevents/generic/types"
)

// testResyncType names the kind of resync a test case exercises.
type testResyncType string

// Resync kinds used by the cases in TestResyncMetrics.
const (
	testSpecResync   = testResyncType("spec")
	testStatusResync = testResyncType("status")
)

// TestResyncMetrics checks that handling a resync request records exactly one
// observation in the matching resync duration histogram (spec for the source
// client, status for the agent client), and that the observed duration is
// greater than zero and below one second.
func TestResyncMetrics(t *testing.T) {
	cases := []struct {
		name        string
		rescType    testResyncType
		clusterName string
		sourceID    string
		dataType    types.CloudEventsDataType
	}{
		{
			name:        "resync spec",
			rescType:    testSpecResync,
			clusterName: "cluster1",
			sourceID:    "source1",
			dataType:    mockEventDataType,
		},
		{
			name:        "resync status",
			rescType:    testStatusResync,
			clusterName: "cluster1",
			sourceID:    "source1",
			dataType:    mockEventDataType,
		},
	}

	// register metrics
	RegisterResourceResyncMetrics()
	// unregister metrics
	defer UnregisterResourceResyncMetrics()
	for _, c := range cases {
		// reset metrics so each case starts from an empty histogram
		ResetResourceResyncMetricsCollectors()
		// run test
		t.Run(c.name, func(t *testing.T) {
			ctx, cancel := context.WithCancel(context.Background())
			// Deferred so the context is released even when a require
			// assertion aborts the subtest early.
			defer cancel()

			switch c.rescType {
			case testSpecResync:
				sourceOptions := fake.NewSourceOptions(gochan.New(), c.sourceID)
				lister := newMockResourceLister([]*mockResource{}...)
				source, err := NewCloudEventSourceClient[*mockResource](ctx, sourceOptions, lister, statusHash, newMockResourceCodec())
				require.NoError(t, err)

				eventType := types.CloudEventsType{
					CloudEventsDataType: c.dataType,
					SubResource:         types.SubResourceSpec,
					Action:              types.ResyncRequestAction,
				}
				evt := cloudevents.NewEvent()
				evt.SetType(eventType.String())
				evt.SetExtension("clustername", c.clusterName)
				require.NoError(t, evt.SetData(cloudevents.ApplicationJSON, &payload.ResourceStatusHashList{}),
					"failed to set data for event")

				// receive resync request and publish associated resources
				source.receive(ctx, evt)
				// wait 2 seconds to respond to the resync request
				time.Sleep(2 * time.Second)

				// check spec resync duration metric as a histogram
				h := resourceSpecResyncDurationMetric.WithLabelValues(c.sourceID, c.clusterName, mockEventDataType.String())
				count, sum := toFloat64HistCountAndSum(h)
				require.Equal(t, uint64(1), count)
				require.Greater(t, sum, 0.0)
				require.Less(t, sum, 1.0)

			case testStatusResync:
				agentOptions := fake.NewAgentOptions(gochan.New(), c.clusterName, testAgentName)
				lister := newMockResourceLister([]*mockResource{}...)
				agent, err := NewCloudEventAgentClient[*mockResource](ctx, agentOptions, lister, statusHash, newMockResourceCodec())
				require.NoError(t, err)

				eventType := types.CloudEventsType{
					CloudEventsDataType: c.dataType,
					SubResource:         types.SubResourceStatus,
					Action:              types.ResyncRequestAction,
				}
				evt := cloudevents.NewEvent()
				evt.SetType(eventType.String())
				evt.SetSource(c.sourceID)
				require.NoError(t, evt.SetData(cloudevents.ApplicationJSON, &payload.ResourceStatusHashList{}),
					"failed to set data for event")

				// receive resync request and publish associated resources
				agent.receive(ctx, evt)
				// wait 1 second to respond to the resync request
				time.Sleep(1 * time.Second)

				// check status resync duration metric as a histogram
				h := resourceStatusResyncDurationMetric.WithLabelValues(c.sourceID, c.clusterName, mockEventDataType.String())
				count, sum := toFloat64HistCountAndSum(h)
				require.Equal(t, uint64(1), count)
				require.Greater(t, sum, 0.0)
				require.Less(t, sum, 1.0)
			}
		})
	}
}

// toFloat64HistCountAndSum collects the metrics exposed by h and returns the
// sample count and sample sum of its histogram. It panics when h is not a
// prometheus.Collector, when the collection does not yield exactly one metric,
// when the metric cannot be written, or when the metric is not a histogram.
func toFloat64HistCountAndSum(h prometheus.Observer) (uint64, float64) {
	collector, isCollector := h.(prometheus.Collector)
	if !isCollector {
		panic(fmt.Errorf("observer is not a collector; got: %T", h))
	}

	var (
		collected []prometheus.Metric
		metricCh  = make(chan prometheus.Metric)
		drained   = make(chan struct{})
	)

	// Drain the channel concurrently: Collect sends on an unbuffered channel
	// and would block without a reader.
	go func() {
		defer close(drained)
		for metric := range metricCh {
			collected = append(collected, metric)
		}
	}()

	collector.Collect(metricCh)
	close(metricCh)
	// Wait for the reader to finish before touching what it gathered.
	<-drained

	if got := len(collected); got != 1 {
		panic(fmt.Errorf("collected %d metrics instead of exactly 1", got))
	}

	out := &dto.Metric{}
	if err := collected[0].Write(out); err != nil {
		panic(fmt.Errorf("metric write failed, err=%v", err))
	}

	hist := out.Histogram
	if hist == nil {
		panic(fmt.Errorf("collected a non-histogram metric: %s", out))
	}
	return hist.GetSampleCount(), hist.GetSampleSum()
}
9 changes: 9 additions & 0 deletions pkg/cloudevents/generic/sourceclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"strconv"
"time"

cloudevents "github.com/cloudevents/sdk-go/v2"

Expand Down Expand Up @@ -157,9 +158,17 @@ func (c *CloudEventSourceClient[T]) receive(ctx context.Context, evt cloudevents
return
}

clusterName, err := evt.Context.GetExtension(types.ExtensionClusterName)
if err != nil {
klog.Errorf("failed to get cluster name extension, %v", err)
return
}

startTime := time.Now()
if err := c.respondResyncSpecRequest(ctx, eventType.CloudEventsDataType, evt); err != nil {
klog.Errorf("failed to resync resources spec, %v", err)
}
updateResourceSpecResyncDurationMetric(c.sourceID, fmt.Sprintf("%s", clusterName), eventType.CloudEventsDataType.String(), startTime)

return
}
Expand Down
Loading

0 comments on commit 3e84658

Please sign in to comment.