Skip to content

Commit

Permalink
Merge pull request #12 from begonia-org/feature/feat-grpc-proxy
Browse files Browse the repository at this point in the history
Feature/feat grpc proxy
  • Loading branch information
geebytes authored May 17, 2024
2 parents c10d57c + 9e0d588 commit 67d6bd8
Show file tree
Hide file tree
Showing 104 changed files with 890 additions and 966 deletions.
5 changes: 3 additions & 2 deletions cmd/gateway/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (

"github.com/begonia-org/begonia/config"
"github.com/begonia-org/begonia/internal"
"github.com/begonia-org/begonia/transport"
"github.com/spf13/cobra"
"github.com/begonia-org/begonia/gateway"

)

// var ProviderSet = wire.NewSet(NewMasterCmd)
Expand Down Expand Up @@ -42,7 +43,7 @@ func NewGatewayCmd() *cobra.Command {
// name, _ := cmd.Flags().GetString("name")
env, _ := cmd.Flags().GetString("env")
config := config.ReadConfig(env)
worker := internal.New(config, transport.Log, endpoint)
worker := internal.New(config, gateway.Log, endpoint)
worker.Start()

},
Expand Down
1 change: 1 addition & 0 deletions config/settings.yml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ gateway:
cors:
- "localhost"
- "127.0.0.1:8081"
- "example.com"
plugins:
local:
logger: 1
Expand Down
4 changes: 2 additions & 2 deletions transport/endpoint.go → gateway/endpoint.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transport
package gateway

import (
"context"
Expand Down Expand Up @@ -50,7 +50,7 @@ func (e *httpForwardGrpcEndpointImpl) Request(req GrpcRequest) (proto.Message, r
in := req.GetIn()
ctx := req.GetContext()

err = conn.Invoke(ctx, req.GetFullMethodName(), in, out,grpc.Header(&metadata.HeaderMD),grpc.Trailer(&metadata.TrailerMD))
err = conn.Invoke(ctx, req.GetFullMethodName(), in, out, grpc.Header(&metadata.HeaderMD), grpc.Trailer(&metadata.TrailerMD))
return out, metadata, err

}
Expand Down
7 changes: 3 additions & 4 deletions transport/endpoint_test.go → gateway/endpoint_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
package transport

package gateway

import (
"context"
Expand All @@ -21,7 +20,7 @@ func TestRequest(t *testing.T) {
// Out: &v1.HelloReply{},
// Ctx: context.Background(),
// }
request:=NewGrpcRequest(context.Background(),nil,nil,"helloworld.Greeter/SayHello",WithIn(&v1.HelloRequest{Msg: "begonia"}),WithOut(&v1.HelloReply{}))
request := NewGrpcRequest(context.Background(), nil, nil, "helloworld.Greeter/SayHello", WithIn(&v1.HelloRequest{Msg: "begonia"}), WithOut(&v1.HelloReply{}))
pool := NewGrpcConnPool("127.0.0.1:12138")
endpoint := NewEndpoint(pool)
reply, metadata, err := endpoint.Request(request)
Expand All @@ -32,4 +31,4 @@ func TestRequest(t *testing.T) {
c.So(metadata, c.ShouldNotBeNil)
})

}
}
21 changes: 15 additions & 6 deletions transport/gateway.go → gateway/gateway.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transport
package gateway

import (
"context"
Expand All @@ -17,8 +17,6 @@ import (
"google.golang.org/grpc"
)



type GrpcServerOptions struct {
Middlewares []GrpcProxyMiddleware
Options []grpc.ServerOption
Expand All @@ -41,7 +39,6 @@ type GatewayServer struct {
mux *sync.Mutex
}


func NewGrpcServer(opts *GrpcServerOptions, lb *GrpcLoadBalancer) *grpc.Server {

proxy := NewGrpcProxy(lb, opts.Middlewares...)
Expand All @@ -57,7 +54,6 @@ func NewHttpServer(addr string, poolOpt ...loadbalance.PoolOptionsBuildOption) (

return NewHttpEndpoint(endpoint)


}
func NewGateway(cfg *GatewayConfig, opts *GrpcServerOptions) *GatewayServer {
lb := NewGrpcLoadBalancer()
Expand Down Expand Up @@ -101,7 +97,7 @@ func (g *GatewayServer) DeleteLocalService(pd ProtobufDescription) {
g.mux.Lock()
defer g.mux.Unlock()
g.proxyLB.Delete(pd)
_= g.DeleteHandlerClient(context.Background(), pd)
_ = g.DeleteHandlerClient(context.Background(), pd)
}
func (g *GatewayServer) GetLoadbalanceName() loadbalance.BalanceType {
return g.proxyLB.Name()
Expand All @@ -115,6 +111,19 @@ func (g *GatewayServer) Start() {
g.grpcServer.ServeHTTP(w, r)
} else {
g.gatewayMux.ServeHTTP(w, r)

// var handler = func(h http.Handler) http.Handler {
// return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
// g.gatewayMux.ServeHTTP(w, r)
// })
// }
// var httpHandle http.Handler
// for _, h := range g.opts.HttpHandlers {
// // handler = h(handler)
// // handler=h(handler)
// // httpHandle =
// }
// handler.ServeHTTP(w, r)
}
}), &http2.Server{})

Expand Down
2 changes: 1 addition & 1 deletion transport/grpc.go → gateway/grpc.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transport
package gateway

import (
"context"
Expand Down
28 changes: 16 additions & 12 deletions transport/http.go → gateway/http.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package transport
package gateway

import (
"bytes"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/gorilla/websocket"
"github.com/grpc-ecosystem/grpc-gateway/v2/runtime"
"github.com/grpc-ecosystem/grpc-gateway/v2/utilities"
"github.com/spark-lence/tiga"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
Expand Down Expand Up @@ -306,7 +307,7 @@ func (h *HttpEndpointImpl) inParamsHandle(params map[string]string, req *http.Re
field := fields.ByName(protoreflect.Name(param))
if field == nil {
// Log.Errorf("inParamsHandle no parameter %s", param)
return status.Errorf(codes.InvalidArgument, "no parameter %s", param)
return status.Errorf(codes.InvalidArgument, "no such parameter %s", param)
}
in.Set(field, protoreflect.ValueOfString(msg))

Expand Down Expand Up @@ -379,7 +380,7 @@ func (h *HttpEndpointImpl) newRequest(ctx context.Context, item *HttpEndpointIte
for _, param := range item.PathParams {
val, ok = pathParams[param]
if !ok {
return nil, status.Errorf(codes.InvalidArgument, "missing parameter %s", param)
continue
}
msg, err := runtime.String(val)
if err != nil {
Expand All @@ -388,9 +389,12 @@ func (h *HttpEndpointImpl) newRequest(ctx context.Context, item *HttpEndpointIte
fields := in.Descriptor().Fields()
field := fields.ByName(protoreflect.Name(param))
if field == nil {
return nil, status.Errorf(codes.InvalidArgument, "no parameter %s", param)
continue
}
if err := tiga.SetFieldValueFromString(in, field, msg); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "set field value error: %v", err)

}
in.Set(field, protoreflect.ValueOfString(msg))

}
query := req.URL.Query()
Expand All @@ -401,10 +405,14 @@ func (h *HttpEndpointImpl) newRequest(ctx context.Context, item *HttpEndpointIte
}
fields := in.Descriptor().Fields()
field := fields.ByName(protoreflect.Name(k))

if field == nil {
return nil, status.Errorf(codes.InvalidArgument, "no parameter %s", k)
continue
}

if err := tiga.SetFieldValueFromString(in, field, msg); err != nil {
return nil, status.Errorf(codes.InvalidArgument, "set field value error: %v", err)
}
in.Set(field, protoreflect.ValueOfString(msg))
}
grpcReq := NewGrpcRequest(ctx, item.In, item.Out, item.FullMethodName, WithIn(in), WithOut(dynamicpb.NewMessage(item.Out)))
return grpcReq, nil
Expand Down Expand Up @@ -470,11 +478,7 @@ func (h *HttpEndpointImpl) RegisterHandlerClient(ctx context.Context, pd Protobu
req.Header.Set(GatewayXParams, strings.Join(params, ","))

}
// annotatedContext, err = runtime.AnnotateIncomingContext(ctx, mux, req, item.FullMethodName, runtime.WithHTTPPathPattern(item.HttpUri))
// if err != nil {
// runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
// return
// }

annotatedContext, err = runtime.AnnotateContext(ctx, mux, req, item.FullMethodName, runtime.WithHTTPPathPattern(item.HttpUri))
if err != nil {
runtime.HTTPError(ctx, mux, outboundMarshaler, w, req, err)
Expand Down
Loading

0 comments on commit 67d6bd8

Please sign in to comment.