Skip to content

Commit

Permalink
Support streaming responses from functions in HTTP mode
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Ellis (OpenFaaS Ltd) <[email protected]>
  • Loading branch information
alexellis committed Jan 11, 2024
1 parent 485e360 commit f0eb480
Show file tree
Hide file tree
Showing 48 changed files with 1,140 additions and 381 deletions.
90 changes: 52 additions & 38 deletions executor/http_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,20 @@ import (
"fmt"
"io"

units "github.com/docker/go-units"

"log"
"net"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/exec"
"os/signal"
"strings"
"syscall"
"time"

units "github.com/docker/go-units"
fhttputil "github.com/openfaas/faas-provider/httputil"
)

// HTTPFunctionRunner creates and maintains one process responsible for handling all calls
Expand All @@ -38,6 +40,7 @@ type HTTPFunctionRunner struct {
BufferHTTPBody bool
LogPrefix bool
LogBufferSize int
ReverseProxy *httputil.ReverseProxy
}

// Start forks the process used for processing incoming requests
Expand Down Expand Up @@ -127,55 +130,66 @@ func (f *HTTPFunctionRunner) Run(req FunctionRequest, contentLength int64, r *ht
}
defer cancel()

res, err := f.Client.Do(request.WithContext(reqCtx))
if err != nil {
log.Printf("Upstream HTTP request error: %s\n", err.Error())
if r.Header.Get("Accept") == "text/event-stream" {

// Error unrelated to context / deadline
if reqCtx.Err() == nil {
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
ww := fhttputil.NewHttpWriteInterceptor(w)

w.WriteHeader(http.StatusInternalServerError)
f.ReverseProxy.ServeHTTP(ww, request)
done := time.Since(startedTime)

return nil
}
log.Printf("%s %s - %d - Bytes: %s (%.4fs)", r.Method, r.RequestURI, ww.Status(), units.HumanSize(float64(ww.BytesWritten())), done.Seconds())
} else {

<-reqCtx.Done()
res, err := f.Client.Do(request.WithContext(reqCtx))
if err != nil {
log.Printf("Upstream HTTP request error: %s\n", err.Error())

if reqCtx.Err() != nil {
// Error due to timeout / deadline
log.Printf("Upstream HTTP killed due to exec_timeout: %s\n", f.ExecTimeout)
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
// Error unrelated to context / deadline
if reqCtx.Err() == nil {
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))

w.WriteHeader(http.StatusGatewayTimeout)
return nil
}
w.WriteHeader(http.StatusInternalServerError)

w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
w.WriteHeader(http.StatusInternalServerError)
return err
}
return nil
}

copyHeaders(w.Header(), &res.Header)
done := time.Since(startedTime)
<-reqCtx.Done()

w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", done.Seconds()))
if reqCtx.Err() != nil {
// Error due to timeout / deadline
log.Printf("Upstream HTTP killed due to exec_timeout: %s\n", f.ExecTimeout)
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))

w.WriteHeader(res.StatusCode)
if res.Body != nil {
defer res.Body.Close()
w.WriteHeader(http.StatusGatewayTimeout)
return nil
}

bodyBytes, bodyErr := io.ReadAll(res.Body)
if bodyErr != nil {
log.Println("read body err", bodyErr)
w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", time.Since(startedTime).Seconds()))
w.WriteHeader(http.StatusInternalServerError)
return err
}
w.Write(bodyBytes)
}

// Exclude logging for health check probes from the kubelet which can spam
// log collection systems.
if !strings.HasPrefix(r.UserAgent(), "kube-probe") {
log.Printf("%s %s - %s - ContentLength: %s (%.4fs)", r.Method, r.RequestURI, res.Status, units.HumanSize(float64(res.ContentLength)), done.Seconds())
copyHeaders(w.Header(), &res.Header)
done := time.Since(startedTime)

w.Header().Set("X-Duration-Seconds", fmt.Sprintf("%f", done.Seconds()))

w.WriteHeader(res.StatusCode)
if res.Body != nil {
defer res.Body.Close()

bodyBytes, bodyErr := io.ReadAll(res.Body)
if bodyErr != nil {
log.Println("read body err", bodyErr)
}
w.Write(bodyBytes)
}

// Exclude logging for health check probes from the kubelet which can spam
// log collection systems.
if !strings.HasPrefix(r.UserAgent(), "kube-probe") {
log.Printf("%s %s - %s - ContentLength: %s (%.4fs)", r.Method, r.RequestURI, res.Status, units.HumanSize(float64(res.ContentLength)), done.Seconds())
}
}

return nil
Expand Down
8 changes: 5 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,25 +1,27 @@
module github.com/openfaas/of-watchdog

go 1.20
go 1.21

require (
github.com/docker/go-units v0.5.0
github.com/openfaas/faas-middleware v1.2.3
github.com/prometheus/client_golang v1.17.0
github.com/openfaas/faas-provider v0.25.2
github.com/prometheus/client_golang v1.18.0
)

require (
github.com/prometheus/client_model v0.5.0 // indirect
github.com/prometheus/common v0.45.0 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
golang.org/x/sys v0.14.0 // indirect
golang.org/x/sys v0.16.0 // indirect
)

require (
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
11 changes: 11 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,22 @@ github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo=
github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0 h1:jWpvCLoY8Z/e3VKvlsiIGKtc+UG6U5vzxaoagmhXfyg=
github.com/matttproud/golang_protobuf_extensions/v2 v2.0.0/go.mod h1:QUyp042oQthUoa9bqDv0ER0wrtXnBruoNd7aNjkbP+k=
github.com/openfaas/faas-middleware v1.2.3 h1:nRib38/i5eNdUTTKA7ILgO/Xns5zVorCO6lIBjr2xA0=
github.com/openfaas/faas-middleware v1.2.3/go.mod h1:pMyWe0SP0zuzIj2on1pmRkZAjGIS+uRk2mp3N6LSlDI=
github.com/openfaas/faas-provider v0.25.1 h1:7Ryxj5Lf7mPBpNPFLO92jqvsrNUIyZBHutoOgp6MH5Q=
github.com/openfaas/faas-provider v0.25.1/go.mod h1:NsETIfEndZn4mn/w/XnBTcDTwKqULCziphLp7KgeRcA=
github.com/openfaas/faas-provider v0.25.2 h1:sAyL96CzAk/YnuXQZiRJcHo7UrcYMaf7RDvKxsQb/2o=
github.com/openfaas/faas-provider v0.25.2/go.mod h1:NsETIfEndZn4mn/w/XnBTcDTwKqULCziphLp7KgeRcA=
github.com/prometheus/client_golang v1.17.0 h1:rl2sfwZMtSthVU752MqfjQozy7blglC+1SOtjMAMh+Q=
github.com/prometheus/client_golang v1.17.0/go.mod h1:VeL+gMmOAxkS2IqfCq0ZmHSL+LjWfWDUmp1mBz9JgUY=
github.com/prometheus/client_golang v1.18.0 h1:HzFfmkOzH5Q8L8G+kSJKUx5dtG87sewO+FoDDqP5Tbk=
github.com/prometheus/client_golang v1.18.0/go.mod h1:T+GXkCk5wSJyOqMIzVgvvjFDlkOQntgjkJWKrN5txjA=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16 h1:v7DLqVdK4VrYkVD5diGdl4sxJurKJEMnODWRJlxV9oM=
github.com/prometheus/client_model v0.4.1-0.20230718164431-9a2bf3000d16/go.mod h1:oMQmHW1/JoDwqLtg57MGgP/Fb1CJEYF2imWWhWtMkYU=
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
Expand All @@ -37,6 +45,9 @@ golang.org/x/sys v0.11.0 h1:eG7RXZHdqOJ1i+0lgLgCpSXAp6M3LYlAo6osgSi0xOM=
golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0 h1:Vz7Qs629MkJkGyHxUlRHizWJRG2j8fbQKjELVSNhy7Q=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.16.0 h1:xWw16ngr6ZMtmxDyKyIgsE93KNKz5HKmMa3b8ALHidU=
golang.org/x/sys v0.16.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
Expand Down
8 changes: 8 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"io"
"log"
"net/http"
"net/http/httputil"
"net/url"
"os"
"os/signal"
Expand Down Expand Up @@ -349,6 +350,13 @@ func makeHTTPRequestHandler(watchdogConfig config.WatchdogConfig, prefixLogs boo
BufferHTTPBody: watchdogConfig.BufferHTTPBody,
LogPrefix: prefixLogs,
LogBufferSize: logBufferSize,
ReverseProxy: &httputil.ReverseProxy{
Director: func(req *http.Request) {
},
ErrorHandler: func(w http.ResponseWriter, r *http.Request, err error) {
},
ErrorLog: log.New(io.Discard, "", 0),
},
}

if len(watchdogConfig.UpstreamURL) == 0 {
Expand Down
21 changes: 21 additions & 0 deletions vendor/github.com/openfaas/faas-provider/LICENSE

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 12 additions & 0 deletions vendor/github.com/openfaas/faas-provider/httputil/writers.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit f0eb480

Please sign in to comment.