Skip to content

Commit

Permalink
Add tests for proxy configuration (#105)
Browse files Browse the repository at this point in the history
Add test coverage for use of `httputil.ReverseProxy`. Currently on
`Read` when we return a non `io.EOF` error it will cause the handler to
panic which will incorrectly bubble up to the handler without encoding
the error. Now we first report the error on prepare message before
erroring the read with `io.EOF`.

Issues: 
- httputil suppress a panic
[here](https://cs.opensource.google/go/go/+/master:src/net/http/httputil/reverseproxy.go;l=513)
unless the server context key is set. This was a problem with how the
rest tests the handler directly not the server implemenation. Add the
key and ensure we handle the panic condition gracefully.
- http2 transport errors on reading the body:
https://cs.opensource.google/go/x/net/+/refs/tags/v0.18.0:http2/transport.go;l=1326
aborts the stream with the connect error given in the read. Should
instead return `io.EOF` and cancel the context.
  • Loading branch information
emcfarlane authored Mar 8, 2024
1 parent 2ccfb59 commit d8f80b8
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 11 deletions.
7 changes: 4 additions & 3 deletions transcoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,8 @@ func (r *transformingReader) Read(data []byte) (n int, err error) {
}
if err := r.prepareMessage(); err != nil {
r.err = err
return 0, err
r.rw.reportError(err)
return 0, io.EOF
}
}
}
Expand Down Expand Up @@ -1024,7 +1025,7 @@ func (w *responseWriter) Header() http.Header {
return w.delegate.Header()
}

func (w *responseWriter) Write(data []byte) (int, error) {
func (w *responseWriter) Write(data []byte) (n int, err error) {
if !w.headersWritten {
w.WriteHeader(http.StatusOK)
}
Expand Down Expand Up @@ -1540,7 +1541,7 @@ type transformingWriter struct {
latestEnvelope envelope
}

func (w *transformingWriter) Write(data []byte) (int, error) {
func (w *transformingWriter) Write(data []byte) (n int, err error) {
if w.err != nil {
return 0, w.err
}
Expand Down
67 changes: 61 additions & 6 deletions vanguard_restxrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ package vanguard

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
Expand Down Expand Up @@ -71,11 +73,18 @@ func TestMux_RESTxRPC(t *testing.T) {
connect.WithInterceptors(&interceptor),
))

server := httptest.NewServer(serveMux)
t.Cleanup(server.Close)
serverURL, err := url.Parse(server.URL)
require.NoError(t, err)
proxy := httputil.NewSingleHostReverseProxy(serverURL)
proxy.Transport = server.Client().Transport

type testMux struct {
name string
handler http.Handler
}
makeMux := func(protocol Protocol, codec, compression string) testMux {
wrapHandler := func(protocol Protocol, codec, compression string, handler http.Handler) http.Handler {
opts := []ServiceOption{
WithTargetProtocols(protocol),
WithTargetCodecs(codec),
Expand All @@ -85,22 +94,28 @@ func TestMux_RESTxRPC(t *testing.T) {
} else {
opts = append(opts, WithNoTargetCompression())
}
svcHandler := protocolAssertMiddleware(protocol, codec, compression, serveMux)
name := fmt.Sprintf("%s_%s_%s", protocol, codec, compression)
svcHandler := protocolAssertMiddleware(protocol, codec, compression, handler)

services := make([]*Service, len(serviceNames))
for i, svcName := range serviceNames {
services[i] = NewService(svcName, svcHandler, opts...)
}
handler, err := NewTranscoder(services)
require.NoError(t, err)
return testMux{name: name, handler: handler}
return handler
}
var muxes []testMux
for _, protocol := range protocols {
for _, codec := range codecs {
for _, compression := range compressions {
muxes = append(muxes, makeMux(protocol, codec, compression))
muxes = append(muxes, testMux{
name: fmt.Sprintf("%s_%s_%s", protocol, codec, compression),
handler: wrapHandler(protocol, codec, compression, serveMux),
})
muxes = append(muxes, testMux{
name: fmt.Sprintf("proxy/%s_%s_%s", protocol, codec, compression),
handler: wrapHandler(protocol, codec, compression, proxy),
})
}
}
}
Expand Down Expand Up @@ -366,6 +381,28 @@ func TestMux_RESTxRPC(t *testing.T) {
}
]`,
},
}, {
// Checks errors on decoding the request body.
name: "GetCheckout-Error",
input: input{
method: http.MethodGet,
path: "/v2/checkouts/nan", // invalid ID
body: nil,
meta: http.Header{
"Message": []string{"hello"},
},
},
stream: testStream{
method: testv1connect.LibraryServiceGetBookProcedure,
msgs: []testMsg{},
},
output: output{
code: http.StatusBadRequest,
body: &status.Status{
Code: int32(connect.CodeInvalidArgument),
Message: "invalid parameter \"id\" invalid character 'a' in literal null (expecting 'u')",
},
},
}, {
name: "Index",
input: input{
Expand Down Expand Up @@ -514,7 +551,25 @@ func TestMux_RESTxRPC(t *testing.T) {
t.Log("req:", string(debug))

rsp := httptest.NewRecorder()
opts.mux.handler.ServeHTTP(rsp, req)

func() {
// Capture http.ErrAbortHandler panics.
defer func() {
if recovered := recover(); recovered != nil {
if err, ok := recovered.(error); ok {
if errors.Is(err, http.ErrAbortHandler) {
return
}
}
t.Error("unexpected panic:", recovered)
}
}()
// Inject http.Server into the request context to convince
// httputil.ReverseProxy we are in a server context.
svr := &http.Server{} //nolint:gosec // dummy server for testing
req = req.WithContext(context.WithValue(req.Context(), http.ServerContextKey, svr))
opts.mux.handler.ServeHTTP(rsp, req)
}()

result := rsp.Result()
defer result.Body.Close()
Expand Down
14 changes: 14 additions & 0 deletions vanguard_rpcxrest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"testing"

"connectrpc.com/connect"
Expand Down Expand Up @@ -102,6 +104,18 @@ func TestMux_RPCxREST(t *testing.T) {
for _, compression := range compressions {
servers = append(servers, makeServer(compression))
}
for _, server := range servers {
serverURL, err := url.Parse(server.server.URL)
require.NoError(t, err)
proxy := httputil.NewSingleHostReverseProxy(serverURL)
proxy.Transport = server.server.Client().Transport
proxyServer := httptest.NewUnstartedServer(proxy)
proxyServer.EnableHTTP2 = true
proxyServer.StartTLS()
disableCompression(proxyServer)
t.Cleanup(proxyServer.Close)
servers = append(servers, testServer{name: server.name + "/proxy", server: proxyServer})
}

type testOpt struct {
name string
Expand Down
16 changes: 15 additions & 1 deletion vanguard_rpcxrpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"fmt"
"net/http"
"net/http/httptest"
"net/http/httputil"
"net/url"
"strings"
"testing"

Expand Down Expand Up @@ -89,14 +91,26 @@ func TestMux_RPCxRPC(t *testing.T) {
t.Cleanup(server.Close)
return testServer{name: name, server: server}
}
var servers []testServer
servers := []testServer{}
for _, protocol := range protocols {
for _, codec := range codecs {
for _, compression := range compressions {
servers = append(servers, makeServer(protocol, codec, compression))
}
}
}
for _, server := range servers {
serverURL, err := url.Parse(server.server.URL)
require.NoError(t, err)
proxy := httputil.NewSingleHostReverseProxy(serverURL)
proxy.Transport = server.server.Client().Transport
proxyServer := httptest.NewUnstartedServer(proxy)
proxyServer.EnableHTTP2 = true
proxyServer.StartTLS()
disableCompression(proxyServer)
t.Cleanup(proxyServer.Close)
servers = append(servers, testServer{name: server.name + "/proxy", server: proxyServer})
}

type testOpt struct {
name string
Expand Down
3 changes: 2 additions & 1 deletion vanguard_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,7 +419,8 @@ func (i *testInterceptor) WrapUnary(next connect.UnaryFunc) connect.UnaryFunc {
}
diff := cmp.Diff(msg, inn.msg, protocmp.Transform())
if diff != "" {
return nil, fmt.Errorf("message didn't match: %s", diff)
stream.Error(diff)
return nil, errors.New("unexpected message diff")
}

if out.err != nil {
Expand Down

0 comments on commit d8f80b8

Please sign in to comment.