diff --git a/api/pkg/filtermanager/config.go b/api/pkg/filtermanager/config.go index 594f54b8..c6a15e39 100644 --- a/api/pkg/filtermanager/config.go +++ b/api/pkg/filtermanager/config.go @@ -177,6 +177,10 @@ func (conf *filterManagerConfig) InitOnce() { } func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) { + if callbacks == nil { + api.LogErrorf("no config callback handler provided") + // the call back handler to be nil only affects plugin metrics, so we can continue + } configStruct := &xds.TypedStruct{} // No configuration @@ -217,11 +221,6 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC for _, proto := range plugins { name := proto.Name - if registerMetrics := pkgPlugins.LoadMetricsCallback(name); registerMetrics != nil { - registerMetrics(callbacks) - api.LogInfof("loaded metrics definition for plugin %s", name) - } - if plugin := pkgPlugins.LoadHTTPFilterFactoryAndParser(name); plugin != nil { config, err := plugin.ConfigParser.Parse(proto.Config) if err != nil { @@ -252,6 +251,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC if _, ok := config.(pkgPlugins.Initer); ok { needInit = true } + if register, ok := config.(pkgPlugins.MetricsRegister); ok { + register.MetricsDefinition(callbacks) + api.LogInfof("loaded metrics definition for plugin: %s", name) + } if name == "debugMode" { // we handle this plugin differently, so we can have debug behavior before diff --git a/api/pkg/plugins/plugins.go b/api/pkg/plugins/plugins.go index c69f40f5..7cae28a7 100644 --- a/api/pkg/plugins/plugins.go +++ b/api/pkg/plugins/plugins.go @@ -191,24 +191,6 @@ func (cp *PluginConfigParser) Parse(any interface{}) (res interface{}, err error return conf, nil } -func RegisterMetricsCallback(pluginName string, registerMetricFunc func(capi.ConfigCallbacks)) { - if registerMetricFunc == nil { - panic("registerMetricFunc should not be nil") - } - if pluginName == "" { - panic("pluginName should not be empty") - } - if _, ok := metricsRegister[pluginName]; ok { - logger.Error(errors.New("metrics for plugin already registered, overriding"), "name", pluginName) - } - metricsRegister[pluginName] = registerMetricFunc - logger.Info("registered metrics for plugin", "name", pluginName) -} - -func LoadMetricsCallback(pluginName string) func(capi.ConfigCallbacks) { - return metricsRegister[pluginName] -} - // PluginMethodDefaultImpl provides reasonable implementation for optional methods type PluginMethodDefaultImpl struct{} diff --git a/api/pkg/plugins/type.go b/api/pkg/plugins/type.go index 1c258586..5642da87 100644 --- a/api/pkg/plugins/type.go +++ b/api/pkg/plugins/type.go @@ -15,6 +15,8 @@ package plugins import ( + capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + "mosn.io/htnn/api/pkg/filtermanager/api" ) @@ -149,6 +151,10 @@ type Initer interface { Init(cb api.ConfigCallbackHandler) error } +type MetricsRegister interface { + MetricsDefinition(capi.ConfigCallbacks) +} + type NativePlugin interface { Plugin diff --git a/api/plugins/tests/integration/dataplane/data_plane.go b/api/plugins/tests/integration/dataplane/data_plane.go index 4e29b0e0..ef178477 100644 --- a/api/plugins/tests/integration/dataplane/data_plane.go +++ b/api/plugins/tests/integration/dataplane/data_plane.go @@ -543,6 +543,27 @@ func (dp *DataPlane) do(method string, path string, header http.Header, body io. return resp, err } +func (dp *DataPlane) GetAdmin(path string) (*http.Response, error) { + req, err := http.NewRequest("GET", "http://localhost:"+dp.adminAPIPort+path, nil) + if err != nil { + return nil, err + } + tr := &http.Transport{ + DialContext: func(ctx context.Context, proto, addr string) (conn net.Conn, err error) { + return net.DialTimeout("tcp", ":"+dp.adminAPIPort, 1*time.Second) + }, + } + + client := &http.Client{Transport: tr, + Timeout: 10 * time.Second, + CheckRedirect: func(req *http.Request, via []*http.Request) error { + return http.ErrUseLastResponse + }, + } + resp, err := client.Do(req) + return resp, err +} + func (dp *DataPlane) doWithTrailer(method string, path string, header http.Header, body io.Reader, trailer http.Header) (*http.Response, error) { req, err := http.NewRequest(method, "http://localhost:"+dp.dataPlanePort+path, body) if err != nil { diff --git a/api/tests/integration/filtermanager_latest_test.go b/api/tests/integration/filtermanager_latest_test.go index 308433e5..74ab4dc8 100644 --- a/api/tests/integration/filtermanager_latest_test.go +++ b/api/tests/integration/filtermanager_latest_test.go @@ -19,6 +19,7 @@ package integration import ( "bytes" _ "embed" + "io" "net/http" "os" "path/filepath" @@ -339,3 +340,43 @@ func TestFilterManagerLogWithTrailers(t *testing.T) { require.Nil(t, err) assert.Equal(t, 200, resp.StatusCode) } + +func TestMetricsEnabledPlugin(t *testing.T) { + + dp, err := dataplane.StartDataPlane(t, &dataplane.Option{ + LogLevel: "debug", + Bootstrap: dataplane.Bootstrap(), + }) + if err != nil { + t.Fatalf("failed to start data plane: %v", err) + return + } + defer dp.Stop() + + lp := &filtermanager.FilterManagerConfig{ + Plugins: []*model.FilterConfig{ + { + Name: "metrics", + Config: &Config{}, + }, + }, + } + + controlPlane.UseGoPluginConfig(t, lp, dp) + hdr := http.Header{} + trailer := http.Header{} + trailer.Add("Expires", "Wed, 21 Oct 2015 07:28:00 GMT") + resp, err := dp.Get("/", hdr) + require.Nil(t, err) + body, err := io.ReadAll(resp.Body) + require.Nil(t, err) + assert.Equal(t, 200, resp.StatusCode, "response: %s", string(body)) + resp.Body.Close() + + resp, err = dp.GetAdmin("/stats") + require.Nil(t, err) + body, err = io.ReadAll(resp.Body) + require.Nil(t, err) + assert.Contains(t, string(body), "metrics-test.usage.counter 1") + assert.Contains(t, string(body), "metrics-test.usage.gauge 2") +} diff --git a/api/tests/integration/test_plugins.go b/api/tests/integration/test_plugins.go index 6e5239a1..6d5ec988 100644 --- a/api/tests/integration/test_plugins.go +++ b/api/tests/integration/test_plugins.go @@ -202,7 +202,6 @@ func (p *bufferPlugin) Factory() api.FilterFactory { type localReplyPlugin struct { plugins.PluginMethodDefaultImpl basePlugin - usageCounter capi.CounterMetric } func localReplyFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter { @@ -245,9 +244,6 @@ func (f *localReplyFilter) DecodeRequest(headers api.RequestHeaderMap, buf api.B f.reqHdr = headers f.runFilters = headers.Values("run") if f.config.Decode { - if lrp.usageCounter != nil { - lrp.usageCounter.Increment(1) - } return f.NewLocalResponse("reply", true) } return api.Continue @@ -315,11 +311,6 @@ func (p *localReplyPlugin) Factory() api.FilterFactory { return localReplyFactory } -func (p *localReplyPlugin) MetricsDefinition(c capi.ConfigCallbacks) { - p.usageCounter = c.DefineCounterMetric("localreply.usage.counter") - // Define more metrics here -} - type badPlugin struct { plugins.PluginMethodDefaultImpl } @@ -630,15 +621,70 @@ func (f *onLogFilter) OnLog(reqHeaders api.RequestHeaderMap, reqTrailers api.Req api.LogWarnf("receive request trailers: %+v", trailers) } -var lrp = &localReplyPlugin{} +type metricsConfig struct { + Config + + usageCounter capi.CounterMetric + gauge capi.GaugeMetric +} + +func (m *metricsConfig) MetricsDefinition(c capi.ConfigCallbacks) { + if c == nil { + api.LogErrorf("metrics config callback is nil") + return + } + m.usageCounter = c.DefineCounterMetric("metrics-test.usage.counter") + m.gauge = c.DefineGaugeMetric("metrics-test.usage.gauge") + api.LogInfo("metrics config loaded for metrics-test") + // Define more metrics here +} + +var _ plugins.MetricsRegister = &metricsConfig{} + +type metricsPlugin struct { + plugins.PluginMethodDefaultImpl +} + +func (p *metricsPlugin) Config() api.PluginConfig { + return &metricsConfig{} +} + +func (p *metricsPlugin) Factory() api.FilterFactory { + return metricsFactory +} + +func metricsFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter { + return &metricsFilter{ + callbacks: callbacks, + config: c.(*metricsConfig), + } +} + +type metricsFilter struct { + api.PassThroughFilter + + callbacks api.FilterCallbackHandler + config *metricsConfig +} + +func (f *metricsFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { + if f.config.usageCounter != nil { + f.config.usageCounter.Increment(1) + } else { + return &api.LocalResponse{Code: 500, Msg: "metrics config counter is nil"} + } + if f.config.gauge != nil { + f.config.gauge.Record(2) + } else { + return &api.LocalResponse{Code: 500, Msg: "metrics config gauge is nil"} + } + return &api.LocalResponse{Code: 200, Msg: "metrics works"} +} func init() { plugins.RegisterPlugin("stream", &streamPlugin{}) plugins.RegisterPlugin("buffer", &bufferPlugin{}) - - plugins.RegisterPlugin("localReply", lrp) - plugins.RegisterMetricsCallback("localReply", lrp.MetricsDefinition) - + plugins.RegisterPlugin("localReply", &localReplyPlugin{}) plugins.RegisterPlugin("bad", &badPlugin{}) plugins.RegisterPlugin("consumer", &consumerPlugin{}) plugins.RegisterPlugin("init", &initPlugin{}) @@ -647,4 +693,5 @@ func init() { plugins.RegisterPlugin("beforeConsumerAndHasOtherMethod", &beforeConsumerAndHasOtherMethodPlugin{}) plugins.RegisterPlugin("beforeConsumerAndHasDecodeRequest", &beforeConsumerAndHasDecodeRequestPlugin{}) plugins.RegisterPlugin("onLog", &onLogPlugin{}) + plugins.RegisterPlugin("metrics", &metricsPlugin{}) }