diff --git a/internal/martian/flush.go b/internal/martian/flush.go index 35c2ff82..c4c9429a 100644 --- a/internal/martian/flush.go +++ b/internal/martian/flush.go @@ -17,22 +17,40 @@ package martian import ( - "bufio" "bytes" "io" "mime" "net/http" ) -func shouldFlush(res *http.Response) bool { +var ( + sseFlushPattern = [2]byte{'\n', '\n'} + chunkFlushPattern = [2]byte{'\r', '\n'} +) + +func shouldChunk(res *http.Response) bool { + if res.ProtoMajor != 1 || res.ProtoMinor != 1 { + return false + } + if res.ContentLength != -1 { + return false + } + + // Please read 3.3.2 and 3.3.3 of RFC 7230 for more details https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2. if res.Request.Method == http.MethodHead { return false } + // The 204/304 response MUST NOT contain a + // message-body, and thus is always terminated by the first empty line + // after the header fields. if res.StatusCode == http.StatusNoContent || res.StatusCode == http.StatusNotModified { return false } + if res.StatusCode < 200 { + return false + } - return isTextEventStream(res) || res.ContentLength == -1 + return true } func isTextEventStream(res *http.Response) bool { @@ -42,20 +60,6 @@ func isTextEventStream(res *http.Response) bool { return baseCT == "text/event-stream" } -// flushAfterChunkWriter works with net/http/internal.chunkedWriter and forces a flush after each chunk is written. -// There is also net/http/internal.FlushAfterChunkWriter that does the same thing nicer, but it is not available. -type flushAfterChunkWriter struct { - *bufio.Writer -} - -func (w flushAfterChunkWriter) WriteString(s string) (n int, err error) { - n, err = w.Writer.WriteString(s) - if s == "\r\n" && err == nil { - err = w.Flush() - } - return -} - type flusher interface { Flush() error } diff --git a/internal/martian/proxy_conn.go b/internal/martian/proxy_conn.go index ffc15778..c246d9df 100644 --- a/internal/martian/proxy_conn.go +++ b/internal/martian/proxy_conn.go @@ -441,9 +441,14 @@ func (p *proxyConn) writeResponse(res *http.Response) error { // Add support for Server Sent Events - relay HTTP chunks and flush after each chunk. // This is safe for events that are smaller than the buffer io.Copy uses (32KB). // If the event is larger than the buffer, the event will be split into multiple chunks. - if shouldFlush(res) { - err = res.Write(flushAfterChunkWriter{p.brw.Writer}) - } else { + switch { + case isTextEventStream(res): + w := newPatternFlushWriter(p.brw.Writer, p.brw.Writer, sseFlushPattern) + err = res.Write(w) + case shouldChunk(res): + w := newPatternFlushWriter(p.brw.Writer, p.brw.Writer, chunkFlushPattern) + err = res.Write(w) + default: err = res.Write(p.brw) } } diff --git a/internal/martian/proxy_handler.go b/internal/martian/proxy_handler.go index 39e30b4b..2643acad 100644 --- a/internal/martian/proxy_handler.go +++ b/internal/martian/proxy_handler.go @@ -332,13 +332,6 @@ func (p proxyHandler) handleRequest(rw http.ResponseWriter, req *http.Request) { } } -func newWriteFlusher(rw http.ResponseWriter) writeFlusher { - return writeFlusher{ - rw: rw, - rc: http.NewResponseController(rw), - } -} - type writeFlusher struct { rw io.Writer rc *http.ResponseController @@ -385,9 +378,14 @@ func (p proxyHandler) writeResponse(rw http.ResponseWriter, res *http.Response) } var err error - if shouldFlush(res) { - err = copyBody(newWriteFlusher(rw), res.Body) - } else { + switch { + case isTextEventStream(res): + w := newPatternFlushWriter(rw, http.NewResponseController(rw), sseFlushPattern) + err = copyBody(w, res.Body) + case shouldChunk(res): + w := newPatternFlushWriter(rw, http.NewResponseController(rw), chunkFlushPattern) + err = copyBody(w, res.Body) + default: err = copyBody(rw, res.Body) }