Skip to content

Commit

Permalink
cache parsed downstream remote address (#283)
Browse files Browse the repository at this point in the history
Signed-off-by: spacewander <[email protected]>
  • Loading branch information
spacewander authored Feb 11, 2024
1 parent c6cfd61 commit 1be8274
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 63 deletions.
4 changes: 2 additions & 2 deletions pkg/expr/cel.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,8 +305,8 @@ func defineSource() cel.EnvOption {
func (s *source) Receive(function string, overload string, args []ref.Val) ref.Val {
switch function {
case "ip":
ip := pkgRequest.GetRemoteIP(s.callback.StreamInfo())
return types.String(ip)
addr := s.callback.StreamInfo().DownstreamRemoteParsedAddress()
return types.String(addr.IP)
case "address":
ipport := s.callback.StreamInfo().DownstreamRemoteAddress()
return types.String(ipport)
Expand Down
44 changes: 43 additions & 1 deletion pkg/filtermanager/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,13 +102,55 @@ func (f *PassThroughFilter) EncodeResponse(headers api.ResponseHeaderMap, data a
return Continue
}

type HeaderMap = api.HeaderMap
type RequestHeaderMap = api.RequestHeaderMap
type ResponseHeaderMap = api.ResponseHeaderMap
type DataBufferBase = api.DataBufferBase
type BufferInstance = api.BufferInstance
type RequestTrailerMap = api.RequestTrailerMap
type ResponseTrailerMap = api.ResponseTrailerMap

type StreamInfo = api.StreamInfo
type IPAddress struct {
Address string
IP string
Port int
}

type StreamInfo interface {
GetRouteName() string
FilterChainName() string
// Protocol return the request's protocol.
Protocol() (string, bool)
// ResponseCode return the response code.
ResponseCode() (uint32, bool)
// ResponseCodeDetails return the response code details.
ResponseCodeDetails() (string, bool)
// AttemptCount return the number of times the request was attempted upstream.
AttemptCount() uint32
// Get the dynamic metadata of the request
DynamicMetadata() DynamicMetadata
// DownstreamLocalAddress return the downstream local address.
DownstreamLocalAddress() string
// DownstreamRemoteAddress return the downstream remote address.
DownstreamRemoteAddress() string
// UpstreamLocalAddress return the upstream local address.
UpstreamLocalAddress() (string, bool)
// UpstreamRemoteAddress return the upstream remote address.
UpstreamRemoteAddress() (string, bool)
// UpstreamClusterName return the upstream host cluster.
UpstreamClusterName() (string, bool)
// FilterState return the filter state interface.
FilterState() FilterState
// VirtualClusterName returns the name of the virtual cluster which got matched
VirtualClusterName() (string, bool)
// WorkerID returns the ID of the Envoy worker thread
WorkerID() uint32

// Methods added by HTNN

// DownstreamRemoteParsedAddress returns the downstream remote address, in the IPAddress struct
DownstreamRemoteParsedAddress() *IPAddress
}

type PluginConfig interface {
ProtoReflect() protoreflect.Message
Expand Down
51 changes: 50 additions & 1 deletion pkg/filtermanager/filtermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,12 @@ import (
"encoding/json"
"errors"
"fmt"
"net"
"reflect"
"runtime"
"runtime/debug"
"sort"
"strconv"
"sync"

xds "github.com/cncf/xds/go/xds/type/v3"
Expand Down Expand Up @@ -213,14 +215,61 @@ func (m *filterManager) Reset() {
m.canSkipEncodeData = false
m.canSkipOnLog = false

m.callbacks.FilterCallbackHandler = nil
m.callbacks.Reset()
}

type filterManagerStreamInfo struct {
capi.StreamInfo

ipAddress *api.IPAddress
}

func (s *filterManagerStreamInfo) DownstreamRemoteParsedAddress() *api.IPAddress {
if s.ipAddress == nil {
ipport := s.StreamInfo.DownstreamRemoteAddress()
// the IPPort given by Envoy must be valid
ip, port, _ := net.SplitHostPort(ipport)
p, _ := strconv.Atoi(port)
s.ipAddress = &api.IPAddress{
Address: ipport,
IP: ip,
Port: p,
}
}
return s.ipAddress
}

func (s *filterManagerStreamInfo) DownstreamRemoteAddress() string {
if s.ipAddress != nil {
return s.ipAddress.Address
}
return s.StreamInfo.DownstreamRemoteAddress()
}

type filterManagerCallbackHandler struct {
capi.FilterCallbackHandler

namespace string
consumer api.Consumer

streamInfo *filterManagerStreamInfo
}

func (cb *filterManagerCallbackHandler) Reset() {
cb.FilterCallbackHandler = nil
// We don't reset namespace, as filterManager will only be reused in the same route,
// which must have the same namespace.
cb.consumer = nil
cb.streamInfo = nil
}

func (cb *filterManagerCallbackHandler) StreamInfo() api.StreamInfo {
if cb.streamInfo == nil {
cb.streamInfo = &filterManagerStreamInfo{
StreamInfo: cb.FilterCallbackHandler.StreamInfo(),
}
}
return cb.streamInfo
}

func (cb *filterManagerCallbackHandler) LookupConsumer(pluginName, key string) (api.Consumer, bool) {
Expand Down
4 changes: 2 additions & 2 deletions pkg/filtermanager/filtermanager_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
// go test -v -cpu=1 -run=none -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out ./pkg/filtermanager/

func BenchmarkFilterManagerAllPhase(b *testing.B) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down Expand Up @@ -72,7 +72,7 @@ func (f *regularFilter) OnLog() {
}

func BenchmarkFilterManagerRegular(b *testing.B) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down
10 changes: 5 additions & 5 deletions pkg/filtermanager/filtermanager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestParse(t *testing.T) {
}

func TestPassThrough(t *testing.T) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestLocalReplyJSON_UseReqHeader(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down Expand Up @@ -217,7 +217,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down Expand Up @@ -253,7 +253,7 @@ func TestLocalReplyJSON_UseRespHeader(t *testing.T) {
}

func TestLocalReplyJSON_DoNotChangeMsgIfContentTypeIsGiven(t *testing.T) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.current = []*model.ParsedFilterConfig{
{
Expand Down Expand Up @@ -344,7 +344,7 @@ func (f *addReqFilter) DecodeHeaders(headers api.RequestHeaderMap, endStream boo
}

func TestFiltersFromConsumer(t *testing.T) {
cb := envoy.NewFilterCallbackHandler()
cb := envoy.NewCAPIFilterCallbackHandler()
config := initFilterManagerConfig("ns")
config.authnFiltersEndAt = 1
config.current = []*model.ParsedFilterConfig{
Expand Down
28 changes: 0 additions & 28 deletions pkg/request/stream.go

This file was deleted.

7 changes: 4 additions & 3 deletions plugins/limit_count_redis/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import (

"mosn.io/htnn/pkg/expr"
"mosn.io/htnn/pkg/filtermanager/api"
"mosn.io/htnn/pkg/request"
"mosn.io/htnn/pkg/stringx"
)

Expand All @@ -50,10 +49,12 @@ func (f *filter) getKey(script expr.Script, headers api.RequestHeaderMap) string
if err == nil {
key = res.(string)
}
if key == "" {
api.LogInfo("limitCountRedis filter uses client IP as key because the configured key is empty")
}
}
if key == "" {
api.LogInfo("limitCountRedis filter uses client IP as key because the configured key is empty")
key = request.GetRemoteIP(f.callbacks.StreamInfo())
key = f.callbacks.StreamInfo().DownstreamRemoteParsedAddress().IP
}
return key
}
Expand Down
8 changes: 3 additions & 5 deletions plugins/limit_req/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"time"

"mosn.io/htnn/pkg/filtermanager/api"
"mosn.io/htnn/pkg/request"
)

func factory(c interface{}, callbacks api.FilterCallbackHandler) api.Filter {
Expand Down Expand Up @@ -49,11 +48,10 @@ func (f *filter) DecodeHeaders(headers api.RequestHeaderMap, endStream bool) api
key = res.(string)
if key == "" {
api.LogInfo("limitReq uses client IP as key because the configured key is empty")
key = request.GetRemoteIP(f.callbacks.StreamInfo())
}

} else {
key = request.GetRemoteIP(f.callbacks.StreamInfo())
}
if key == "" {
key = f.callbacks.StreamInfo().DownstreamRemoteParsedAddress().IP
}

// Get also extends the ttl
Expand Down
Loading

0 comments on commit 1be8274

Please sign in to comment.