Skip to content

Commit

Permalink
shutdown fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
porjo committed Dec 27, 2024
1 parent 7c6cf1f commit 492efea
Show file tree
Hide file tree
Showing 4 changed files with 173 additions and 144 deletions.
40 changes: 18 additions & 22 deletions http.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,18 @@ func (ws *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
ws.logger.Debug("read message", "msg", string(raw))

if msgType == websocket.TextMessage {
wg.Add(1)
go func() {
defer wg.Done()
var req Request
err = json.Unmarshal(raw, &req)
if err != nil {
ws.logger.Error("json unmarshal error", "error", err)
return
}
var req Request
err = json.Unmarshal(raw, &req)
if err != nil {
ws.logger.Error("json unmarshal error", "error", err)
return
}

err := ws.msgHandler(req)
if err != nil {
ws.logger.Error("error", "error", err)
errCh <- err
}
}()
err := ws.msgHandler(req)
if err != nil {
ws.logger.Error("error", "error", err)
errCh <- err
}
} else {
ws.logger.Info("unknown message type - close websocket\n")
conn.Close()
Expand All @@ -199,7 +195,7 @@ func (c *Conn) writeMsg(val interface{}) error {
if err != nil {
return err
}
slog.Info("write message", "ws", c.RemoteAddr(), "msg", string(j))
slog.Debug("write message", "ws", c.RemoteAddr(), "msg", string(j))
if err = c.WriteMessage(websocket.TextMessage, j); err != nil {
return err
}
Expand Down Expand Up @@ -252,11 +248,11 @@ func (ws *wsHandler) msgHandler(req Request) error {
//
// There are a couple of challenges to overcome:
// - how to know when the encoding has finished? The current solution is to wait StreamSourceTimeoutSec and
// end the handler if no data is copied in that time. Is that the best approach?
// - how to handle clients that delay requesting more data? In this case ResponseWriter blocks the
// Copy operation.
//
// end the handler if no data is copied in that time. Is that the best approach?
// - how to handle clients that delay requesting more data? In this case ResponseWriter blocks the Copy operation.
//
// I think the only solution is to set WriteTimeout on http.Sever
// I think the only solution is to set WriteTimeout on http.Server
func ServeStream(webRoot string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
dir := http.Dir(webRoot)
Expand All @@ -280,11 +276,11 @@ func ServeStream(webRoot string) http.HandlerFunc {
return
}
if i == 0 {
if time.Since(lastData) > time.Duration(StreamSourceTimeout) {
if time.Since(lastData) > StreamSourceTimeout {
slog.Info("servestream timeout", "timeout", StreamSourceTimeout)
return
}
time.Sleep(time.Duration(1 * time.Second))
time.Sleep(time.Second)
} else {
lastData = time.Now()
}
Expand Down
45 changes: 30 additions & 15 deletions internal/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"bufio"
"context"
"errors"
"fmt"
"io"
"log/slog"
"os/exec"
"sync"
"syscall"
Expand All @@ -21,7 +21,7 @@ func RunCommand(ctx context.Context, command string, flags ...string) ([]byte, e
out, err := cmd.Output()
if err != nil {
if ee, ok := err.(*exec.ExitError); ok {
fmt.Printf("runcommand exit error, stderr '%v'\n", string(ee.Stderr))
slog.Debug("runcommand exit error", "stderr", string(ee.Stderr))
}
}
return out, err
Expand All @@ -47,10 +47,10 @@ func RunCommandCh(ctx context.Context, command string, flags ...string) (chan st
go func() {
defer close(outCh)
defer close(errCh)
slog.Debug("command start", "command", command)
err := cmd.Start()
if err != nil {
errCh <- err

nonblockingChSend(errCh, err)
}

wg := sync.WaitGroup{}
Expand All @@ -59,36 +59,51 @@ func RunCommandCh(ctx context.Context, command string, flags ...string) (chan st
defer wg.Done()
for {
line, err := stdoutBuf.ReadString('\n')
outCh <- line
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- err
nonblockingChSend(errCh, err)
}
return
}
nonblockingChSend(outCh, line)
}
}()

for {
line, err := stderrBuf.ReadString('\n')
outCh <- line
if err != nil {
if !errors.Is(err, io.EOF) {
errCh <- err
wg.Add(1)
go func() {
defer wg.Done()
for {
line, err := stderrBuf.ReadString('\n')
if err != nil {
if !errors.Is(err, io.EOF) {
nonblockingChSend(errCh, err)
}
break
}
break
nonblockingChSend(outCh, line)
}
}
}()

slog.Debug("wait for stdout/stderr readers to end", "command", command)
wg.Wait()

slog.Debug("command wait", "command", command)
err = cmd.Wait()
if err != nil {
errCh <- err
nonblockingChSend(errCh, err)
}
slog.Debug("command wait, done", "command", command)
// kill any orphaned children upon completion, ignore kill error
syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM)
}()

return outCh, errCh, nil
}

func nonblockingChSend[T any](ch chan T, msg T) {
select {
case ch <- msg:
default:
slog.Warn("channel was blocked, message discarded", "msg", msg)
}
}
Loading

0 comments on commit 492efea

Please sign in to comment.