Skip to content

Commit

Permalink
optimize execution time recording (#793)
Browse files Browse the repository at this point in the history
* optimize execution time recording

Signed-off-by: spacewander <[email protected]>

* divide & conquer

Signed-off-by: spacewander <[email protected]>

---------

Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Nov 8, 2024
1 parent 4e14fc7 commit 5f5357a
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 36 deletions.
11 changes: 11 additions & 0 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,17 @@ func (m *filterManager) EncodeTrailers(trailers capi.ResponseTrailerMap) capi.St
func (m *filterManager) runOnLogPhase(reqHdr api.RequestHeaderMap, reqTrailer api.RequestTrailerMap,
rspHdr api.ResponseHeaderMap, rspTrailer api.ResponseTrailerMap) {

if m.DebugModeEnabled() {
executionRecords := model.NewExecutionRecords()
for _, f := range m.filters {
if df, ok := f.Filter.(*debugFilter); ok {
name, duration := df.reportExecution()
executionRecords.Record(name, duration)
}
}
m.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords)
}

// It is unsafe to access the f.callbacks in the goroutine, as the underlying request
// may be destroyed when the goroutine is running. So if people want to do some IO jobs,
// they need to copy the used data from the request to the Go side before kicking off
Expand Down
33 changes: 33 additions & 0 deletions api/pkg/filtermanager/filtermanager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package filtermanager

import (
"fmt"
"net/http"
"strconv"
"testing"
Expand Down Expand Up @@ -135,3 +136,35 @@ func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) {
m.OnLog(reqHdrs[n%num], nil, nil, nil)
}
}

func BenchmarkFilterManagerDebugEnabled(b *testing.B) {
envoy.DisableLogInTest() // otherwise, there is too much output
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
pc := []*model.ParsedFilterConfig{}
for i := 0; i < 5; i++ {
pc = append(pc, &model.ParsedFilterConfig{
Name: fmt.Sprintf("all-%d", i),
Factory: PassThroughFactory,
})
}
config.parsed = pc
config.enableDebugMode = true
reqHdr := envoy.NewRequestHeaderMap(http.Header{})
respHdr := envoy.NewResponseHeaderMap(http.Header{})
reqBuf := envoy.NewBufferInstance([]byte{})
respBuf := envoy.NewBufferInstance([]byte{})

for n := 0; n < b.N; n++ {
m := unwrapFilterManager(FilterManagerFactory(config, cb))
m.DecodeHeaders(reqHdr, false)
cb.WaitContinued()
m.DecodeData(reqBuf, true)
cb.WaitContinued()
m.EncodeHeaders(respHdr, false)
cb.WaitContinued()
m.EncodeData(respBuf, true)
cb.WaitContinued()
m.OnLog(reqHdr, nil, respHdr, nil)
}
}
31 changes: 28 additions & 3 deletions api/pkg/filtermanager/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,32 @@ func NewFilterWrapper(name string, f api.Filter) *FilterWrapper {
}
}

type ExecutionRecord struct {
PluginName string
Record time.Duration
type executionRecord struct {
name string
duration time.Duration
}

type ExecutionRecords struct {
records []*executionRecord
}

func NewExecutionRecords() *ExecutionRecords {
return &ExecutionRecords{
records: make([]*executionRecord, 0, 8),
}
}

// Record & ForEach should only be called in OnLog phase

func (e *ExecutionRecords) Record(name string, duration time.Duration) {
e.records = append(e.records, &executionRecord{
name: name,
duration: duration,
})
}

func (e *ExecutionRecords) ForEach(f func(name string, duration time.Duration)) {
for _, record := range e.records {
f(record.name, record.duration)
}
}
28 changes: 6 additions & 22 deletions api/pkg/filtermanager/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
package filtermanager

import (
"fmt"
"reflect"
"time"

"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/filtermanager/model"
)

type logExecutionFilter struct {
Expand Down Expand Up @@ -119,6 +116,8 @@ type debugFilter struct {
name string
internal api.Filter
callbacks api.FilterCallbackHandler

record time.Duration
}

func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallbackHandler) api.Filter {
Expand All @@ -131,26 +130,11 @@ func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallba

func (f *debugFilter) recordExecution(start time.Time) {
duration := time.Since(start)
executionRecords := f.callbacks.PluginState().Get("debugMode", "executionRecords")
if executionRecords == nil {
executionRecords = []*model.ExecutionRecord{}
f.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords)
}
f.record += duration
}

records, ok := executionRecords.([]*model.ExecutionRecord)
if !ok {
panic(fmt.Sprintf("unexpected type: %s", reflect.TypeOf(executionRecords)))
}
for _, record := range records {
if record.PluginName == f.name {
record.Record += duration
return
}
}
f.callbacks.PluginState().Set("debugMode", "executionRecords", append(records, &model.ExecutionRecord{
PluginName: f.name,
Record: duration,
}))
func (f *debugFilter) reportExecution() (name string, duration time.Duration) {
return f.name, f.record
}

func (f *debugFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction {
Expand Down
34 changes: 28 additions & 6 deletions api/pkg/filtermanager/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,30 @@ func TestDebugFilter(t *testing.T) {

f2.DecodeHeaders(nil, true)
f1.DecodeHeaders(nil, true)
records := cb.PluginState().Get("debugMode", "executionRecords").([]*model.ExecutionRecord)

type RecordWrapper struct {
PluginName string
Record time.Duration
}

getRecords := func(executionRecords *model.ExecutionRecords) []RecordWrapper {
records := []RecordWrapper{}
executionRecords.ForEach(func(name string, duration time.Duration) {
r := RecordWrapper{
PluginName: name,
Record: duration,
}
records = append(records, r)
})
return records
}
executionRecords := model.NewExecutionRecords()
name, duration := f2.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
name, duration = f1.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
records := getRecords(executionRecords)
t.Logf("get records %+v\n", records) // for debug when test failed
assert.Equal(t, 2, len(records))
assert.Equal(t, "two", records[0].PluginName)
assert.True(t, records[0].Record > 0)
assert.Equal(t, "one", records[1].PluginName)
Expand All @@ -63,12 +84,13 @@ func TestDebugFilter(t *testing.T) {
f1.EncodeHeaders(nil, false)
f1.EncodeData(nil, true)

records = cb.PluginState().Get("debugMode", "executionRecords").([]*model.ExecutionRecord)
executionRecords = model.NewExecutionRecords()
name, duration = f1.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
records = getRecords(executionRecords)
t.Logf("get records %+v\n", records) // for debug when test failed
assert.Equal(t, 2, len(records))
assert.Equal(t, "one", records[1].PluginName)
// Should be the sum of multiple calls
delta := 10 * time.Millisecond
rec := records[1].Record - decodeHeadersCost
rec := records[0].Record - decodeHeadersCost
assert.True(t, 270*time.Millisecond-delta < rec && rec < 270*time.Millisecond+delta, rec)
}
10 changes: 5 additions & 5 deletions plugins/plugins/debugmode/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,14 @@ func (f *filter) OnLog(reqHeaders api.RequestHeaderMap, reqTrailers api.RequestT
// This is a private API and we don't guarantee its stablibity
r := f.callbacks.PluginState().Get("debugMode", "executionRecords")
if r != nil {
executionRecords := r.([]*model.ExecutionRecord)
for _, record := range executionRecords {
executionRecords := r.(*model.ExecutionRecords)
executionRecords.ForEach(func(name string, duration time.Duration) {
p := executionPlugin{
Name: record.PluginName,
CostSeconds: record.Record.Seconds(),
Name: name,
CostSeconds: duration.Seconds(),
}
report.ExecutedPlugins = append(report.ExecutedPlugins, p)
}
})
}

b, _ := json.Marshal(report)
Expand Down

0 comments on commit 5f5357a

Please sign in to comment.