Skip to content

Commit

Permalink
fix: update the statsreporter
Browse files Browse the repository at this point in the history
  • Loading branch information
Leo6Leo committed Aug 15, 2024
1 parent ea78fc9 commit e1aa68a
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 194 deletions.
122 changes: 5 additions & 117 deletions pkg/metrics/stats_reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,46 +19,13 @@ package metrics
import (
"context"
"log"
"strconv"
"time"

"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"knative.dev/pkg/metrics"
)

const (
// anyValue is the default value if the trigger filter attributes are empty.
AnyValue = "any"
)

var (
// eventCountM is a counter which records the number of events received
// by a Trigger.
eventCountM = stats.Int64(
"event_count",
"Number of events received by a Trigger",
stats.UnitDimensionless,
)

// dispatchTimeInMsecM records the time spent dispatching an event to
// a Trigger subscriber, in milliseconds.
dispatchTimeInMsecM = stats.Float64(
"event_dispatch_latencies",
"The time spent dispatching an event to a Trigger subscriber",
stats.UnitMilliseconds,
)

// Create the tag keys that will be used to add tags to our measurements.
// Tag keys must conform to the restrictions described in
// go.opencensus.io/tag/validate.go. Currently those restrictions are:
// - length between 1 and 255 inclusive
// - characters are printable US-ASCII
responseCodeKey = tag.MustNewKey(LabelResponseCode)
responseCodeClassKey = tag.MustNewKey(LabelResponseCodeClass)
)

type MetricArgs interface {
GenerateTag(tags ...tag.Mutator) (context.Context, error)
}
Expand All @@ -69,104 +36,25 @@ func init() {

// StatsReporter defines the interface for sending filter metrics.
type StatsReporter interface {
ReportEventCount(args MetricArgs, responseCode int) error
ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error
}

var _ StatsReporter = (*reporter)(nil)
var EmptyContext = context.Background()

// reporter holds cached metric objects to report filter metrics.
type reporter struct {
container string
uniqueName string
}

// NewStatsReporter creates a reporter that collects and reports filter metrics.
func NewStatsReporter(container, uniqueName string) StatsReporter {
return &reporter{
container: container,
uniqueName: uniqueName,
}
}

func Register(customMetrics []stats.Measure, customViews []*view.View, customTagKeys ...tag.Key) {
commonTagKeys := []tag.Key{responseCodeKey, responseCodeClassKey, tag.MustNewKey("unique_name"), tag.MustNewKey("container_name")}
allTagKeys := append(commonTagKeys, customTagKeys...)

defaultViews := []*view.View{
{
Description: eventCountM.Description(),
Measure: eventCountM,
Aggregation: view.Count(),
TagKeys: allTagKeys,
},
{
Description: dispatchTimeInMsecM.Description(),
Measure: dispatchTimeInMsecM,
Aggregation: view.Distribution(metrics.Buckets125(1, 10000)...),
TagKeys: allTagKeys,
},
}
allTagKeys := append(customTagKeys)

Check failure on line 42 in pkg/metrics/stats_reporter.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

appends: append with no values (govet)
allViews := append(customViews)

Check failure on line 43 in pkg/metrics/stats_reporter.go

View workflow job for this annotation

GitHub Actions / style / Golang / Lint

appends: append with no values (govet)

// Add custom views for custom metrics
for _, metric := range customMetrics {
defaultViews = append(defaultViews, &view.View{
allViews = append(allViews, &view.View{
Description: metric.Description(),
Name: metric.Name(),
Measure: metric,
Aggregation: view.LastValue(), // You can change this aggregation as needed
Aggregation: view.LastValue(),
TagKeys: allTagKeys,
})
}

// Append custom views
allViews := append(defaultViews, customViews...)

if err := metrics.RegisterResourceView(allViews...); err != nil {
log.Printf("Failed to register opencensus views: %v", err)
}
}

// ReportEventCount captures the event count.
func (r *reporter) ReportEventCount(args MetricArgs, responseCode int) error {
// Create base tags
baseTags := []tag.Mutator{
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)),
}

// Generate context with all tags, including any custom ones from args.GenerateTag
ctx, err := args.GenerateTag(baseTags...)
if err != nil {
return err
}

metrics.Record(ctx, eventCountM.M(1))
return nil
}

// ReportEventDispatchTime captures dispatch times.
func (r *reporter) ReportEventDispatchTime(args MetricArgs, responseCode int, d time.Duration) error {
// Create base tags
baseTags := []tag.Mutator{
tag.Insert(responseCodeKey, strconv.Itoa(responseCode)),
tag.Insert(responseCodeClassKey, metrics.ResponseCodeClass(responseCode)),
}

// Generate context with all tags, including any custom ones from args.GenerateTag
ctx, err := args.GenerateTag(baseTags...)
if err != nil {
return err
}

// convert time.Duration in nanoseconds to milliseconds.
metrics.Record(ctx, dispatchTimeInMsecM.M(float64(d/time.Millisecond)))
return nil
}

func ValueOrAny(v string) string {
if v != "" {
return v
}
return AnyValue
}
139 changes: 62 additions & 77 deletions pkg/metrics/stats_repoter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,48 +13,70 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
package metrics_test

Check failure on line 16 in pkg/metrics/stats_repoter_test.go

View workflow job for this annotation

GitHub Actions / style / Golang / Boilerplate Check (go)

[Go headers] reported by reviewdog 🐶 found mismatched boilerplate lines: Raw Output: pkg/metrics/stats_repoter_test.go:16: found mismatched boilerplate lines: {[]string}[0]: -: "" +: "package metrics_test"

import (
"context"
"net/http"
"testing"
"time"

"go.opencensus.io/resource"
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
eventingmetrics "knative.dev/eventing/pkg/metrics"
"knative.dev/pkg/metrics/metricskey"
"knative.dev/pkg/metrics/metricstest"
_ "knative.dev/pkg/metrics/testing"
)

type testMetricArgs struct{}
type testMetricArgs struct {
ns string
trigger string
broker string
testParam string
}

func (t *testMetricArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) {
return tag.New(EmptyContext, append(tags,
tag.Insert(tag.MustNewKey("unique_name"), "testpod"),
tag.Insert(tag.MustNewKey("container_name"), "testcontainer"),
)...)
func (args *testMetricArgs) GenerateTag(tags ...tag.Mutator) (context.Context, error) {
ctx := metricskey.WithResource(context.Background(), resource.Resource{
Type: eventingmetrics.ResourceTypeKnativeTrigger,
Labels: map[string]string{
eventingmetrics.LabelNamespaceName: args.ns,
eventingmetrics.LabelBrokerName: args.broker,
eventingmetrics.LabelTriggerName: args.trigger,
},
})
ctx, err := tag.New(
ctx,
append(tags,
tag.Insert(customTagKey, args.testParam),
)...)
return ctx, err
}

// Custom metric for testing
var customMetricM = stats.Int64(
"custom_metric",
"A custom metric for testing",
stats.UnitDimensionless,
var (
customMetricM = stats.Int64(
"custom_metric",
"A custom metric for testing",
stats.UnitDimensionless,
)

customTagKey = tag.MustNewKey("custom_tag")
)

// Custom tag for testing
var customTagKey = tag.MustNewKey("custom_tag")
// StatsReporter interface definition
type StatsReporter interface {
eventingmetrics.StatsReporter
ReportCustomMetric(args eventingmetrics.MetricArgs, value int64) error
}

// add ReportEventCountRetry to StatsReporter
type statsReporterTester interface {
StatsReporter
ReportCustomMetric(args MetricArgs, value int64) error
// reporter struct definition
type reporter struct {
container string
uniqueName string
}

// ReportCustomMetric records a custom metric value.
func (r *reporter) ReportCustomMetric(args MetricArgs, value int64) error {
func (r *reporter) ReportCustomMetric(args eventingmetrics.MetricArgs, value int64) error {
// Create base tags
baseTags := []tag.Mutator{
tag.Insert(tag.MustNewKey("unique_name"), r.uniqueName),
Expand All @@ -71,73 +93,40 @@ func (r *reporter) ReportCustomMetric(args MetricArgs, value int64) error {
stats.Record(ctx, customMetricM.M(value))
return nil
}
func NewStatsReporterTester(container, uniqueName string) statsReporterTester {
return &reporter{
container: container,
uniqueName: uniqueName,
}
}

func TestStatsReporter(t *testing.T) {
func TestRegisterCustomMetrics(t *testing.T) {
setup()

args := &testMetricArgs{}
r := NewStatsReporterTester("testcontainer", "testpod")

wantTags := map[string]string{
"response_code": "202",
"response_code_class": "2xx",
"unique_name": "testpod",
"container_name": "testcontainer",
args := &testMetricArgs{
ns: "test-namespace",
trigger: "test-trigger",
broker: "test-broker",
testParam: "test-param",
}
r := &reporter{
container: "testcontainer",
uniqueName: "testpod",
}

// Test ReportEventCount
expectSuccess(t, func() error {
return r.ReportEventCount(args, http.StatusAccepted)
})
expectSuccess(t, func() error {
return r.ReportEventCount(args, http.StatusAccepted)
})
metricstest.CheckCountData(t, "event_count", wantTags, 2)

// Test ReportEventDispatchTime
expectSuccess(t, func() error {
return r.ReportEventDispatchTime(args, http.StatusAccepted, 1100*time.Millisecond)
})
expectSuccess(t, func() error {
return r.ReportEventDispatchTime(args, http.StatusAccepted, 9100*time.Millisecond)
})
metricstest.CheckDistributionData(t, "event_dispatch_latencies", wantTags, 2, 1100.0, 9100.0)

}

func TestRegisterCustomMetrics(t *testing.T) {
setup()

customMetrics := []stats.Measure{customMetricM}
customViews := []*view.View{
{
Description: customMetricM.Description(),
Measure: customMetricM,
Aggregation: view.LastValue(),
TagKeys: []tag.Key{customTagKey},
},
}

customTagKeys := []tag.Key{customTagKey}

Register(customMetrics, customViews, customTagKeys...)
// No need for customViews, as the custom metric will be used to create the view
eventingmetrics.Register(customMetrics, nil, customTagKeys...)

// Verify that the custom view is registered
if v := view.Find("custom_metric"); v == nil {
t.Error("Custom view was not registered")
}

// Record a value for the custom metric
ctx, _ := tag.New(context.Background(), tag.Insert(customTagKey, "test_value"))
stats.Record(ctx, customMetricM.M(100))
expectSuccess(t, func() error {
return r.ReportCustomMetric(args, 100)
})

// Check if the value was recorded correctly
metricstest.CheckLastValueData(t, "custom_metric", map[string]string{"custom_tag": "test_value"}, 100)
metricstest.CheckLastValueData(t, "custom_metric", map[string]string{"custom_tag": "test-param"}, 100)
}

func expectSuccess(t *testing.T, f func() error) {
Expand All @@ -152,10 +141,6 @@ func setup() {
}

func resetMetrics() {
metricstest.Unregister(
"event_count",
"event_dispatch_latencies",
"event_processing_latencies",
"custom_metric")
Register([]stats.Measure{}, nil)
metricstest.Unregister("custom_metric")
eventingmetrics.Register([]stats.Measure{}, nil)
}

0 comments on commit e1aa68a

Please sign in to comment.