Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add tcp/udp proxy feature #234

Closed
wants to merge 106 commits into from
Closed
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
106 commits
Select commit Hold shift + click to select a range
b02b601
[tcpproxy] first commit
jxd1990 Sep 3, 2021
34fe9db
[tcpproxy] add layer4pipeline and layer4proxy(incomplete)
jxd1990 Sep 3, 2021
b689c5e
[tcpproxy] raw tcp proxy model
jxd1990 Sep 3, 2021
1885b16
[tcpproxy] add some function for tcp proxy model
Sep 4, 2021
05ebe9e
[util] add simple io buffer pool
jxd1990 Sep 7, 2021
831d246
[util] add timerpool(copy from nats-io/nats)
jxd1990 Sep 8, 2021
98ea2f5
[pipeline] layer4pipeline add license
jxd1990 Sep 9, 2021
b5de9a0
[layer4proxy] add read loop and write loop
jxd1990 Sep 9, 2021
391be99
[tcpproxy] create upstream conn
Sep 9, 2021
7431d90
[layer4proxy] resolve cycle dependence
jxd1990 Sep 10, 2021
0d33c87
[layer4proxy] extract connection(30%)
jxd1990 Sep 10, 2021
b0623c4
[layer4proxy] abstract connection operation
jxd1990 Sep 10, 2021
c314759
[layer4proxy] extract connection to client connection and upstream c…
jxd1990 Sep 12, 2021
534956f
[layer4proxy] simplify context methods
jxd1990 Sep 12, 2021
9395ff3
[layer4proxy]almost finish layer4pipeline and layer4context
jxd1990 Sep 13, 2021
c61b7c5
[util] add license for iobuffer(copy from mosn)
jxd1990 Sep 16, 2021
e78a512
[layer4proxy] add udp proxy(70%)
jxd1990 Sep 16, 2021
796e8f4
[connection] fix bug in connection util & add udp session
jxd1990 Sep 17, 2021
01bfd85
[iobufferpool] fix comment
jxd1990 Sep 17, 2021
3037b13
[layer4proxy] complete layer4 proxy method
jxd1990 Sep 17, 2021
d8908d2
[layer4proxy] resolve import cycle
jxd1990 Sep 17, 2021
068a704
[layer4proxy] finish majority layer4 proxy function
jxd1990 Oct 3, 2021
de51f31
[layer4proxy] simplify layer4 server
jxd1990 Oct 12, 2021
e6b8d43
[layer4proxy] extract downstream/upstream write buffer to layer4 context
jxd1990 Oct 12, 2021
f2c1017
[layer4proxy] rollback supervisor modify
jxd1990 Oct 13, 2021
ee0b146
[layer4proxy] rollback traffic controller and protocol package modify
jxd1990 Oct 13, 2021
87027b1
[layer4proxy] cleanup code
jxd1990 Oct 13, 2021
e2ea5bc
[layer4proxy] delete layer4 filter
jxd1990 Oct 13, 2021
48901c7
[layer4proxy] add missing license
jxd1990 Oct 13, 2021
d9293ca
[layer4proxy] change layer4 proxy Category
jxd1990 Oct 14, 2021
b5f3325
[layer4proxy] fix some minor problem
jxd1990 Oct 14, 2021
6defc6b
[layer4proxy] fix tcp connection close bug
jxd1990 Oct 15, 2021
ae9063f
[layer4proxy] fix upstream connect bug
jxd1990 Oct 15, 2021
6e8116b
[tcpproxy] remove udp proxy(something wrong on windows platform)
jxd1990 Oct 18, 2021
af4dc0e
[tcpproxy] remove read enable & fix log
jxd1990 Oct 19, 2021
5f5147f
[tcpproxy] extract connection goWithRecover method
jxd1990 Oct 19, 2021
bfbaa4a
[tcpproxy] fix code warning
jxd1990 Oct 19, 2021
b07d48e
[tcpproxy] fix code warning +1
jxd1990 Oct 19, 2021
23841b0
[tcpproxy] change err EOF variable to ErrEOF
jxd1990 Oct 19, 2021
d133528
[tcpproxy] fix revive warning
jxd1990 Oct 20, 2021
b04282e
[tcpproxy] finish first version of udp proxy
jxd1990 Oct 21, 2021
1cd74d8
[tcpproxy] add missing license
jxd1990 Oct 21, 2021
e158867
[tcpproxy] optimization for no response udp proxy scenes
jxd1990 Oct 21, 2021
b120e1a
[udpproxy] fix udp proxy bug
jxd1990 Oct 21, 2021
615b675
[layer4proxy] update timerpool license
jxd1990 Oct 24, 2021
b14f06d
[layer4proxy] extract ipfilters file in tcpproxy and udpproxy package…
jxd1990 Oct 24, 2021
b5cbd6d
[layer4proxy] change tcpproxy listen function to make code more readable
jxd1990 Oct 24, 2021
0862137
[layer4proxy] fix udp receive bug
jxd1990 Oct 24, 2021
9385a32
Merge branch 'main' into resolve_merge_conflict
jxd134 Oct 24, 2021
3e891e9
[layer4proxy] move layer4 ipfilters to util/ipfilter
jxd1990 Oct 25, 2021
2001239
Update GetReadBuffer function comment
jxd134 Oct 25, 2021
6911fcd
[layer4proxy] fix error modify
jxd1990 Oct 25, 2021
d966d6f
Merge branch 'resolve_merge_conflict' of https://github.com/jxd134/ea…
jxd1990 Oct 25, 2021
ceaaec1
[layer4proxy] update io buffer for tcp/udp proxy
jxd1990 Oct 26, 2021
6124cff
[layer4proxy] fix go.mod import
jxd1990 Oct 26, 2021
baf7e54
[layer4proxy] extract backend servers pool related code to util/layer…
jxd1990 Oct 27, 2021
99ad2f5
[layer4proxy] simplify udp server lifecycle
jxd1990 Oct 27, 2021
6a85708
[layer4proxy] fix udp session close data race bug
jxd1990 Oct 28, 2021
b6f9ea9
[layer4proxy] no need to protect check when connPool close function b…
jxd1990 Oct 28, 2021
2da5a57
[layer4proxy] fix udp proxy buffer bug
jxd1990 Oct 28, 2021
9a6803a
[layer4proxy] optimization udp proxy buffer len setting
jxd1990 Oct 28, 2021
51a0d43
[layer4proxy] simplify backend server spec name
jxd1990 Oct 28, 2021
a0c9290
[udpproxy] rename (downstream/upstream to client/server, oldpool to o…
jxd1990 Oct 30, 2021
3fb339d
[udpproxy] fix bug(closing old rules should happen after new rules ar…
jxd1990 Oct 30, 2021
4670fad
[udpproxy] fix bug(check pool rules before get next server)
jxd1990 Oct 30, 2021
f69f29a
[udpproxy] add comment for Layer4IPFilters
jxd1990 Oct 30, 2021
f8132bb
fix wrong comment
jxd134 Oct 30, 2021
4a29306
Merge branch 'main' into resolve_merge_conflict
jxd134 Oct 30, 2021
c00185f
[tcpproxy] rename downstrean/upstream to client/server in tcpproxy mo…
jxd1990 Oct 30, 2021
f8eee9a
Merge remote-tracking branch 'origin/resolve_merge_conflict' into res…
jxd1990 Oct 30, 2021
e1002b1
[tcpproxy] remove connected param & simplify connect method
jxd1990 Nov 1, 2021
802ce1f
[tcpproxy] simplify tcp connection
jxd1990 Nov 2, 2021
24cebfd
[udpproxy] get udp session by client ip
jxd1990 Nov 2, 2021
9645153
[udpproxy] remove udp server runtime cleanup method(session cleanup i…
jxd1990 Nov 2, 2021
3cd6524
[udpproxy] replace atomic with mutex in session close function
jxd1990 Nov 2, 2021
e3abd99
[layer4proxy] bug fix for udp connection pool
Nov 2, 2021
49df67e
[udpproxy] fix udp connection pool judge bug
jxd1990 Nov 3, 2021
e72fb2f
[tcpproxy] fix tcp client close bug & add comment
jxd1990 Nov 5, 2021
9149981
simplify connection start function
jxd134 Nov 6, 2021
51e77d2
[tcpproxy] remove unnecessary parameters `startOnce`
jxd1990 Nov 6, 2021
3dff752
[tcpproxy] early continue to reduce nesting
jxd1990 Nov 6, 2021
5b7d97b
[tcpproxy] add comment for limit listener connection
jxd1990 Nov 6, 2021
f0d1a61
[tcpproxy] no need to call `atmoic.LoadUint32` for `closed` param whe…
jxd1990 Nov 6, 2021
97d36db
[tcpproxy] refactor `startWriteLoop` function
jxd1990 Nov 6, 2021
ca33400
[layer4backend] pool rules is never empty, no need to check it
jxd1990 Nov 6, 2021
4f494c8
[layer4backend] fix listener stop bug(can not close exist connection)
jxd1990 Nov 6, 2021
a05d0a8
[layer4backend] fix timeout exception check bug
jxd1990 Nov 6, 2021
d311dc7
[tcpproxy] fix tcp bufferpool capacity bug(ref: https://github.com/go…
jxd1990 Nov 7, 2021
d5004a3
[tcpproxy] remove debug log
jxd1990 Nov 7, 2021
8be02cf
[tcpproxy] fix write loop busy loop bug
jxd1990 Nov 8, 2021
c93b12b
[tcpproxy] remove unused param
jxd1990 Nov 8, 2021
1785884
[udpproxy] fix udp runtime not initialized bug
jxd1990 Nov 9, 2021
9b19281
[udpproxy] fix udp session close bug & optimization udp byte buffer
jxd1990 Nov 9, 2021
a995b2d
[tcpproxy] remove unused param in tcp connection and checking
jxd1990 Nov 18, 2021
4c08d07
[tcpproxy] notify read/write loop to exit by connection timeout
jxd1990 Nov 24, 2021
7fb34e1
[tcpproxy] simplify io eof
jxd1990 Nov 24, 2021
98b638a
[tcpproxy] Optimize the read/write exit mechanism
jxd1990 Nov 25, 2021
db3b48d
Merge branch 'resolve_merge_conflict' of https://github.com.cnpmjs.or…
Dec 21, 2021
671d234
[layer4proxy] merge upstream/main code change
Dec 21, 2021
0e07f18
[udpproxy] refactor start session method
jxd1990 Dec 31, 2021
298d6c1
Merge remote-tracking branch 'origin/resolve_merge_conflict' into res…
jxd1990 Dec 31, 2021
af9a795
[util/layer4ipfilters] fix ipfilters create return value
jxd1990 Dec 31, 2021
8edcc7a
[util/layer4ipfilters] fix ipfilters nil check problem
jxd1990 Jan 4, 2022
59dc6e6
[tcpproxy] fix tcp connection close check log
jxd1990 Jan 4, 2022
97b8f1a
[tcpproxy/udpproxy] Add dummy TCP and UDP clients to facilitate revie…
jxd1990 Jan 4, 2022
fc145ec
[tcpproxy/udpproxy] add read/write loop channel to make sure close so…
jxd1990 Jan 7, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions pkg/context/httpcontext.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
stdcontext "context"
"fmt"
"io"
"net"
"net/http"
"runtime/debug"
"strings"
Expand All @@ -42,6 +43,37 @@ type (
// HandlerCaller is a helper function to call the handler
HandlerCaller func(lastResult string) string

// Layer4Context is all context of an TCP processing.
// It is not goroutine-safe, callers must use Lock/Unlock
// to protect it by themselves.
Layer4Context interface {
Lock()
Unlock()

stdcontext.Context
Cancel(err error)
Cancelled() bool
ClientDisconnected() bool

ClientConn() *net.TCPConn

Duration() time.Duration // For log, sample, etc.
OnFinish(func()) // For setting final client statistics, etc.
AddTag(tag string) // For debug, log, etc.

Finish()

Host() string
SetHost(host string)
Port() uint16
SetPort(port uint16)

ClientIP() string

CallNextHandler(lastResult string) string
SetHandlerCaller(caller HandlerCaller)
}

// HTTPContext is all context of an HTTP processing.
// It is not goroutine-safe, callers must use Lock/Unlock
// to protect it by themselves.
Expand Down Expand Up @@ -151,6 +183,32 @@ type (
cancelFunc stdcontext.CancelFunc
err error
}

tcpContext struct {
jxd134 marked this conversation as resolved.
Show resolved Hide resolved
mutex sync.Mutex

startTime *time.Time
endTime *time.Time

tags []string

clientConn *net.TCPConn
backendConn *net.TCPConn
stdctx stdcontext.Context
err error
}

udpContext struct {
mutex sync.Mutex

startTime *time.Time
endTime *time.Time

tags []string

stdctx stdcontext.Context
err error
}
)

// New creates an HTTPContext.
Expand Down Expand Up @@ -336,3 +394,7 @@ func (ctx *httpContext) SaveReqToTemplate(filterName string) error {
func (ctx *httpContext) SaveRspToTemplate(filterName string) error {
return ctx.ht.SaveResponse(filterName, ctx)
}

func (ctx *tcpContext) saveBackendConn(conn *net.TCPConn) {
ctx.backendConn = conn
}
103 changes: 103 additions & 0 deletions pkg/filter/layer4proxy/masterslavereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright (c) 2017, MegaEase
* All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package layer4proxy

import (
"bytes"
"io"
)

type (
// masterSlaveReader reads bytes to master,
// and synchronize them to slave.
// Currently, only support one slave.
masterSlaveReader struct {
masterReader io.Reader
slaveReader io.Reader
}

masterReader struct {
r io.Reader
buffChan chan []byte
}

slaveReader struct {
unreadBuff *bytes.Buffer
buffChan chan []byte
}
)

func newMasterSlaveReader(r io.Reader) (io.ReadCloser, io.Reader) {
buffChan := make(chan []byte, 10)
mr := &masterReader{
r: r,
buffChan: buffChan,
}
sr := &slaveReader{
unreadBuff: bytes.NewBuffer(nil),
buffChan: buffChan,
}

return mr, sr
}

func (mr *masterReader) Read(p []byte) (n int, err error) {
buff := bytes.NewBuffer(nil)
tee := io.TeeReader(mr.r, buff)
n, err = tee.Read(p)

if n != 0 {
mr.buffChan <- buff.Bytes()
}

if err == io.EOF {
close(mr.buffChan)
}

return n, err
}

func (mr *masterReader) Close() error {
if closer, ok := mr.r.(io.ReadCloser); ok {
return closer.Close()
}

return nil
}

func (sr *slaveReader) Read(p []byte) (int, error) {
buff, ok := <-sr.buffChan

if !ok {
return 0, io.EOF
}

var n int
// NOTE: This if-branch is defensive programming,
// Because the callers of Read of both master and slave
// are the same, so it never happens that len(p) < len(buff).
// else-branch is faster because it is one less copy operation than if-branch.
if sr.unreadBuff.Len() > 0 || len(p) < len(buff) {
sr.unreadBuff.Write(buff)
n, _ = sr.unreadBuff.Read(p)
} else {
n = copy(p, buff)
}

return n, nil
}
163 changes: 163 additions & 0 deletions pkg/filter/layer4proxy/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package layer4proxy

import (
"bufio"
"fmt"
"github.com/google/martian/log"
"github.com/megaease/easegress/pkg/context"
"github.com/megaease/easegress/pkg/logger"
"github.com/megaease/easegress/pkg/supervisor"
"github.com/megaease/easegress/pkg/util/layer4stat"
"github.com/megaease/easegress/pkg/util/memorycache"
"github.com/megaease/easegress/pkg/util/stringtool"
"io"
"net"
"time"
)

type (
protocol string

pool struct {
spec *PoolSpec

tagPrefix string
writeResponse bool

servers *servers
layer4stat *layer4stat.Layer4Stat
memoryCache *memorycache.MemoryCache
}

// PoolSpec describes a pool of servers.
PoolSpec struct {
Protocol protocol `yaml:"protocol" jsonschema:"required" `
SpanName string `yaml:"spanName" jsonschema:"omitempty"`
ServersTags []string `yaml:"serversTags" jsonschema:"omitempty,uniqueItems=true"`
Servers []*Server `yaml:"servers" jsonschema:"omitempty"`
ServiceRegistry string `yaml:"serviceRegistry" jsonschema:"omitempty"`
ServiceName string `yaml:"serviceName" jsonschema:"omitempty"`
LoadBalance *LoadBalance `yaml:"loadBalance" jsonschema:"required"`
MemoryCache *memorycache.Spec `yaml:"memoryCache,omitempty" jsonschema:"omitempty"`
}

// PoolStatus is the status of Pool.
PoolStatus struct {
Stat *layer4stat.Status `yaml:"stat"`
}
)

// Validate validates poolSpec.
func (s PoolSpec) Validate() error {
if s.ServiceName == "" && len(s.Servers) == 0 {
return fmt.Errorf("both serviceName and servers are empty")
}

serversGotWeight := 0
for _, server := range s.Servers {
if server.Weight > 0 {
serversGotWeight++
}
}
if serversGotWeight > 0 && serversGotWeight < len(s.Servers) {
return fmt.Errorf("not all servers have weight(%d/%d)",
serversGotWeight, len(s.Servers))
}

if s.ServiceName == "" {
servers := newStaticServers(s.Servers, s.ServersTags, s.LoadBalance)
if servers.len() == 0 {
return fmt.Errorf("serversTags picks none of servers")
}
}

return nil
}

func newPool(super *supervisor.Supervisor, spec *PoolSpec, tagPrefix string, writeResponse bool) *pool {

var memoryCache *memorycache.MemoryCache
if spec.MemoryCache != nil {
memoryCache = memorycache.New(spec.MemoryCache)
}

return &pool{
spec: spec,

tagPrefix: tagPrefix,
writeResponse: writeResponse,

servers: newServers(super, spec),
layer4stat: layer4stat.New(),
memoryCache: memoryCache,
}
}

func (p *pool) status() *PoolStatus {
s := &PoolStatus{Stat: p.layer4stat.Status()}
return s
}

func (p *pool) handle(ctx context.Layer4Context) string {

addTag := func(subPrefix, msg string) {
tag := stringtool.Cat(p.tagPrefix, "#", subPrefix, ": ", msg)
ctx.Lock()
ctx.AddTag(tag)
ctx.Unlock()
}

server, err := p.servers.next(ctx)
if err != nil {
addTag("serverErr", err.Error())
return resultInternalError
}
addTag("addr", server.HostPort)

rawConn, err := net.DialTimeout("tcp", server.HostPort, 1000*time.Millisecond)
if err != nil {
log.Errorf("dial tcp for addr: % failed, err: %v", server.HostPort, err)
}
backendConn := rawConn.(*net.TCPConn)

defer func(backendConn *net.TCPConn) {
closeErr := backendConn.Close()
if closeErr != nil {
logger.Warnf("close backend conn for %v failed, err: %v", server.HostPort, err)
}
}(backendConn)

errChan := make(chan error)
go p.connCopy(backendConn, ctx.ClientConn(), errChan)
go p.connCopy(ctx.ClientConn(), backendConn, errChan)

err = <-errChan
if err != nil {
logger.Errorf("Error during connection: %v", err)
}

err = <-errChan // TODO export tcp config for backend conn, watch client/backend error

ctx.Lock()
defer ctx.Unlock()
// NOTE: The code below can't use addTag and setStatusCode in case of deadlock.

return ""
}

func (p *pool) close() {
p.servers.close()
}

func (p *pool) connCopy(dst *net.TCPConn, src *net.TCPConn, errCh chan error) {
writer := bufio.NewWriter(dst)
reader := bufio.NewReader(src)
_, err := io.Copy(writer, reader)
_ = writer.Flush() // need flush bytes in buffer
errCh <- err

errClose := dst.CloseWrite()
if errClose != nil {
logger.Debugf("Error while terminating connection: %v", errClose)
}
}
Loading