Skip to content

Commit

Permalink
Merge branch 'main' into feat/sentinel-plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
WeixinX committed Oct 6, 2024
2 parents 068114c + 65fdf19 commit 5a96fb9
Show file tree
Hide file tree
Showing 45 changed files with 377 additions and 167 deletions.
16 changes: 15 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,10 @@ jobs:
envoy_version:
- 1.29
- 1.31
- dev
env:
ENVOY_API_VERSION: ${{ matrix.envoy_version }}
# patch version should not contain API breaking changes, so we just pick the first one
FULL_ENVOY_VERSION: ${{ matrix.envoy_version }}.0
PROXY_IMAGE: envoyproxy/envoy:contrib-v${{ matrix.envoy_version }}.0
defaults:
run:
Expand All @@ -47,6 +47,18 @@ jobs:
cache-dependency-path: "**/*.sum"
- name: Choose the Envoy API
run: |
FULL_ENVOY_VERSION=${ENVOY_API_VERSION}.0
if [[ $ENVOY_API_VERSION == dev ]]; then
# update this once there are more breaking changes
FULL_ENVOY_VERSION=1.31.1-0.20240909145059-353737786a7f
# This is the envoy:contrib-dev image pull in 2024-09-25.
# Use docker inspect --format='{{index .RepoDigests 0}}' envoyproxy/envoy:contrib-dev to get the sha256 ID.
# We don't use the envoy:contrib-dev tag directly because it will be rewritten by the latest commit and
# our test suite uses IfPresent policy to pull image.
# We don't use the CI to catch the breaking change from the upstream so far.
export PROXY_IMAGE=envoyproxy/envoy@sha256:845c392c1f128a00c49439d129ca9f2c12ff8748aeb42c8aa2b3b8240d6e0d5b
echo PROXY_IMAGE=$PROXY_IMAGE >> $GITHUB_ENV
fi
pushd ..
./patch/switch-envoy-go-version.sh ${FULL_ENVOY_VERSION}
popd
Expand Down Expand Up @@ -233,6 +245,8 @@ jobs:
./api-module-test-cover-1.29/cover_integration.out,
./api-module-test-cover-1.31/cover.out,
./api-module-test-cover-1.31/cover_integration.out,
./api-module-test-cover-dev/cover.out,
./api-module-test-cover-dev/cover_integration.out,
./types-module-test-cover/cover.out,
./plugins-unit-test-cover/cover.out,
./plugins-integration-test-cover/cover.out,
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/filtermanager/api/result_action.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ type ResultAction interface {
}

type isResultAction struct {
typeid int // we need to add a field, otherwises Go will optimize all `&isResultAction{}` to same address.
typeid int // we need to add a field, otherwise Go will optimize all `&isResultAction{}` to same address.
}

func (i *isResultAction) OK() {}
Expand Down
10 changes: 4 additions & 6 deletions api/pkg/filtermanager/api_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func TestPluginState(t *testing.T) {
Factory: getPluginStateFilterFactory,
},
}
m := FilterManagerFactory(config, cb).(*filterManager)
m := FilterManagerFactory(config, cb)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
Expand All @@ -99,8 +99,6 @@ type accessCacheFieldsFilter struct {
}

func (f *accessCacheFieldsFilter) do(headers api.RequestHeaderMap) api.ResultAction {
// Maybe we can relax the concurreny requirement for header modification?
// Update headers in OnLog is meaningless. Anyway, add lock for now.
headers.Set("Cookie", "k=v")
p := headers.URL().Path
headers.Add("Cookie", fmt.Sprintf("k=%s", p))
Expand Down Expand Up @@ -143,11 +141,11 @@ func TestAccessCacheFieldsConcurrently(t *testing.T) {
for i := 0; i < n; i++ {
go func(i int) {
cb := envoy.NewCAPIFilterCallbackHandler()
m := FilterManagerFactory(config, cb).(*filterManager)
m := unwrapFilterManager(FilterManagerFactory(config, cb))
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
m.OnLog()
m.OnLog(hdr, nil, nil, nil)
wg.Done()
}(i)
}
Expand Down Expand Up @@ -223,7 +221,7 @@ func TestLogWithArgs(t *testing.T) {
Factory: testLogFilterFactory,
},
}
m := FilterManagerFactory(config, cb).(*filterManager)
m := FilterManagerFactory(config, cb)
h := http.Header{}
hdr := envoy.NewRequestHeaderMap(h)
m.DecodeHeaders(hdr, true)
Expand Down
45 changes: 12 additions & 33 deletions api/pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,13 @@ type filterManager struct {
decodeRequestNeeded bool
decodeIdx int
reqHdr api.RequestHeaderMap // don't access it in Encode phases
contentType string

encodeResponseNeeded bool
encodeIdx int
rspHdr api.ResponseHeaderMap

runningInGoThread atomic.Int32
hdrLock sync.Mutex // FIXME: remove this once we get request headers from the OnLog directly
hdrLock sync.Mutex

// use a group of bools instead of map to avoid lookup
canSkipDecodeHeaders bool
Expand All @@ -67,7 +66,6 @@ func (m *filterManager) Reset() {
m.decodeRequestNeeded = false
m.decodeIdx = -1
m.reqHdr = nil
m.contentType = ""

m.encodeResponseNeeded = false
m.encodeIdx = -1
Expand Down Expand Up @@ -226,7 +224,7 @@ func FilterManagerFactory(c interface{}, cb capi.FilterCallbackHandler) (streamF
fm.canSkipEncodeData = fm.canSkipMethod["EncodeData"] && fm.canSkipMethod["EncodeResponse"]
fm.canSkipOnLog = fm.canSkipMethod["OnLog"]

return fm
return wrapFilterManager(fm)
}

func (m *filterManager) recordLocalReplyPluginName(name string) {
Expand All @@ -248,7 +246,8 @@ func (m *filterManager) handleAction(res api.ResultAction, phase phase, filter *
} else if phase == phaseEncodeHeaders {
m.encodeResponseNeeded = true
} else {
api.LogErrorf("WaitAllData only allowed when processing headers, phase: %v", phase)
api.LogErrorf("WaitAllData only allowed when processing headers, phase: %v. "+
" In the mean time, use DecodeRequest / EncodeResponse instead of DecodeData / EncodeData to handle fully buffered body.", phase)
}
return false
}
Expand Down Expand Up @@ -295,10 +294,8 @@ func (m *filterManager) localReply(v *api.LocalResponse, decoding bool) {
if ct == "application/json" {
isJSON = true
}
} else {
// use the Content-Type header passed by the client, not the header
// provided by the gateway if have.
ct = m.contentType
} else if decoding {
ct, _ = m.reqHdr.Get("content-type")
if ct == "" || ct == "application/json" {
isJSON = true
}
Expand All @@ -325,11 +322,9 @@ func (m *filterManager) localReply(v *api.LocalResponse, decoding bool) {
}

func (m *filterManager) DecodeHeaders(headers capi.RequestHeaderMap, endStream bool) capi.StatusType {
m.contentType, _ = headers.Get("content-type")

// Ensure the headers are cached on the Go side.
// FIXME: remove this once we support OnLog phase headers in Envoy Go.
if m.DebugModeEnabled() {
if !supportGettingHeadersOnLog && m.DebugModeEnabled() {
// Ensure the headers are cached on the Go side.
headers.Get("test")
headers := &filterManagerRequestHeaderMap{
RequestHeaderMap: headers,
}
Expand Down Expand Up @@ -612,9 +607,8 @@ func (m *filterManager) DecodeData(buf capi.BufferInstance, endStream bool) capi
}

func (m *filterManager) EncodeHeaders(headers capi.ResponseHeaderMap, endStream bool) capi.StatusType {
// Ensure the headers are cached on the Go side.
// FIXME: remove this once we support OnLog phase headers in Envoy Go.
if m.DebugModeEnabled() {
if !supportGettingHeadersOnLog && m.DebugModeEnabled() {
// Ensure the headers are cached on the Go side.
headers.Get("test")
m.rspHdr = headers
}
Expand Down Expand Up @@ -745,27 +739,12 @@ func (m *filterManager) EncodeData(buf capi.BufferInstance, endStream bool) capi

// TODO: handle trailers

func (m *filterManager) OnLog() {
if m.canSkipOnLog {
return
}

func (m *filterManager) runOnLogPhase(reqHdr api.RequestHeaderMap, rspHdr api.ResponseHeaderMap) {
// It is unsafe to access the f.callbacks in the goroutine, as the underlying request
// may be destroyed when the goroutine is running. So if people want to do some IO jobs,
// they need to copy the used data from the request to the Go side before kicking off
// the goroutine.
var reqHdr api.RequestHeaderMap
m.hdrLock.Lock()
reqHdr = m.reqHdr
m.hdrLock.Unlock()
var rspHdr api.ResponseHeaderMap
m.hdrLock.Lock()
rspHdr = m.rspHdr
m.hdrLock.Unlock()

for _, f := range m.filters {
// TODO: the cached headers passed here is not precise. We need to get the real one via
// Envoy Go API. But it is not supported yet.
f.OnLog(reqHdr, nil, rspHdr, nil)
}

Expand Down
12 changes: 6 additions & 6 deletions api/pkg/filtermanager/filtermanager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func BenchmarkFilterManagerAllPhase(b *testing.B) {
respBuf := envoy.NewBufferInstance([]byte{})

for n := 0; n < b.N; n++ {
m := FilterManagerFactory(config, cb)
m := unwrapFilterManager(FilterManagerFactory(config, cb))
m.DecodeHeaders(reqHdr, false)
cb.WaitContinued()
m.DecodeData(reqBuf, true)
Expand All @@ -53,7 +53,7 @@ func BenchmarkFilterManagerAllPhase(b *testing.B) {
cb.WaitContinued()
m.EncodeData(respBuf, true)
cb.WaitContinued()
m.OnLog()
m.OnLog(reqHdr, nil, respHdr, nil)
}
}

Expand Down Expand Up @@ -88,10 +88,10 @@ func BenchmarkFilterManagerRegular(b *testing.B) {
reqHdr := envoy.NewRequestHeaderMap(http.Header{})

for n := 0; n < b.N; n++ {
m := FilterManagerFactory(config, cb)
m := unwrapFilterManager(FilterManagerFactory(config, cb))
m.DecodeHeaders(reqHdr, false)
cb.WaitContinued()
m.OnLog()
m.OnLog(reqHdr, nil, nil, nil)
}
}

Expand Down Expand Up @@ -129,9 +129,9 @@ func BenchmarkFilterManagerConsumerWithFilter(b *testing.B) {
}

for n := 0; n < b.N; n++ {
m := FilterManagerFactory(config, cb)
m := unwrapFilterManager(FilterManagerFactory(config, cb))
m.DecodeHeaders(reqHdrs[n%num], false)
cb.WaitContinued()
m.OnLog()
m.OnLog(reqHdrs[n%num], nil, nil, nil)
}
}
54 changes: 54 additions & 0 deletions api/pkg/filtermanager/filtermanager_dev.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build envoydev

package filtermanager

import (
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"
)

const (
supportGettingHeadersOnLog = true
)

func (m *filterManager) OnLog(reqHdr capi.RequestHeaderMap, _ capi.RequestTrailerMap, rspHdr capi.ResponseHeaderMap, _ capi.ResponseTrailerMap) {
if m.canSkipOnLog {
return
}

m.hdrLock.Lock()
if m.reqHdr == nil {
m.reqHdr = &filterManagerRequestHeaderMap{
RequestHeaderMap: reqHdr,
}
} else {
// In our benchmark BenchmarkFilterManagerRegular, reuse the request header wrapper is 5% faster than create a new one,
// even the reusage requires holding the lock though it is running on fast path.
h, _ := m.reqHdr.(*filterManagerRequestHeaderMap)
h.RequestHeaderMap = reqHdr
}
m.hdrLock.Unlock()
m.runOnLogPhase(m.reqHdr, rspHdr)
}

func wrapFilterManager(fm *filterManager) capi.StreamFilter {
return fm
}

// This method is test only
func unwrapFilterManager(wrapper capi.StreamFilter) *filterManager {
return wrapper.(*filterManager)
}
64 changes: 64 additions & 0 deletions api/pkg/filtermanager/filtermanager_latest.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright The HTNN Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build !envoydev

package filtermanager

import (
capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api"

"mosn.io/htnn/api/pkg/filtermanager/api"
)

const (
supportGettingHeadersOnLog = false
)

func (m *filterManager) OnLog(_ capi.RequestHeaderMap, _ capi.RequestTrailerMap, _ capi.ResponseHeaderMap, _ capi.ResponseTrailerMap) {
if m.canSkipOnLog {
return
}

var reqHdr api.RequestHeaderMap
m.hdrLock.Lock()
reqHdr = m.reqHdr
m.hdrLock.Unlock()
var rspHdr api.ResponseHeaderMap
m.hdrLock.Lock()
rspHdr = m.rspHdr
m.hdrLock.Unlock()

m.runOnLogPhase(reqHdr, rspHdr)
}

type filterManagerWrapper struct {
*filterManager
}

func (w *filterManagerWrapper) OnLog() {
w.filterManager.OnLog(nil, nil, nil, nil)
}

// we will get rid of this wrapper once Envoy 1.32 is released

func wrapFilterManager(fm *filterManager) capi.StreamFilter {
return &filterManagerWrapper{fm}
}

// This method is test only
func unwrapFilterManager(wrapper capi.StreamFilter) *filterManager {
fmw, _ := wrapper.(*filterManagerWrapper)
return fmw.filterManager
}
Loading

0 comments on commit 5a96fb9

Please sign in to comment.