From 5f5357a1de7e3fc18328a0b4969719c9fbdb14fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?= Date: Fri, 8 Nov 2024 10:51:29 +0800 Subject: [PATCH] optimize execution time recording (#793) * optimize execution time recording Signed-off-by: spacewander * divide & conquer Signed-off-by: spacewander --------- Signed-off-by: spacewander --- api/pkg/filtermanager/filtermanager.go | 11 ++++++ .../filtermanager_benchmark_test.go | 33 ++++++++++++++++++ api/pkg/filtermanager/model/model.go | 31 +++++++++++++++-- api/pkg/filtermanager/wrapper.go | 28 ++++----------- api/pkg/filtermanager/wrapper_test.go | 34 +++++++++++++++---- plugins/plugins/debugmode/filter.go | 10 +++--- 6 files changed, 111 insertions(+), 36 deletions(-) diff --git a/api/pkg/filtermanager/filtermanager.go b/api/pkg/filtermanager/filtermanager.go index c86546f6..2dca20f8 100644 --- a/api/pkg/filtermanager/filtermanager.go +++ b/api/pkg/filtermanager/filtermanager.go @@ -888,6 +888,17 @@ func (m *filterManager) EncodeTrailers(trailers capi.ResponseTrailerMap) capi.St func (m *filterManager) runOnLogPhase(reqHdr api.RequestHeaderMap, reqTrailer api.RequestTrailerMap, rspHdr api.ResponseHeaderMap, rspTrailer api.ResponseTrailerMap) { + if m.DebugModeEnabled() { + executionRecords := model.NewExecutionRecords() + for _, f := range m.filters { + if df, ok := f.Filter.(*debugFilter); ok { + name, duration := df.reportExecution() + executionRecords.Record(name, duration) + } + } + m.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords) + } + // It is unsafe to access the f.callbacks in the goroutine, as the underlying request // may be destroyed when the goroutine is running. So if people want to do some IO jobs, // they need to copy the used data from the request to the Go side before kicking off diff --git a/api/pkg/filtermanager/filtermanager_benchmark_test.go b/api/pkg/filtermanager/filtermanager_benchmark_test.go index 9cee6b80..345c5b1d 100644 --- a/api/pkg/filtermanager/filtermanager_benchmark_test.go +++ b/api/pkg/filtermanager/filtermanager_benchmark_test.go @@ -15,6 +15,7 @@ package filtermanager import ( + "fmt" "net/http" "strconv" "testing" @@ -135,3 +136,35 @@ func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) { m.OnLog(reqHdrs[n%num], nil, nil, nil) } } + +func BenchmarkFilterManagerDebugEnabled(b *testing.B) { + envoy.DisableLogInTest() // otherwise, there is too much output + cb := envoy.NewCAPIFilterCallbackHandler() + config := initFilterManagerConfig("ns") + pc := []*model.ParsedFilterConfig{} + for i := 0; i < 5; i++ { + pc = append(pc, &model.ParsedFilterConfig{ + Name: fmt.Sprintf("all-%d", i), + Factory: PassThroughFactory, + }) + } + config.parsed = pc + config.enableDebugMode = true + reqHdr := envoy.NewRequestHeaderMap(http.Header{}) + respHdr := envoy.NewResponseHeaderMap(http.Header{}) + reqBuf := envoy.NewBufferInstance([]byte{}) + respBuf := envoy.NewBufferInstance([]byte{}) + + for n := 0; n < b.N; n++ { + m := unwrapFilterManager(FilterManagerFactory(config, cb)) + m.DecodeHeaders(reqHdr, false) + cb.WaitContinued() + m.DecodeData(reqBuf, true) + cb.WaitContinued() + m.EncodeHeaders(respHdr, false) + cb.WaitContinued() + m.EncodeData(respBuf, true) + cb.WaitContinued() + m.OnLog(reqHdr, nil, respHdr, nil) + } +} diff --git a/api/pkg/filtermanager/model/model.go b/api/pkg/filtermanager/model/model.go index 008ee258..b11aefbe 100644 --- a/api/pkg/filtermanager/model/model.go +++ b/api/pkg/filtermanager/model/model.go @@ -49,7 +49,32 @@ func NewFilterWrapper(name string, f api.Filter) *FilterWrapper { } } -type ExecutionRecord struct { - PluginName string - Record time.Duration +type executionRecord struct { + name string + duration time.Duration +} + +type ExecutionRecords struct { + records []*executionRecord +} + +func NewExecutionRecords() *ExecutionRecords { + return &ExecutionRecords{ + records: make([]*executionRecord, 0, 8), + } +} + +// Record & ForEach should only be called in OnLog phase + +func (e *ExecutionRecords) Record(name string, duration time.Duration) { + e.records = append(e.records, &executionRecord{ + name: name, + duration: duration, + }) +} + +func (e *ExecutionRecords) ForEach(f func(name string, duration time.Duration)) { + for _, record := range e.records { + f(record.name, record.duration) + } } diff --git a/api/pkg/filtermanager/wrapper.go b/api/pkg/filtermanager/wrapper.go index b29bb245..48666fc8 100644 --- a/api/pkg/filtermanager/wrapper.go +++ b/api/pkg/filtermanager/wrapper.go @@ -15,12 +15,9 @@ package filtermanager import ( - "fmt" - "reflect" "time" "mosn.io/htnn/api/pkg/filtermanager/api" - "mosn.io/htnn/api/pkg/filtermanager/model" ) type logExecutionFilter struct { @@ -119,6 +116,8 @@ type debugFilter struct { name string internal api.Filter callbacks api.FilterCallbackHandler + + record time.Duration } func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallbackHandler) api.Filter { @@ -131,26 +130,11 @@ func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallba func (f *debugFilter) recordExecution(start time.Time) { duration := time.Since(start) - executionRecords := f.callbacks.PluginState().Get("debugMode", "executionRecords") - if executionRecords == nil { - executionRecords = []*model.ExecutionRecord{} - f.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords) - } + f.record += duration +} - records, ok := executionRecords.([]*model.ExecutionRecord) - if !ok { - panic(fmt.Sprintf("unexpected type: %s", reflect.TypeOf(executionRecords))) - } - for _, record := range records { - if record.PluginName == f.name { - record.Record += duration - return - } - } - f.callbacks.PluginState().Set("debugMode", "executionRecords", append(records, &model.ExecutionRecord{ - PluginName: f.name, - Record: duration, - })) +func (f *debugFilter) reportExecution() (name string, duration time.Duration) { + return f.name, f.record } func (f *debugFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction { diff --git a/api/pkg/filtermanager/wrapper_test.go b/api/pkg/filtermanager/wrapper_test.go index 92bbe50b..73cc84c4 100644 --- a/api/pkg/filtermanager/wrapper_test.go +++ b/api/pkg/filtermanager/wrapper_test.go @@ -35,9 +35,30 @@ func TestDebugFilter(t *testing.T) { f2.DecodeHeaders(nil, true) f1.DecodeHeaders(nil, true) - records := cb.PluginState().Get("debugMode", "executionRecords").([]*model.ExecutionRecord) + + type RecordWrapper struct { + PluginName string + Record time.Duration + } + + getRecords := func(executionRecords *model.ExecutionRecords) []RecordWrapper { + records := []RecordWrapper{} + executionRecords.ForEach(func(name string, duration time.Duration) { + r := RecordWrapper{ + PluginName: name, + Record: duration, + } + records = append(records, r) + }) + return records + } + executionRecords := model.NewExecutionRecords() + name, duration := f2.(*debugFilter).reportExecution() + executionRecords.Record(name, duration) + name, duration = f1.(*debugFilter).reportExecution() + executionRecords.Record(name, duration) + records := getRecords(executionRecords) t.Logf("get records %+v\n", records) // for debug when test failed - assert.Equal(t, 2, len(records)) assert.Equal(t, "two", records[0].PluginName) assert.True(t, records[0].Record > 0) assert.Equal(t, "one", records[1].PluginName) @@ -63,12 +84,13 @@ func TestDebugFilter(t *testing.T) { f1.EncodeHeaders(nil, false) f1.EncodeData(nil, true) - records = cb.PluginState().Get("debugMode", "executionRecords").([]*model.ExecutionRecord) + executionRecords = model.NewExecutionRecords() + name, duration = f1.(*debugFilter).reportExecution() + executionRecords.Record(name, duration) + records = getRecords(executionRecords) t.Logf("get records %+v\n", records) // for debug when test failed - assert.Equal(t, 2, len(records)) - assert.Equal(t, "one", records[1].PluginName) // Should be the sum of multiple calls delta := 10 * time.Millisecond - rec := records[1].Record - decodeHeadersCost + rec := records[0].Record - decodeHeadersCost assert.True(t, 270*time.Millisecond-delta < rec && rec < 270*time.Millisecond+delta, rec) } diff --git a/plugins/plugins/debugmode/filter.go b/plugins/plugins/debugmode/filter.go index 2e1f9706..c54e757e 100644 --- a/plugins/plugins/debugmode/filter.go +++ b/plugins/plugins/debugmode/filter.go @@ -104,14 +104,14 @@ func (f *filter) OnLog(reqHeaders api.RequestHeaderMap, reqTrailers api.RequestT // This is a private API and we don't guarantee its stablibity r := f.callbacks.PluginState().Get("debugMode", "executionRecords") if r != nil { - executionRecords := r.([]*model.ExecutionRecord) - for _, record := range executionRecords { + executionRecords := r.(*model.ExecutionRecords) + executionRecords.ForEach(func(name string, duration time.Duration) { p := executionPlugin{ - Name: record.PluginName, - CostSeconds: record.Record.Seconds(), + Name: name, + CostSeconds: duration.Seconds(), } report.ExecutedPlugins = append(report.ExecutedPlugins, p) - } + }) } b, _ := json.Marshal(report)