Skip to content

Commit

Permalink
divide & conquer
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander committed Nov 7, 2024
1 parent a74a220 commit 1617577
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 31 deletions.
11 changes: 11 additions & 0 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -888,6 +888,17 @@ func (m *filterManager) EncodeTrailers(trailers capi.ResponseTrailerMap) capi.St
func (m *filterManager) runOnLogPhase(reqHdr api.RequestHeaderMap, reqTrailer api.RequestTrailerMap,
rspHdr api.ResponseHeaderMap, rspTrailer api.ResponseTrailerMap) {

if m.DebugModeEnabled() {
executionRecords := model.NewExecutionRecords()
for _, f := range m.filters {
if df, ok := f.Filter.(*debugFilter); ok {
name, duration := df.reportExecution()
executionRecords.Record(name, duration)
}
}
m.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords)
}

// It is unsafe to access the f.callbacks in the goroutine, as the underlying request
// may be destroyed when the goroutine is running. So if people want to do some IO jobs,
// they need to copy the used data from the request to the Go side before kicking off
Expand Down
14 changes: 2 additions & 12 deletions api/pkg/filtermanager/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ type executionRecord struct {

type ExecutionRecords struct {
records []*executionRecord
lock sync.Mutex
}

func NewExecutionRecords() *ExecutionRecords {
Expand All @@ -65,25 +64,16 @@ func NewExecutionRecords() *ExecutionRecords {
}
}

func (e *ExecutionRecords) Record(name string, duration time.Duration) {
for _, record := range e.records {
if record.name == name {
record.duration += duration
return
}
}
// Record & ForEach should only be called in OnLog phase

e.lock.Lock()
defer e.lock.Unlock()
func (e *ExecutionRecords) Record(name string, duration time.Duration) {
e.records = append(e.records, &executionRecord{
name: name,
duration: duration,
})
}

func (e *ExecutionRecords) ForEach(f func(name string, duration time.Duration)) {
e.lock.Lock()
defer e.lock.Unlock()
for _, record := range e.records {
f(record.name, record.duration)
}
Expand Down
19 changes: 6 additions & 13 deletions api/pkg/filtermanager/wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
package filtermanager

import (
"fmt"
"reflect"
"time"

"mosn.io/htnn/api/pkg/filtermanager/api"
"mosn.io/htnn/api/pkg/filtermanager/model"
)

type logExecutionFilter struct {
Expand Down Expand Up @@ -119,6 +116,8 @@ type debugFilter struct {
name string
internal api.Filter
callbacks api.FilterCallbackHandler

record time.Duration
}

func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallbackHandler) api.Filter {
Expand All @@ -131,17 +130,11 @@ func NewDebugFilter(name string, internal api.Filter, callbacks api.FilterCallba

func (f *debugFilter) recordExecution(start time.Time) {
duration := time.Since(start)
executionRecords := f.callbacks.PluginState().Get("debugMode", "executionRecords")
if executionRecords == nil {
executionRecords = model.NewExecutionRecords()
f.callbacks.PluginState().Set("debugMode", "executionRecords", executionRecords)
}
f.record += duration
}

records, ok := executionRecords.(*model.ExecutionRecords)
if !ok {
panic(fmt.Sprintf("unexpected type: %s", reflect.TypeOf(executionRecords)))
}
records.Record(f.name, duration)
func (f *debugFilter) reportExecution() (name string, duration time.Duration) {
return f.name, f.record
}

func (f *debugFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api.ResultAction {
Expand Down
15 changes: 9 additions & 6 deletions api/pkg/filtermanager/wrapper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,13 @@ func TestDebugFilter(t *testing.T) {
})
return records
}
executionRecords := cb.PluginState().Get("debugMode", "executionRecords").(*model.ExecutionRecords)
executionRecords := model.NewExecutionRecords()
name, duration := f2.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
name, duration = f1.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
records := getRecords(executionRecords)
t.Logf("get records %+v\n", records) // for debug when test failed
assert.Equal(t, 2, len(records))
assert.Equal(t, "two", records[0].PluginName)
assert.True(t, records[0].Record > 0)
assert.Equal(t, "one", records[1].PluginName)
Expand All @@ -81,13 +84,13 @@ func TestDebugFilter(t *testing.T) {
f1.EncodeHeaders(nil, false)
f1.EncodeData(nil, true)

executionRecords = cb.PluginState().Get("debugMode", "executionRecords").(*model.ExecutionRecords)
executionRecords = model.NewExecutionRecords()
name, duration = f1.(*debugFilter).reportExecution()
executionRecords.Record(name, duration)
records = getRecords(executionRecords)
t.Logf("get records %+v\n", records) // for debug when test failed
assert.Equal(t, 2, len(records))
assert.Equal(t, "one", records[1].PluginName)
// Should be the sum of multiple calls
delta := 10 * time.Millisecond
rec := records[1].Record - decodeHeadersCost
rec := records[0].Record - decodeHeadersCost
assert.True(t, 270*time.Millisecond-delta < rec && rec < 270*time.Millisecond+delta, rec)
}

0 comments on commit 1617577

Please sign in to comment.