diff --git a/pkg/expr/cel.go b/pkg/expr/cel.go index 1be601ff..7f044e6c 100644 --- a/pkg/expr/cel.go +++ b/pkg/expr/cel.go @@ -305,8 +305,8 @@ func defineSource() cel.EnvOption { func (s *source) Receive(function string, overload string, args []ref.Val) ref.Val { switch function { case "ip": - ip := pkgRequest.GetRemoteIP(s.callback.StreamInfo()) - return types.String(ip) + addr := s.callback.StreamInfo().DownstreamRemoteParsedAddress() + return types.String(addr.IP) case "address": ipport := s.callback.StreamInfo().DownstreamRemoteAddress() return types.String(ipport) diff --git a/pkg/filtermanager/api/api.go b/pkg/filtermanager/api/api.go index bc8853bb..ddccdcd1 100644 --- a/pkg/filtermanager/api/api.go +++ b/pkg/filtermanager/api/api.go @@ -102,13 +102,55 @@ func (f *PassThroughFilter) EncodeResponse(headers api.ResponseHeaderMap, data a return Continue } +type HeaderMap = api.HeaderMap type RequestHeaderMap = api.RequestHeaderMap type ResponseHeaderMap = api.ResponseHeaderMap +type DataBufferBase = api.DataBufferBase type BufferInstance = api.BufferInstance type RequestTrailerMap = api.RequestTrailerMap type ResponseTrailerMap = api.ResponseTrailerMap -type StreamInfo = api.StreamInfo +type IPAddress struct { + Address string + IP string + Port int +} + +type StreamInfo interface { + GetRouteName() string + FilterChainName() string + // Protocol return the request's protocol. + Protocol() (string, bool) + // ResponseCode return the response code. + ResponseCode() (uint32, bool) + // ResponseCodeDetails return the response code details. + ResponseCodeDetails() (string, bool) + // AttemptCount return the number of times the request was attempted upstream. + AttemptCount() uint32 + // Get the dynamic metadata of the request + DynamicMetadata() DynamicMetadata + // DownstreamLocalAddress return the downstream local address. + DownstreamLocalAddress() string + // DownstreamRemoteAddress return the downstream remote address. + DownstreamRemoteAddress() string + // UpstreamLocalAddress return the upstream local address. + UpstreamLocalAddress() (string, bool) + // UpstreamRemoteAddress return the upstream remote address. + UpstreamRemoteAddress() (string, bool) + // UpstreamClusterName return the upstream host cluster. + UpstreamClusterName() (string, bool) + // FilterState return the filter state interface. + FilterState() FilterState + // VirtualClusterName returns the name of the virtual cluster which got matched + VirtualClusterName() (string, bool) + // WorkerID returns the ID of the Envoy worker thread + WorkerID() uint32 + + // Methods added by HTNN + + // DownstreamRemoteParsedAddress returns the downstream remote address, in the IPAddress struct + DownstreamRemoteParsedAddress() *IPAddress +} type PluginConfig interface { ProtoReflect() protoreflect.Message diff --git a/pkg/filtermanager/filtermanager.go b/pkg/filtermanager/filtermanager.go index b9c519e2..c52667eb 100644 --- a/pkg/filtermanager/filtermanager.go +++ b/pkg/filtermanager/filtermanager.go @@ -18,10 +18,12 @@ import ( "encoding/json" "errors" "fmt" + "net" "reflect" "runtime" "runtime/debug" "sort" + "strconv" "sync" xds "github.com/cncf/xds/go/xds/type/v3" @@ -213,7 +215,35 @@ func (m *filterManager) Reset() { m.canSkipEncodeData = false m.canSkipOnLog = false - m.callbacks.FilterCallbackHandler = nil + m.callbacks.Reset() +} + +type filterManagerStreamInfo struct { + capi.StreamInfo + + ipAddress *api.IPAddress +} + +func (s *filterManagerStreamInfo) DownstreamRemoteParsedAddress() *api.IPAddress { + if s.ipAddress == nil { + ipport := s.StreamInfo.DownstreamRemoteAddress() + // the IPPort given by Envoy must be valid + ip, port, _ := net.SplitHostPort(ipport) + p, _ := strconv.Atoi(port) + s.ipAddress = &api.IPAddress{ + Address: ipport, + IP: ip, + Port: p, + } + } + return s.ipAddress +} + +func (s *filterManagerStreamInfo) DownstreamRemoteAddress() string { + if s.ipAddress != nil { + return s.ipAddress.Address + } + return s.StreamInfo.DownstreamRemoteAddress() } type filterManagerCallbackHandler struct { @@ -221,6 +251,25 @@ type filterManagerCallbackHandler struct { namespace string consumer api.Consumer + + streamInfo *filterManagerStreamInfo +} + +func (cb *filterManagerCallbackHandler) Reset() { + cb.FilterCallbackHandler = nil + // We don't reset namespace, as filterManager will only be reused in the same route, + // which must have the same namespace. + cb.consumer = nil + cb.streamInfo = nil +} + +func (cb *filterManagerCallbackHandler) StreamInfo() api.StreamInfo { + if cb.streamInfo == nil { + cb.streamInfo = &filterManagerStreamInfo{ + StreamInfo: cb.FilterCallbackHandler.StreamInfo(), + } + } + return cb.streamInfo } func (cb *filterManagerCallbackHandler) LookupConsumer(pluginName, key string) (api.Consumer, bool) { diff --git a/pkg/filtermanager/filtermanager_benchmark_test.go b/pkg/filtermanager/filtermanager_benchmark_test.go index c99c45d3..74cc9bc5 100644 --- a/pkg/filtermanager/filtermanager_benchmark_test.go +++ b/pkg/filtermanager/filtermanager_benchmark_test.go @@ -27,7 +27,7 @@ import ( // go test -v -cpu=1 -run=none -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out ./pkg/filtermanager/ func BenchmarkFilterManagerAllPhase(b *testing.B) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { @@ -72,7 +72,7 @@ func (f *regularFilter) OnLog() { } func BenchmarkFilterManagerRegular(b *testing.B) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { diff --git a/pkg/filtermanager/filtermanager_test.go b/pkg/filtermanager/filtermanager_test.go index 75be3c32..06a0761a 100644 --- a/pkg/filtermanager/filtermanager_test.go +++ b/pkg/filtermanager/filtermanager_test.go @@ -75,7 +75,7 @@ func TestParse(t *testing.T) { } func TestPassThrough(t *testing.T) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { @@ -144,7 +144,7 @@ func TestLocalReplyJSON_UseReqHeader(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { @@ -217,7 +217,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { @@ -253,7 +253,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) { } func TestLocalReplyJSON_DoNotChangeMsgIfContentTypeIsGiven(t *testing.T) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.current = []*model.ParsedFilterConfig{ { @@ -344,7 +344,7 @@ func (f *addReqFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream boo } func TestFiltersFromConsumer(t *testing.T) { - cb := envoy.NewFilterCallbackHandler() + cb := envoy.NewCAPIFilterCallbackHandler() config := initFilterManagerConfig("ns") config.authnFiltersEndAt = 1 config.current = []*model.ParsedFilterConfig{ diff --git a/pkg/request/stream.go b/pkg/request/stream.go deleted file mode 100644 index d9e758bc..00000000 --- a/pkg/request/stream.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright The HTNN Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package request - -import ( - "net" - - "mosn.io/htnn/pkg/filtermanager/api" -) - -func GetRemoteIP(info api.StreamInfo) string { - ipport := info.DownstreamRemoteAddress() - // the IPPort given by Envoy must be valid - ip, _, _ := net.SplitHostPort(ipport) - return ip -} diff --git a/plugins/limit_count_redis/filter.go b/plugins/limit_count_redis/filter.go index d0878f88..be0c75d2 100644 --- a/plugins/limit_count_redis/filter.go +++ b/plugins/limit_count_redis/filter.go @@ -23,7 +23,6 @@ import ( "mosn.io/htnn/pkg/expr" "mosn.io/htnn/pkg/filtermanager/api" - "mosn.io/htnn/pkg/request" "mosn.io/htnn/pkg/stringx" ) @@ -50,10 +49,12 @@ func (f *filter) getKey(script expr.Script, headers api.RequestHeaderMap) string if err == nil { key = res.(string) } + if key == "" { + api.LogInfo("limitCountRedis filter uses client IP as key because the configured key is empty") + } } if key == "" { - api.LogInfo("limitCountRedis filter uses client IP as key because the configured key is empty") - key = request.GetRemoteIP(f.callbacks.StreamInfo()) + key = f.callbacks.StreamInfo().DownstreamRemoteParsedAddress().IP } return key } diff --git a/plugins/limit_req/filter.go b/plugins/limit_req/filter.go index fe1f6520..246f2a35 100644 --- a/plugins/limit_req/filter.go +++ b/plugins/limit_req/filter.go @@ -18,7 +18,6 @@ import ( "time" "mosn.io/htnn/pkg/filtermanager/api" - "mosn.io/htnn/pkg/request" ) func factory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter { @@ -49,11 +48,10 @@ func (f *filter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api key = res.(string) if key == "" { api.LogInfo("limitReq uses client IP as key because the configured key is empty") - key = request.GetRemoteIP(f.callbacks.StreamInfo()) } - - } else { - key = request.GetRemoteIP(f.callbacks.StreamInfo()) + } + if key == "" { + key = f.callbacks.StreamInfo().DownstreamRemoteParsedAddress().IP } // Get also extends the ttl diff --git a/plugins/tests/pkg/envoy/capi.go b/plugins/tests/pkg/envoy/capi.go index 1f5cad54..e44ad904 100644 --- a/plugins/tests/pkg/envoy/capi.go +++ b/plugins/tests/pkg/envoy/capi.go @@ -23,27 +23,27 @@ import ( "strconv" "sync" - "github.com/envoyproxy/envoy/contrib/golang/common/go/api" + capi "github.com/envoyproxy/envoy/contrib/golang/common/go/api" - fmapi "mosn.io/htnn/pkg/filtermanager/api" + "mosn.io/htnn/pkg/filtermanager/api" ) func init() { // replace the implementation of methods like api.LogXXX - api.SetCommonCAPI(&capi{}) + capi.SetCommonCAPI(&fakeCapi{}) } -func logInGo(level api.LogType, message string) { +func logInGo(level capi.LogType, message string) { log.Printf("[%s] %s\n", level, message) } -type capi struct{} +type fakeCapi struct{} -func (a *capi) Log(level api.LogType, message string) { +func (a *fakeCapi) Log(level capi.LogType, message string) { logInGo(level, message) } -func (a *capi) LogLevel() api.LogType { +func (a *fakeCapi) LogLevel() capi.LogType { return 0 } @@ -289,7 +289,7 @@ func NewFilterState(data map[string]string) api.FilterState { } } -func (i *FilterState) SetString(key, value string, stateType api.StateType, lifeSpan api.LifeSpan, streamSharing api.StreamSharing) { +func (i *FilterState) SetString(key, value string, stateType capi.StateType, lifeSpan capi.LifeSpan, streamSharing capi.StreamSharing) { i.store[key] = value } @@ -376,6 +376,14 @@ func (i *StreamInfo) WorkerID() uint32 { return 0 } +func (i *StreamInfo) DownstreamRemoteParsedAddress() *api.IPAddress { + return &api.IPAddress{ + Address: "183.128.130.43:54321", + IP: "183.128.130.43", + Port: 54321, + } +} + var _ api.StreamInfo = (*StreamInfo)(nil) type LocalResponse struct { @@ -390,7 +398,7 @@ type filterCallbackHandler struct { streamInfo api.StreamInfo resp LocalResponse - consumer fmapi.Consumer + consumer api.Consumer ch chan struct{} } @@ -416,7 +424,7 @@ func (i *filterCallbackHandler) SetStreamInfo(data api.StreamInfo) { i.streamInfo = data } -func (i *filterCallbackHandler) Continue(status api.StatusType) { +func (i *filterCallbackHandler) Continue(status capi.StatusType) { i.ch <- struct{}{} } @@ -429,7 +437,7 @@ func (i *filterCallbackHandler) SendLocalReply(responseCode int, bodyText string defer i.lock.Unlock() i.resp = LocalResponse{Code: responseCode, Body: bodyText, Headers: headers} - i.Continue(api.LocalReply) + i.Continue(capi.LocalReply) } func (i *filterCallbackHandler) LocalResponse() LocalResponse { @@ -441,11 +449,11 @@ func (i *filterCallbackHandler) LocalResponse() LocalResponse { func (i *filterCallbackHandler) RecoverPanic() { } -func (i *filterCallbackHandler) Log(level api.LogType, msg string) { +func (i *filterCallbackHandler) Log(level capi.LogType, msg string) { logInGo(level, msg) } -func (i *filterCallbackHandler) LogLevel() api.LogType { +func (i *filterCallbackHandler) LogLevel() capi.LogType { return 0 } @@ -453,16 +461,32 @@ func (i *filterCallbackHandler) GetProperty(key string) (string, error) { return "", nil } -func (i *filterCallbackHandler) LookupConsumer(_, _ string) (fmapi.Consumer, bool) { +func (i *filterCallbackHandler) LookupConsumer(_, _ string) (api.Consumer, bool) { return nil, false } -func (i *filterCallbackHandler) GetConsumer() fmapi.Consumer { +func (i *filterCallbackHandler) GetConsumer() api.Consumer { return i.consumer } -func (i *filterCallbackHandler) SetConsumer(c fmapi.Consumer) { +func (i *filterCallbackHandler) SetConsumer(c api.Consumer) { i.consumer = c } var _ api.FilterCallbackHandler = (*filterCallbackHandler)(nil) + +type capiFilterCallbackHandler struct { + *filterCallbackHandler +} + +func (cb *capiFilterCallbackHandler) StreamInfo() capi.StreamInfo { + return cb.filterCallbackHandler.StreamInfo() +} + +var _ capi.FilterCallbackHandler = (*capiFilterCallbackHandler)(nil) + +func NewCAPIFilterCallbackHandler() *capiFilterCallbackHandler { + return &capiFilterCallbackHandler{ + filterCallbackHandler: NewFilterCallbackHandler(), + } +}