Skip to content

Commit

Permalink
martian: rework flushing logic, support non-chunked SSE
Browse files Browse the repository at this point in the history
This patch applies patternFlushWriter to support
- non-chunked SSE i.e. flush after '\n\n'
- chunked-encoding i.e. flush after '\r\n'
  • Loading branch information
Choraden authored and mmatczuk committed May 28, 2024
1 parent b895187 commit fe4243e
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 30 deletions.
38 changes: 21 additions & 17 deletions internal/martian/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,40 @@
package martian

import (
"bufio"
"bytes"
"io"
"mime"
"net/http"
)

func shouldFlush(res *http.Response) bool {
var (
sseFlushPattern = [2]byte{'\n', '\n'}
chunkFlushPattern = [2]byte{'\r', '\n'}
)

func shouldChunk(res *http.Response) bool {
if res.ProtoMajor != 1 || res.ProtoMinor != 1 {
return false
}
if res.ContentLength != -1 {
return false
}

// Please read 3.3.2 and 3.3.3 of RFC 7230 for more details https://datatracker.ietf.org/doc/html/rfc7230#section-3.3.2.
if res.Request.Method == http.MethodHead {
return false
}
// The 204/304 response MUST NOT contain a
// message-body, and thus is always terminated by the first empty line
// after the header fields.
if res.StatusCode == http.StatusNoContent || res.StatusCode == http.StatusNotModified {
return false
}
if res.StatusCode < 200 {
return false
}

return isTextEventStream(res) || res.ContentLength == -1
return true
}

func isTextEventStream(res *http.Response) bool {
Expand All @@ -42,20 +60,6 @@ func isTextEventStream(res *http.Response) bool {
return baseCT == "text/event-stream"
}

// flushAfterChunkWriter works with net/http/internal.chunkedWriter and forces a flush after each chunk is written.
// There is also net/http/internal.FlushAfterChunkWriter that does the same thing nicer, but it is not available.
type flushAfterChunkWriter struct {
*bufio.Writer
}

func (w flushAfterChunkWriter) WriteString(s string) (n int, err error) {
n, err = w.Writer.WriteString(s)
if s == "\r\n" && err == nil {
err = w.Flush()
}
return
}

type flusher interface {
Flush() error
}
Expand Down
11 changes: 8 additions & 3 deletions internal/martian/proxy_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,14 @@ func (p *proxyConn) writeResponse(res *http.Response) error {
// Add support for Server Sent Events - relay HTTP chunks and flush after each chunk.
// This is safe for events that are smaller than the buffer io.Copy uses (32KB).
// If the event is larger than the buffer, the event will be split into multiple chunks.
if shouldFlush(res) {
err = res.Write(flushAfterChunkWriter{p.brw.Writer})
} else {
switch {
case isTextEventStream(res):
w := newPatternFlushWriter(p.brw.Writer, p.brw.Writer, sseFlushPattern)
err = res.Write(w)
case shouldChunk(res):
w := newPatternFlushWriter(p.brw.Writer, p.brw.Writer, chunkFlushPattern)
err = res.Write(w)
default:
err = res.Write(p.brw)
}
}
Expand Down
18 changes: 8 additions & 10 deletions internal/martian/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,6 @@ func (p proxyHandler) handleRequest(rw http.ResponseWriter, req *http.Request) {
}
}

func newWriteFlusher(rw http.ResponseWriter) writeFlusher {
return writeFlusher{
rw: rw,
rc: http.NewResponseController(rw),
}
}

type writeFlusher struct {
rw io.Writer
rc *http.ResponseController
Expand Down Expand Up @@ -385,9 +378,14 @@ func (p proxyHandler) writeResponse(rw http.ResponseWriter, res *http.Response)
}

var err error
if shouldFlush(res) {
err = copyBody(newWriteFlusher(rw), res.Body)
} else {
switch {
case isTextEventStream(res):
w := newPatternFlushWriter(rw, http.NewResponseController(rw), sseFlushPattern)
err = copyBody(w, res.Body)
case shouldChunk(res):
w := newPatternFlushWriter(rw, http.NewResponseController(rw), chunkFlushPattern)
err = copyBody(w, res.Body)
default:
err = copyBody(rw, res.Body)
}

Expand Down

0 comments on commit fe4243e

Please sign in to comment.