Skip to content

Commit

Permalink
big API overhaul; some cleanup in examples, too
Browse files Browse the repository at this point in the history
  • Loading branch information
jhump committed Oct 5, 2023
1 parent 19f28a5 commit 81e97dc
Show file tree
Hide file tree
Showing 27 changed files with 2,259 additions and 2,264 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ linters-settings:
- ok bool
- op *operation
- op Operation
- h *Handler
- t *ttStream
- rw *responseWriter
linters:
Expand Down
4 changes: 4 additions & 0 deletions buf.gen.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -12,3 +12,7 @@ plugins:
- plugin: buf.build/connectrpc/go:v1.11.0
out: internal/gen
opt: paths=source_relative
# gRPC generated code is used by vanguardgrpc examples
- plugin: buf.build/grpc/go:v1.3.0
out: internal/gen
opt: paths=source_relative
12 changes: 11 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,27 @@ module connectrpc.com/vanguard
go 1.19

require (
connectrpc.com/connect v1.11.0
buf.build/gen/go/connectrpc/eliza/connectrpc/go v1.11.1-20230822171018-8b8b971d6fde.1
buf.build/gen/go/connectrpc/eliza/grpc/go v1.3.0-20230822171018-8b8b971d6fde.1
buf.build/gen/go/connectrpc/eliza/protocolbuffers/go v1.31.0-20230822171018-8b8b971d6fde.1
connectrpc.com/connect v1.11.1
connectrpc.com/grpcreflect v1.2.0
github.com/google/go-cmp v0.5.9
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.12.0
golang.org/x/sync v0.3.0
google.golang.org/genproto/googleapis/api v0.0.0-20230803162519-f966b187b2e5
google.golang.org/genproto/googleapis/rpc v0.0.0-20230807174057-1744710a1577
google.golang.org/grpc v1.58.2
google.golang.org/protobuf v1.31.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/genproto v0.0.0-20230807174057-1744710a1577 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
1,124 changes: 1,121 additions & 3 deletions go.sum

Large diffs are not rendered by default.

7 changes: 0 additions & 7 deletions go.work

This file was deleted.

5 changes: 0 additions & 5 deletions go.work.sum

This file was deleted.

88 changes: 52 additions & 36 deletions handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,35 +24,42 @@ import (
"net/url"
"strconv"
"strings"
"sync"
"time"

"connectrpc.com/connect"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/reflect/protoreflect"
)

// Handler is a Vanguard handler which acts like a router and a middleware. It transforms
// all supported input protocols (Connect, gRPC, gRPC-Web, REST) into a protocol that the
// service handlers support. It can do simple routing based on RPC method name, for simple
// protocols like Connect, gRPC, and gRPC-Web; but it can also route based on REST-ful URI
// paths configured with HTTP transcoding annotations.
type Handler struct {
bufferPool bufferPool
codecs codecMap
compressors compressionMap
methods map[string]*methodConfig
restRoutes routeTrie
unknownHandler http.Handler
defaultHooks *Hooks
}

// ServeHTTP implements http.Handler.
func (m *Mux) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
op := m.newOperation(writer, request)
err := op.validate(m, m.codecs)

useUnknownHandler := m.UnknownHandler != nil && errors.Is(err, errNotFound)
var callback func(context.Context, Operation) (Hooks, error)
if op.methodConf != nil {
callback = op.methodConf.hooksCallback
} else {
callback = m.HooksCallback
}
if callback != nil {
var hookErr error
if op.hooks, hookErr = callback(op.request.Context(), op); hookErr != nil {
useUnknownHandler = false
func (h *Handler) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
op := h.newOperation(writer, request)
err := op.validate(h)

if op.hooks.OnClientRequestHeaders != nil {
if hookErr := op.hooks.OnClientRequestHeaders(op.request.Context(), op, op.request.Header); hookErr != nil {
err = hookErr
}
}
if useUnknownHandler {
if h.unknownHandler != nil && errors.Is(err, errNotFound) {
request.Header = op.originalHeaders // restore headers, just in case initialization removed keys
m.UnknownHandler.ServeHTTP(writer, request)
h.unknownHandler.ServeHTTP(writer, request)
return
}

Expand All @@ -75,15 +82,18 @@ func (m *Mux) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
op.handle()
}

func (m *Mux) newOperation(writer http.ResponseWriter, request *http.Request) *operation {
func (h *Handler) newOperation(writer http.ResponseWriter, request *http.Request) *operation {
ctx, cancel := context.WithCancel(request.Context())
request = request.WithContext(ctx)
op := &operation{
writer: writer,
request: request,
cancel: cancel,
bufferPool: &m.bufferPool,
compressors: m.compressors,
bufferPool: &h.bufferPool,
compressors: h.compressors,
}
if h.defaultHooks != nil {
op.hooks = *h.defaultHooks
}
op.requestLine.fromRequest(request)
return op
Expand Down Expand Up @@ -299,7 +309,7 @@ func (o *operation) HandlerInfo() PeerInfo {

func (o *operation) doNotImplement() {}

func (o *operation) validate(mux *Mux, codecs codecMap) error {
func (o *operation) validate(h *Handler) error {
// Identify the protocol.
clientProtoHandler, queryVars := classifyRequest(o.request)
if clientProtoHandler == nil {
Expand All @@ -316,10 +326,13 @@ func (o *operation) validate(mux *Mux, codecs codecMap) error {
o.request.ContentLength = -1 // transforming it will likely change it

// Identify the method being invoked.
err := o.resolveMethod(mux)
err := o.resolveMethod(h)
if err != nil {
return err
}
if o.methodConf.hooks != nil {
o.hooks = *o.methodConf.hooks
}
if !o.client.protocol.acceptsStreamType(o, o.methodConf.streamType) {
return newHTTPError(http.StatusUnsupportedMediaType, "stream type %s not supported with %s protocol", o.methodConf.streamType, o.client.protocol)
}
Expand Down Expand Up @@ -360,7 +373,7 @@ func (o *operation) validate(mux *Mux, codecs codecMap) error {
return newHTTPError(http.StatusUnsupportedMediaType, "%q compression not supported", reqMeta.compression)
}
}
o.client.codec = codecs.get(reqMeta.codec, o.methodConf.resolver)
o.client.codec = h.codecs.get(reqMeta.codec, o.methodConf.resolver)
if o.client.codec == nil {
return newHTTPError(http.StatusUnsupportedMediaType, "%q sub-format not supported", reqMeta.codec)
}
Expand Down Expand Up @@ -392,11 +405,11 @@ func (o *operation) validate(mux *Mux, codecs codecMap) error {
// NB: This is fine to set even if a custom content-type is used via
// the use of google.api.HttpBody. The actual content-type and body
// data will be written via serverBodyPreparer implementation.
o.server.codec = codecs.get(CodecJSON, o.methodConf.resolver)
o.server.codec = h.codecs.get(CodecJSON, o.methodConf.resolver)
} else if _, supportsCodec := o.methodConf.codecNames[reqMeta.codec]; supportsCodec {
o.server.codec = o.client.codec
} else {
o.server.codec = codecs.get(o.methodConf.preferredCodec, o.methodConf.resolver)
o.server.codec = h.codecs.get(o.methodConf.preferredCodec, o.methodConf.resolver)
}

if reqMeta.compression != "" {
Expand All @@ -418,14 +431,7 @@ func (o *operation) queryValues() url.Values {
return o.queryVars
}

func (o *operation) handle() { //nolint:gocyclo
if o.hooks.OnClientRequestHeaders != nil {
if err := o.hooks.OnClientRequestHeaders(o.request.Context(), o, o.request.Header); err != nil {
o.reportError(err)
return
}
}

func (o *operation) handle() {
o.clientEnveloper, _ = o.client.protocol.(envelopedProtocolHandler)
o.clientPreparer, _ = o.client.protocol.(clientBodyPreparer)
if o.clientPreparer != nil {
Expand Down Expand Up @@ -551,12 +557,12 @@ func (o *operation) handle() { //nolint:gocyclo
o.methodConf.handler.ServeHTTP(o.writer, o.request)
}

func (o *operation) resolveMethod(mux *Mux) error {
func (o *operation) resolveMethod(h *Handler) error {
uriPath := o.request.URL.Path
switch o.client.protocol.protocol() {
case ProtocolREST:
var methods routeMethods
o.restTarget, o.restVars, methods = mux.restRoutes.match(uriPath, o.request.Method)
o.restTarget, o.restVars, methods = h.restRoutes.match(uriPath, o.request.Method)
if o.restTarget != nil {
o.methodConf = o.restTarget.config
return nil
Expand All @@ -578,7 +584,7 @@ func (o *operation) resolveMethod(mux *Mux) error {
},
}
default:
methodConf := mux.methods[uriPath]
methodConf := h.methods[uriPath]
if methodConf == nil {
// TODO: if the service is known, but the method is not, we should send to the client
// a proper RPC error (encoded per protocol handler) with an Unimplemented code.
Expand Down Expand Up @@ -739,6 +745,7 @@ type envelopingReader struct {
rw *responseWriter
r io.ReadCloser

mu sync.Mutex
err error
current io.Reader
mustReleaseCurrent bool
Expand All @@ -747,6 +754,8 @@ type envelopingReader struct {
}

func (r *envelopingReader) Read(data []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return 0, r.err
}
Expand Down Expand Up @@ -786,6 +795,8 @@ func (r *envelopingReader) Read(data []byte) (n int, err error) {
}

func (r *envelopingReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
if r.mustReleaseCurrent {
buf, ok := r.current.(*bytes.Buffer)
if ok {
Expand Down Expand Up @@ -860,6 +871,7 @@ type transformingReader struct {
msg *message
r io.ReadCloser

mu sync.Mutex
consumedFirst bool
err error
buffer *bytes.Buffer
Expand All @@ -868,6 +880,8 @@ type transformingReader struct {
}

func (r *transformingReader) Read(data []byte) (n int, err error) {
r.mu.Lock()
defer r.mu.Unlock()
if r.err != nil {
return 0, r.err
}
Expand Down Expand Up @@ -925,6 +939,8 @@ func (r *transformingReader) Read(data []byte) (n int, err error) {
}

func (r *transformingReader) Close() error {
r.mu.Lock()
defer r.mu.Unlock()
r.err = errors.New("body is closed")
r.msg.release(r.rw.op.bufferPool)
return r.r.Close()
Expand Down
Loading

0 comments on commit 81e97dc

Please sign in to comment.