Skip to content

Commit

Permalink
Merge pull request #14 from begonia-org/feature/feat-grpc-proxy
Browse files Browse the repository at this point in the history
Feature/feat grpc proxy
  • Loading branch information
geebytes authored May 22, 2024
2 parents 7f0e5bf + 375cefd commit 172ecc6
Show file tree
Hide file tree
Showing 35 changed files with 1,219 additions and 2,589 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@
*.so
*.dylib
*.cov
*.log
test.log
protos/
protos
# Test binary, built with `go test -c`
*.test

Expand Down
3 changes: 0 additions & 3 deletions gateway/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,6 @@ func (g *GrpcLoadBalancer) Select(method string, args ...interface{}) (loadbalan
return nil, loadbalance.ErrNoEndpoint
}


type GrpcProxyMiddleware func(srv interface{}, serverStream grpc.ServerStream) error
type GrpcProxy struct {
lb *GrpcLoadBalancer
Expand Down Expand Up @@ -157,7 +156,6 @@ func (g *GrpcProxy) getXForward(ctx context.Context) []string {
return md.Get("X-Forwarded-For")
}


func (g *GrpcProxy) Handler(srv interface{}, serverStream grpc.ServerStream) error {

// 执行中间件
Expand Down Expand Up @@ -220,7 +218,6 @@ func (g *GrpcProxy) Handler(srv interface{}, serverStream grpc.ServerStream) err
s2cErrChan := g.forwardServerToClient(serverStream, clientStream)
// 从服务端到客户端
c2sErrChan := g.forwardClientToServer(clientStream, serverStream)

for i := 0; i < 2; i++ {
select {
case s2cErr := <-s2cErrChan:
Expand Down
179 changes: 179 additions & 0 deletions gateway/grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
package gateway

import (
"context"
"fmt"
"io"
"math/rand"
"net"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"

"github.com/agiledragon/gomonkey/v2"
loadbalance "github.com/begonia-org/go-loadbalancer"
api "github.com/begonia-org/go-sdk/api/endpoint/v1"
hello "github.com/begonia-org/go-sdk/api/example/v1"
"github.com/begonia-org/go-sdk/example"
c "github.com/smartystreets/goconvey/convey"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)

type streamMock struct {
}
type clientStreamMock struct{}

func (*streamMock) SendHeader(md metadata.MD) error {
return nil
}
func (*streamMock) SetHeader(md metadata.MD) error {
return nil
}
func (*streamMock) SetTrailer(md metadata.MD) {
}
func (*streamMock) Context() context.Context {
return context.Background()
}
func (*streamMock) SendMsg(m interface{}) error {
return nil
}
func (*streamMock) RecvMsg(m interface{}) error {
return nil
}
func (*clientStreamMock) SendMsg(m interface{}) error {
return nil
}

func (*clientStreamMock) RecvMsg(m interface{}) error {
return nil

}
func (*clientStreamMock) CloseSend() error {
return nil
}
func (*clientStreamMock) Header() (metadata.MD, error) {
return metadata.MD{}, nil
}
func (*clientStreamMock) Trailer() metadata.MD {
return metadata.MD{}
}
func (*clientStreamMock) Context() context.Context {
return context.Background()
}
func (*clientStreamMock) SendHeader(md metadata.MD) error {
return nil
}

func TestGrpcHandleErr(t *testing.T) {
c.Convey("test grpc handle err", t, func() {
_, filename, _, _ := runtime.Caller(0)
pbFile := filepath.Join(filepath.Dir(filepath.Dir(filename)), "testdata", "helloworld.pb")
pb, err := os.ReadFile(pbFile)
c.So(err, c.ShouldBeNil)
pd, err := NewDescriptionFromBinary(pb, filepath.Join("tmp", "test-pd"))
c.So(err, c.ShouldBeNil)
rander := rand.New(rand.NewSource(time.Now().Unix())) // 初始化随机数种子
min := 1949
max := 12138
randomNumber := rander.Intn(max-min+1) + min
helloAddr := fmt.Sprintf("127.0.0.1:%d", randomNumber+2)
go example.Run(helloAddr)
time.Sleep(2 * time.Second)
endps, err := NewLoadBalanceEndpoint(loadbalance.RRBalanceType, []*api.EndpointMeta{{
Addr: helloAddr,
Weight: 0,
}})
c.So(err, c.ShouldBeNil)

load, _ := loadbalance.New(loadbalance.RRBalanceType, endps)
lb := NewGrpcLoadBalancer()
lb.Register(load, pd)
mid := func(srv interface{}, serverStream grpc.ServerStream) error {
return nil
}
proxy := NewGrpcProxy(lb, mid)
stream := &streamMock{}
patch := gomonkey.ApplyFuncReturn(grpc.MethodFromServerStream, strings.ToUpper("/helloworld.Greeter/SayHelloWebsocket"), true)
patch.ApplyFuncReturn(grpc.NewClientStream, &clientStreamMock{}, nil)
addrs, _ := net.InterfaceAddrs()
var localAddr net.Addr
for _, addr := range addrs {
// 检查地址类型和如果是IP地址我们就打印它
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
// fmt.Println(ipnet.IP.String())
localAddr = addr
break
}
}
}
patch.ApplyFuncReturn(peer.FromContext, &peer.Peer{Addr: localAddr}, true)
defer patch.Reset()

cases := []struct {
patch interface{}
err error
output []interface{}
}{
{
patch: mid,
err: fmt.Errorf("mid handle err"),
output: []interface{}{fmt.Errorf("mid handle err")},
},
{
patch: (*clientStreamMock).CloseSend,
err: fmt.Errorf("close send err"),
output: []interface{}{fmt.Errorf("close send err")},
},
{
patch: (*clientStreamMock).Header,
err: fmt.Errorf("header err"),
output: []interface{}{nil, fmt.Errorf("header err")},
},
{
patch: (*streamMock).SendHeader,
err: fmt.Errorf("send header err"),
output: []interface{}{fmt.Errorf("send header err")},
},
{
patch: (*streamMock).SendMsg,
err: fmt.Errorf("send msg err"),
output: []interface{}{fmt.Errorf("send msg err")},
},
}

patch3 := gomonkey.ApplyFunc((*clientStreamMock).SendMsg, func(_ *clientStreamMock, m interface{}) error {
time.Sleep(3 * time.Second)
return io.EOF
})
defer patch3.Reset()
for _, caseV := range cases {
patch2 := gomonkey.ApplyFuncReturn(caseV.patch, caseV.output...)
defer patch2.Reset()
err = proxy.Handler(&hello.HelloRequest{}, stream)
c.So(err, c.ShouldNotBeNil)
c.So(err.Error(), c.ShouldContainSubstring, caseV.err.Error())
patch2.Reset()
}
patch3.Reset()

errChan2 := make(chan error, 3)

errChan2 <- io.EOF
errChan2 <- io.EOF
patch4 := gomonkey.ApplyFuncReturn((*GrpcProxy).forwardServerToClient, errChan2)
err = proxy.Handler(&hello.HelloRequest{}, stream)
c.So(err, c.ShouldNotBeNil)
c.So(err.Error(), c.ShouldContainSubstring, "proxying should never reach")
defer patch4.Reset()
patch.Reset()
patch4.Reset()

})
}
6 changes: 5 additions & 1 deletion gateway/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,11 @@ func testRegisterClient(t *testing.T) {
go example.Run(helloAddr)
time.Sleep(2 * time.Second)
go gw.Start()
time.Sleep(1 * time.Second)
f:=func(){
gw.Start()
}
c.So(f,c.ShouldPanic)
time.Sleep(4 * time.Second)
_, err = gw.proxyLB.Select("test/.test")
c.So(err, c.ShouldNotBeNil)
Expand Down Expand Up @@ -1077,7 +1082,6 @@ func testUpdateLoadbalance(t *testing.T) {
}
func TestHttp(t *testing.T) {
t.Run("testRegisterClient", testRegisterClient)

t.Run("testRequestGet", testRequestGet)
t.Run("testCors", testCors)
t.Run("testRequestPost", testRequestPost)
Expand Down
32 changes: 0 additions & 32 deletions gateway/protos/Makefile

This file was deleted.

Binary file removed gateway/protos/desc.pb
Binary file not shown.
Loading

0 comments on commit 172ecc6

Please sign in to comment.