diff --git a/api/pkg/filtermanager/filtermanager.go b/api/pkg/filtermanager/filtermanager.go index 0544a4ab..0c075935 100644 --- a/api/pkg/filtermanager/filtermanager.go +++ b/api/pkg/filtermanager/filtermanager.go @@ -235,13 +235,14 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF fm.canSkipEncodeTrailers = fm.canSkipMethods["EncodeTrailers"] && fm.canSkipMethods["EncodeResponse"] fm.canSkipOnLog = fm.canSkipMethods["OnLog"] - // Similar to the skip check + // Similar to the skip check, but the canSyncRun check is more granular as + // it will consider if the request/response is fully buffered. fm.canSyncRunDecodeHeaders = fm.canSyncRunMethods["DecodeHeaders"] && fm.canSyncRunMethods["DecodeRequest"] && fm.config.initOnce == nil - fm.canSyncRunDecodeData = fm.canSyncRunMethods["DecodeData"] && fm.canSyncRunMethods["DecodeRequest"] - fm.canSyncRunDecodeTrailers = fm.canSyncRunMethods["DecodeTrailers"] && fm.canSyncRunMethods["DecodeRequest"] - fm.canSyncRunEncodeHeaders = fm.canSyncRunMethods["EncodeHeaders"] - fm.canSyncRunEncodeData = fm.canSyncRunMethods["EncodeData"] && fm.canSyncRunMethods["EncodeResponse"] - fm.canSyncRunEncodeTrailers = fm.canSyncRunMethods["EncodeTrailers"] && fm.canSyncRunMethods["EncodeResponse"] + fm.canSyncRunDecodeData = fm.canSyncRunMethods["DecodeData"] + fm.canSyncRunDecodeTrailers = fm.canSyncRunMethods["DecodeTrailers"] + fm.canSyncRunEncodeHeaders = fm.canSyncRunMethods["EncodeHeaders"] && fm.canSyncRunMethods["EncodeResponse"] + fm.canSyncRunEncodeData = fm.canSyncRunMethods["EncodeData"] + fm.canSyncRunEncodeTrailers = fm.canSyncRunMethods["EncodeTrailers"] return wrapFilterManager(fm) } @@ -483,13 +484,13 @@ func (m *filterManager) decodeHeaders(headers capi.RequestHeaderMap, endStream b m.canSkipEncodeTrailers = m.canSkipEncodeTrailers && canSkipMethods["EncodeTrailers"] && canSkipMethods["EncodeResponse"] m.canSkipOnLog = m.canSkipOnLog && canSkipMethods["OnLog"] - // Similar to the skip check canSyncRunMethods := c.CanSyncRunMethod - m.canSyncRunDecodeData = m.canSyncRunDecodeData && canSyncRunMethods["DecodeData"] && canSyncRunMethods["DecodeRequest"] - m.canSyncRunDecodeTrailers = m.canSyncRunDecodeTrailers && canSyncRunMethods["DecodeTrailers"] && canSyncRunMethods["DecodeRequest"] - m.canSyncRunEncodeHeaders = m.canSyncRunEncodeData && canSyncRunMethods["EncodeHeaders"] - m.canSyncRunEncodeData = m.canSyncRunEncodeData && canSyncRunMethods["EncodeData"] && canSyncRunMethods["EncodeResponse"] - m.canSyncRunEncodeTrailers = m.canSyncRunEncodeTrailers && canSyncRunMethods["EncodeTrailers"] && canSyncRunMethods["EncodeResponse"] + m.canSyncRunDecodeHeaders = m.canSyncRunDecodeHeaders && canSyncRunMethods["DecodeHeaders"] && canSyncRunMethods["DecodeRequest"] + m.canSyncRunDecodeData = m.canSyncRunDecodeData && canSyncRunMethods["DecodeData"] + m.canSyncRunDecodeTrailers = m.canSyncRunDecodeTrailers && canSyncRunMethods["DecodeTrailers"] + m.canSyncRunEncodeHeaders = m.canSyncRunEncodeHeaders && canSyncRunMethods["EncodeHeaders"] && canSyncRunMethods["EncodeResponse"] + m.canSyncRunEncodeData = m.canSyncRunEncodeData && canSyncRunMethods["EncodeData"] + m.canSyncRunEncodeTrailers = m.canSyncRunEncodeTrailers && canSyncRunMethods["EncodeTrailers"] // TODO: add field to control if merging is allowed i := 0 @@ -642,7 +643,7 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi return capi.Continue } - if m.canSyncRunDecodeData { + if m.canSyncRunDecodeData && (m.decodeIdx == -1 || (m.canSyncRunDecodeHeaders && m.canSyncRunDecodeTrailers)) { return m.decodeData(buf, endStream) } @@ -706,7 +707,7 @@ func (m *filterManager) DecodeTrailers(trailers capi.RequestTrailerMap) capi.Sta return capi.Continue } - if m.canSyncRunDecodeTrailers { + if m.canSyncRunDecodeTrailers && (m.decodeIdx == -1 || (m.canSyncRunDecodeHeaders && m.canSyncRunDecodeTrailers)) { return m.decodeTrailers(trailers) } @@ -894,7 +895,7 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi return capi.Continue } - if m.canSyncRunEncodeData { + if m.canSyncRunEncodeData && (m.encodeIdx == -1 || (m.canSyncRunEncodeHeaders && m.canSyncRunEncodeTrailers)) { return m.encodeData(buf, endStream) } @@ -944,7 +945,7 @@ func (m *filterManager) EncodeTrailers(trailers capi.ResponseTrailerMap) capi.St return capi.Continue } - if m.canSyncRunEncodeTrailers { + if m.canSyncRunEncodeTrailers && (m.encodeIdx == -1 || (m.canSyncRunEncodeHeaders && m.canSyncRunEncodeData)) { return m.encodeTrailers(trailers) } diff --git a/api/pkg/filtermanager/filtermanager_test.go b/api/pkg/filtermanager/filtermanager_test.go index 2ba2ea94..59c6f107 100644 --- a/api/pkg/filtermanager/filtermanager_test.go +++ b/api/pkg/filtermanager/filtermanager_test.go @@ -23,6 +23,7 @@ import ( "testing" "github.com/agiledragon/gomonkey/v2" + capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api" "github.com/stretchr/testify/assert" internalConsumer "mosn.io/htnn/api/internal/consumer" @@ -512,6 +513,7 @@ func TestFiltersFromConsumer(t *testing.T) { assert.Equal(t, 3, len(m.filters)) } assert.Equal(t, true, m.canSyncRunEncodeHeaders) + assert.Equal(t, false, m.canSyncRunDecodeTrailers) _, ok := hdr.Get("x-htnn-route") assert.False(t, ok) @@ -727,3 +729,107 @@ func TestSyncRunWhenThereAreMultiFilters(t *testing.T) { assert.Equal(t, true, m.canSyncRunEncodeTrailers) } } + +func TestSyncRunWhenProcessingBufferedData(t *testing.T) { + cb := envoy.NewCAPIFilterCallbackHandler() + config := initFilterManagerConfig("ns") + config.parsed = []*model.ParsedFilterConfig{ + { + Name: "add_req", + Factory: addReqFactory, + ParsedConfig: addReqConf{ + hdrName: "x-htnn-route", + }, + SyncRunPhases: api.PhaseDecodeTrailers, + }, + { + Name: "access_field_on_log", + Factory: accessFieldOnLogFactory, + SyncRunPhases: api.AllPhases, + }, + } + + m := unwrapFilterManager(FilterManagerFactory(config, cb)) + assert.Equal(t, false, m.canSyncRunDecodeHeaders) + assert.Equal(t, true, m.canSyncRunDecodeData) + assert.Equal(t, true, m.canSyncRunDecodeTrailers) + assert.Equal(t, true, m.canSyncRunEncodeHeaders) + assert.Equal(t, true, m.canSyncRunEncodeData) + assert.Equal(t, true, m.canSyncRunEncodeTrailers) + + m.decodeIdx = -1 // simulate processing request streamingly + buf := envoy.NewBufferInstance([]byte{}) + res := m.DecodeData(buf, false) + assert.Equal(t, capi.Continue, res) + reqTrailer := envoy.NewRequestTrailerMap(http.Header{}) + res = m.DecodeTrailers(reqTrailer) + assert.Equal(t, capi.Continue, res) + + m.encodeIdx = -1 // simulate processing response streamingly + res = m.EncodeData(buf, false) + assert.Equal(t, capi.Continue, res) + rspTrailer := envoy.NewResponseTrailerMap(http.Header{}) + res = m.EncodeTrailers(rspTrailer) + assert.Equal(t, capi.Continue, res) + + m.decodeIdx = 0 // simulate processing buffered request + res = m.DecodeData(buf, false) + assert.Equal(t, capi.Running, res) + cb.WaitContinued() + res = m.DecodeTrailers(reqTrailer) + assert.Equal(t, capi.Running, res) + cb.WaitContinued() + + m.encodeIdx = 0 // simulate processing buffered response + res = m.EncodeData(buf, false) + assert.Equal(t, capi.Continue, res) + res = m.EncodeTrailers(rspTrailer) + assert.Equal(t, capi.Continue, res) +} + +func TestSyncRunWhenProcessingBufferedDataWithFiltersFromConsumer(t *testing.T) { + config := initFilterManagerConfig("ns") + config.consumerFiltersEndAt = 1 + + consumers := map[string]*internalConsumer.Consumer{} + c := internalConsumer.Consumer{ + FilterConfigs: map[string]*model.ParsedFilterConfig{ + "2_add_req": { + Name: "2_add_req", + Factory: addReqFactory, + ParsedConfig: addReqConf{}, + SyncRunPhases: api.PhaseDecodeTrailers, + }, + }, + } + consumers["0"] = &c + config.parsed = []*model.ParsedFilterConfig{ + { + Name: "1_set_consumer", + Factory: setConsumerFactory, + ParsedConfig: setConsumerConf{ + Consumers: consumers, + }, + SyncRunPhases: api.PhaseDecodeHeaders, + }, + } + + cb := envoy.NewCAPIFilterCallbackHandler() + m := unwrapFilterManager(FilterManagerFactory(config, cb)) + assert.Equal(t, true, m.canSyncRunDecodeHeaders) + assert.Equal(t, true, m.canSyncRunDecodeTrailers) + + h := http.Header{} + h.Add("consumer", "0") + hdr := envoy.NewRequestHeaderMap(h) + res := m.DecodeHeaders(hdr, false) + assert.Equal(t, capi.Continue, res) + assert.Equal(t, false, m.canSyncRunDecodeHeaders) + assert.Equal(t, true, m.canSyncRunDecodeTrailers) + + m.decodeIdx = 0 // simulate processing buffered request + reqTrailer := envoy.NewRequestTrailerMap(http.Header{}) + res = m.DecodeTrailers(reqTrailer) + assert.Equal(t, capi.Running, res) + cb.WaitContinued() +}