Skip to content

Commit

Permalink
address comments and add integration test
Browse files Browse the repository at this point in the history
  • Loading branch information
wonderflow committed Dec 19, 2024
1 parent 34cd10c commit c1c2719
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 37 deletions.
13 changes: 8 additions & 5 deletions api/pkg/filtermanager/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ func (conf *filterManagerConfig) InitOnce() {
}

func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigCallbackHandler) (interface{}, error) {
if callbacks == nil {
api.LogErrorf("no config callback handler provided")
// the call back handler to be nil only affects plugin metrics, so we can continue
}
configStruct := &xds.TypedStruct{}

// No configuration
Expand Down Expand Up @@ -217,11 +221,6 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
for _, proto := range plugins {
name := proto.Name

if registerMetrics := pkgPlugins.LoadMetricsCallback(name); registerMetrics != nil {
registerMetrics(callbacks)
api.LogInfof("loaded metrics definition for plugin %s", name)
}

if plugin := pkgPlugins.LoadHTTPFilterFactoryAndParser(name); plugin != nil {
config, err := plugin.ConfigParser.Parse(proto.Config)
if err != nil {
Expand Down Expand Up @@ -252,6 +251,10 @@ func (p *FilterManagerConfigParser) Parse(any *anypb.Any, callbacks capi.ConfigC
if _, ok := config.(pkgPlugins.Initer); ok {
needInit = true
}
if register, ok := config.(pkgPlugins.MetricsRegister); ok {
register.MetricsDefinition(callbacks)
api.LogInfof("loaded metrics definition for plugin: %s", name)
}

Check warning on line 257 in api/pkg/filtermanager/config.go

View check run for this annotation

Codecov / codecov/patch

api/pkg/filtermanager/config.go#L255-L257

Added lines #L255 - L257 were not covered by tests

if name == "debugMode" {
// we handle this plugin differently, so we can have debug behavior before
Expand Down
18 changes: 0 additions & 18 deletions api/pkg/plugins/plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,24 +191,6 @@ func (cp *PluginConfigParser) Parse(any interface{}) (res interface{}, err error
return conf, nil
}

func RegisterMetricsCallback(pluginName string, registerMetricFunc func(capi.ConfigCallbacks)) {
if registerMetricFunc == nil {
panic("registerMetricFunc should not be nil")
}
if pluginName == "" {
panic("pluginName should not be empty")
}
if _, ok := metricsRegister[pluginName]; ok {
logger.Error(errors.New("metrics for plugin already registered, overriding"), "name", pluginName)
}
metricsRegister[pluginName] = registerMetricFunc
logger.Info("registered metrics for plugin", "name", pluginName)
}

func LoadMetricsCallback(pluginName string) func(capi.ConfigCallbacks) {
return metricsRegister[pluginName]
}

// PluginMethodDefaultImpl provides reasonable implementation for optional methods
type PluginMethodDefaultImpl struct{}

Expand Down
6 changes: 6 additions & 0 deletions api/pkg/plugins/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package plugins

import (
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"

"mosn.io/htnn/api/pkg/filtermanager/api"
)

Expand Down Expand Up @@ -149,6 +151,10 @@ type Initer interface {
Init(cb api.ConfigCallbackHandler) error
}

type MetricsRegister interface {
MetricsDefinition(capi.ConfigCallbacks)
}

type NativePlugin interface {
Plugin

Expand Down
21 changes: 21 additions & 0 deletions api/plugins/tests/integration/dataplane/data_plane.go
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,27 @@ func (dp *DataPlane) do(method string, path string, header http.Header, body io.
return resp, err
}

func (dp *DataPlane) GetAdmin(path string) (*http.Response, error) {
req, err := http.NewRequest("GET", "http://localhost:"+dp.adminAPIPort+path, nil)
if err != nil {
return nil, err
}
tr := &http.Transport{
DialContext: func(ctx context.Context, proto, addr string) (conn net.Conn, err error) {
return net.DialTimeout("tcp", ":"+dp.adminAPIPort, 1*time.Second)
},
}

client := &http.Client{Transport: tr,
Timeout: 10 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
resp, err := client.Do(req)
return resp, err
}

func (dp *DataPlane) doWithTrailer(method string, path string, header http.Header, body io.Reader, trailer http.Header) (*http.Response, error) {
req, err := http.NewRequest(method, "http://localhost:"+dp.dataPlanePort+path, body)
if err != nil {
Expand Down
41 changes: 41 additions & 0 deletions api/tests/integration/filtermanager_latest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package integration
import (
"bytes"
_ "embed"
"io"
"net/http"
"os"
"path/filepath"
Expand Down Expand Up @@ -339,3 +340,43 @@ func TestFilterManagerLogWithTrailers(t *testing.T) {
require.Nil(t, err)
assert.Equal(t, 200, resp.StatusCode)
}

func TestMetricsEnabledPlugin(t *testing.T) {

dp, err := dataplane.StartDataPlane(t, &dataplane.Option{
LogLevel: "debug",
Bootstrap: dataplane.Bootstrap(),
})
if err != nil {
t.Fatalf("failed to start data plane: %v", err)
return
}
defer dp.Stop()

lp := &filtermanager.FilterManagerConfig{
Plugins: []*model.FilterConfig{
{
Name: "metrics",
Config: &Config{},
},
},
}

controlPlane.UseGoPluginConfig(t, lp, dp)
hdr := http.Header{}
trailer := http.Header{}
trailer.Add("Expires", "Wed, 21 Oct 2015 07:28:00 GMT")
resp, err := dp.Get("/", hdr)
require.Nil(t, err)
body, err := io.ReadAll(resp.Body)
require.Nil(t, err)
assert.Equal(t, 200, resp.StatusCode, "response: %s", string(body))
resp.Body.Close()

resp, err = dp.GetAdmin("/stats")
require.Nil(t, err)
body, err = io.ReadAll(resp.Body)
require.Nil(t, err)
assert.Contains(t, string(body), "metrics-test.usage.counter 1")
assert.Contains(t, string(body), "metrics-test.usage.gauge 2")
}
75 changes: 61 additions & 14 deletions api/tests/integration/test_plugins.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,6 @@ func (p *bufferPlugin) Factory() api.FilterFactory {
type localReplyPlugin struct {
plugins.PluginMethodDefaultImpl
basePlugin
usageCounter capi.CounterMetric
}

func localReplyFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
Expand Down Expand Up @@ -245,9 +244,6 @@ func (f *localReplyFilter) DecodeRequest(headers api.RequestHeaderMap, buf api.B
f.reqHdr = headers
f.runFilters = headers.Values("run")
if f.config.Decode {
if lrp.usageCounter != nil {
lrp.usageCounter.Increment(1)
}
return f.NewLocalResponse("reply", true)
}
return api.Continue
Expand Down Expand Up @@ -315,11 +311,6 @@ func (p *localReplyPlugin) Factory() api.FilterFactory {
return localReplyFactory
}

func (p *localReplyPlugin) MetricsDefinition(c capi.ConfigCallbacks) {
p.usageCounter = c.DefineCounterMetric("localreply.usage.counter")
// Define more metrics here
}

type badPlugin struct {
plugins.PluginMethodDefaultImpl
}
Expand Down Expand Up @@ -630,15 +621,70 @@ func (f *onLogFilter) OnLog(reqHeaders api.RequestHeaderMap, reqTrailers api.Req
api.LogWarnf("receive request trailers: %+v", trailers)
}

var lrp = &localReplyPlugin{}
type metricsConfig struct {
Config

usageCounter capi.CounterMetric
gauge capi.GaugeMetric
}

func (m *metricsConfig) MetricsDefinition(c capi.ConfigCallbacks) {
if c == nil {
api.LogErrorf("metrics config callback is nil")
return
}
m.usageCounter = c.DefineCounterMetric("metrics-test.usage.counter")
m.gauge = c.DefineGaugeMetric("metrics-test.usage.gauge")
api.LogInfo("metrics config loaded for metrics-test")
// Define more metrics here
}

var _ plugins.MetricsRegister = &metricsConfig{}

type metricsPlugin struct {
plugins.PluginMethodDefaultImpl
}

func (p *metricsPlugin) Config() api.PluginConfig {
return &metricsConfig{}
}

func (p *metricsPlugin) Factory() api.FilterFactory {
return metricsFactory
}

func metricsFactory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
return &metricsFilter{
callbacks: callbacks,
config: c.(*metricsConfig),
}
}

type metricsFilter struct {
api.PassThroughFilter

callbacks api.FilterCallbackHandler
config *metricsConfig
}

func (f *metricsFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction {
if f.config.usageCounter != nil {
f.config.usageCounter.Increment(1)
} else {
return &api.LocalResponse{Code: 500, Msg: "metrics config counter is nil"}
}
if f.config.gauge != nil {
f.config.gauge.Record(2)
} else {
return &api.LocalResponse{Code: 500, Msg: "metrics config gauge is nil"}
}
return &api.LocalResponse{Code: 200, Msg: "metrics works"}
}

func init() {
plugins.RegisterPlugin("stream", &streamPlugin{})
plugins.RegisterPlugin("buffer", &bufferPlugin{})

plugins.RegisterPlugin("localReply", lrp)
plugins.RegisterMetricsCallback("localReply", lrp.MetricsDefinition)

plugins.RegisterPlugin("localReply", &localReplyPlugin{})
plugins.RegisterPlugin("bad", &badPlugin{})
plugins.RegisterPlugin("consumer", &consumerPlugin{})
plugins.RegisterPlugin("init", &initPlugin{})
Expand All @@ -647,4 +693,5 @@ func init() {
plugins.RegisterPlugin("beforeConsumerAndHasOtherMethod", &beforeConsumerAndHasOtherMethodPlugin{})
plugins.RegisterPlugin("beforeConsumerAndHasDecodeRequest", &beforeConsumerAndHasDecodeRequestPlugin{})
plugins.RegisterPlugin("onLog", &onLogPlugin{})
plugins.RegisterPlugin("metrics", &metricsPlugin{})
}

0 comments on commit c1c2719

Please sign in to comment.