From 492efea3ed087c57cc5a0b58bab3207e2c685238 Mon Sep 17 00:00:00 2001 From: Ian Bishop <1296987+porjo@users.noreply.github.com> Date: Fri, 27 Dec 2024 14:10:21 +1000 Subject: [PATCH] shutdown fixes --- http.go | 40 ++++--- internal/command/command.go | 45 +++++--- internal/ytworker/download.go | 195 ++++++++++++++++++---------------- main.go | 37 ++++--- 4 files changed, 173 insertions(+), 144 deletions(-) diff --git a/http.go b/http.go index 2a52b9c..f5442bd 100644 --- a/http.go +++ b/http.go @@ -167,22 +167,18 @@ func (ws *wsHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { ws.logger.Debug("read message", "msg", string(raw)) if msgType == websocket.TextMessage { - wg.Add(1) - go func() { - defer wg.Done() - var req Request - err = json.Unmarshal(raw, &req) - if err != nil { - ws.logger.Error("json unmarshal error", "error", err) - return - } + var req Request + err = json.Unmarshal(raw, &req) + if err != nil { + ws.logger.Error("json unmarshal error", "error", err) + return + } - err := ws.msgHandler(req) - if err != nil { - ws.logger.Error("error", "error", err) - errCh <- err - } - }() + err := ws.msgHandler(req) + if err != nil { + ws.logger.Error("error", "error", err) + errCh <- err + } } else { ws.logger.Info("unknown message type - close websocket\n") conn.Close() @@ -199,7 +195,7 @@ func (c *Conn) writeMsg(val interface{}) error { if err != nil { return err } - slog.Info("write message", "ws", c.RemoteAddr(), "msg", string(j)) + slog.Debug("write message", "ws", c.RemoteAddr(), "msg", string(j)) if err = c.WriteMessage(websocket.TextMessage, j); err != nil { return err } @@ -252,11 +248,11 @@ func (ws *wsHandler) msgHandler(req Request) error { // // There are a couple of challenges to overcome: // - how to know when the encoding has finished? The current solution is to wait StreamSourceTimeoutSec and +// end the handler if no data is copied in that time. Is that the best approach? +// - how to handle clients that delay requesting more data? In this case ResponseWriter blocks the +// Copy operation. // -// end the handler if no data is copied in that time. Is that the best approach? -// - how to handle clients that delay requesting more data? In this case ResponseWriter blocks the Copy operation. -// -// I think the only solution is to set WriteTimeout on http.Sever +// I think the only solution is to set WriteTimeout on http.Server func ServeStream(webRoot string) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { dir := http.Dir(webRoot) @@ -280,11 +276,11 @@ func ServeStream(webRoot string) http.HandlerFunc { return } if i == 0 { - if time.Since(lastData) > time.Duration(StreamSourceTimeout) { + if time.Since(lastData) > StreamSourceTimeout { slog.Info("servestream timeout", "timeout", StreamSourceTimeout) return } - time.Sleep(time.Duration(1 * time.Second)) + time.Sleep(time.Second) } else { lastData = time.Now() } diff --git a/internal/command/command.go b/internal/command/command.go index 744644b..448982c 100644 --- a/internal/command/command.go +++ b/internal/command/command.go @@ -4,8 +4,8 @@ import ( "bufio" "context" "errors" - "fmt" "io" + "log/slog" "os/exec" "sync" "syscall" @@ -21,7 +21,7 @@ func RunCommand(ctx context.Context, command string, flags ...string) ([]byte, e out, err := cmd.Output() if err != nil { if ee, ok := err.(*exec.ExitError); ok { - fmt.Printf("runcommand exit error, stderr '%v'\n", string(ee.Stderr)) + slog.Debug("runcommand exit error", "stderr", string(ee.Stderr)) } } return out, err @@ -47,10 +47,10 @@ func RunCommandCh(ctx context.Context, command string, flags ...string) (chan st go func() { defer close(outCh) defer close(errCh) + slog.Debug("command start", "command", command) err := cmd.Start() if err != nil { - errCh <- err - + nonblockingChSend(errCh, err) } wg := sync.WaitGroup{} @@ -59,36 +59,51 @@ func RunCommandCh(ctx context.Context, command string, flags ...string) (chan st defer wg.Done() for { line, err := stdoutBuf.ReadString('\n') - outCh <- line if err != nil { if !errors.Is(err, io.EOF) { - errCh <- err + nonblockingChSend(errCh, err) } return } + nonblockingChSend(outCh, line) } }() - for { - line, err := stderrBuf.ReadString('\n') - outCh <- line - if err != nil { - if !errors.Is(err, io.EOF) { - errCh <- err + wg.Add(1) + go func() { + defer wg.Done() + for { + line, err := stderrBuf.ReadString('\n') + if err != nil { + if !errors.Is(err, io.EOF) { + nonblockingChSend(errCh, err) + } + break } - break + nonblockingChSend(outCh, line) } - } + }() + slog.Debug("wait for stdout/stderr readers to end", "command", command) wg.Wait() + slog.Debug("command wait", "command", command) err = cmd.Wait() if err != nil { - errCh <- err + nonblockingChSend(errCh, err) } + slog.Debug("command wait, done", "command", command) // kill any orphaned children upon completion, ignore kill error syscall.Kill(-cmd.Process.Pid, syscall.SIGTERM) }() return outCh, errCh, nil } + +func nonblockingChSend[T any](ch chan T, msg T) { + select { + case ch <- msg: + default: + slog.Warn("channel was blocked, message discarded", "msg", msg) + } +} diff --git a/internal/ytworker/download.go b/internal/ytworker/download.go index 383abce..3457da1 100644 --- a/internal/ytworker/download.go +++ b/internal/ytworker/download.go @@ -101,6 +101,8 @@ type Download struct { sponsorBlock bool sponsorBlockCats string ytCmd string + + ctx context.Context } func NewDownload(ctx context.Context, webroot, outPath string, sponsorBlock bool, sponsorBlockCats string, ytCmd string, maxProcessTime time.Duration) *Download { @@ -116,6 +118,7 @@ func NewDownload(ctx context.Context, webroot, outPath string, sponsorBlock bool sponsorBlock: sponsorBlock, sponsorBlockCats: sponsorBlockCats, ytCmd: ytCmd, + ctx: ctx, } dl.OutChan = make(chan websocket.Msg) @@ -133,7 +136,7 @@ func (yt *Download) Work(j *jobs.Job) { id := time.Now().UnixMicro() - ctx, cancel := context.WithTimeout(context.Background(), yt.maxProcessTime) + ctx, cancel := context.WithTimeout(yt.ctx, yt.maxProcessTime) defer cancel() url, err := url.Parse(j.Payload) @@ -210,55 +213,63 @@ func (yt *Download) download(ctx context.Context, id int64, outCh chan<- websock slog.Info("Running command", "command", append([]string{yt.ytCmd}, args...)) cmdOutCh, cmdErrCh, err := command.RunCommandCh(ctx, yt.ytCmd, args...) if err != nil { + if errors.Is(err, context.DeadlineExceeded) { + slog.Warn("command terminated due to cancelled context") + } return err } - var info Info + infoFileName := "" count := 0 for { - count++ - if count > 20 { - return fmt.Errorf("waited too long for info file") - } - infoFileName := diskFileNameTmp + ".info.json" + infoFileName = diskFileNameTmp + ".info.json" time.Sleep(500 * time.Millisecond) - if _, err := os.Stat(infoFileName); os.IsNotExist(err) { - continue - } - raw, err := os.ReadFile(infoFileName) - if err != nil { - return fmt.Errorf("info file read error: %s", err) + _, err := os.Stat(infoFileName) + if err == nil { + break + } else if !os.IsNotExist(err) { + return err } - var ytInfo YTInfo - err = json.Unmarshal(raw, &ytInfo) - if err != nil { - return fmt.Errorf("info file json unmarshal error: %s", err) + if count > 20 { + return fmt.Errorf("waited too long for info file") } + count++ + } - info.Id = id - info.Title = ytInfo.Title - info.Artist = ytInfo.Channel - if info.Artist == "" { - info.Artist = ytInfo.Series - } - // description is mostly ads! - //info.Description = ytInfo.Description - info.FileSize = ytInfo.FileSize - info.Extension = ytInfo.Extension - info.SponsorBlock = len(ytInfo.SponsorBlockChapters) > 0 - - if info.FileSize > MaxFileSize { - return fmt.Errorf("filesize %d too large", info.FileSize) - } + raw, err := os.ReadFile(infoFileName) + if err != nil { + return fmt.Errorf("info file read error: %s", err) + } - m := websocket.Msg{Key: KeyInfo, Value: info} - outCh <- m - break + var ytInfo YTInfo + err = json.Unmarshal(raw, &ytInfo) + if err != nil { + return fmt.Errorf("info file json unmarshal error: %s", err) } + var info Info + info.Id = id + info.Title = ytInfo.Title + info.Artist = ytInfo.Channel + if info.Artist == "" { + info.Artist = ytInfo.Series + } + // description is mostly ads! + //info.Description = ytInfo.Description + info.FileSize = ytInfo.FileSize + info.Extension = ytInfo.Extension + info.SponsorBlock = len(ytInfo.SponsorBlockChapters) > 0 + + if info.FileSize > MaxFileSize { + return fmt.Errorf("filesize %d too large", info.FileSize) + } + + m := websocket.Msg{Key: KeyInfo, Value: info} + outCh <- m + // output size of opus file as it gets written if forceOpus { go getOpusFileSize(ctx, id, info, outCh, errCh, diskFileNameTmp+".opus", yt.outPath) @@ -401,7 +412,7 @@ loop: outCh <- m } - m := websocket.Msg{ + m = websocket.Msg{ Key: KeyCompleted, Value: Misc{ Id: id, @@ -416,7 +427,7 @@ loop: func getYTProgress(v string) *Progress { matches := ytProgressRe.FindStringSubmatch(v) - slog.Debug("yt progress matches", "matches", matches) + //slog.Debug("yt progress matches", "matches", matches) var p *Progress if len(matches) == 5 { @@ -443,71 +454,73 @@ func getYTProgress(v string) *Progress { func getOpusFileSize(ctx context.Context, id int64, info Info, outCh chan<- websocket.Msg, errCh chan error, filename, webPath string) { var startTime time.Time streamURLSent := false + + ticker := time.NewTicker(time.Second) + defer ticker.Stop() + for { select { case <-ctx.Done(): return - default: - } - opusFI, err := os.Stat(filename) - // abort on errors except for ErrNotExist - if err != nil { - if !errors.Is(err, fs.ErrNotExist) { - errCh <- fmt.Errorf("error getting stat on opus file '%s': %w", filename, err) - return + case <-ticker.C: + opusFI, err := os.Stat(filename) + // abort on errors except for ErrNotExist + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + errCh <- fmt.Errorf("error getting stat on opus file '%s': %w", filename, err) + return + } + continue } - time.Sleep(time.Second) - continue - } - // wait until we have some data before sending stream URL - if !streamURLSent && opusFI.Size() > 10000 { - info.DownloadURL = filepath.Join(webPath, "stream", "t", filepath.Base(filename)) - m := websocket.Msg{Key: KeyLinkStream, Value: info} - outCh <- m - streamURLSent = true - } + // wait until we have some data before sending stream URL + if !streamURLSent && opusFI.Size() > 10000 { + info.DownloadURL = filepath.Join(webPath, "stream", "t", filepath.Base(filename)) + m := websocket.Msg{Key: KeyLinkStream, Value: info} + outCh <- m + streamURLSent = true + } - if startTime.IsZero() { - startTime = time.Now() - } - if info.Extension == "mp3" { - mp3FI, err := os.Stat(strings.TrimSuffix(filename, filepath.Ext(filename)) + ".mp3") - if err == nil { - // Opus compression ratio from MP3 approximately 1:4 - estTotal := mp3FI.Size() / 4 - pct := (float32(opusFI.Size()) / float32(estTotal)) * 100 - diff := time.Since(startTime) - etaStr := "" - if pct > 0 { - eta := time.Duration((float32(diff) / pct) * (100 - pct)).Round(time.Second) - etaStr = eta.String() - } - m := websocket.Msg{ - Key: KeyInfo, - Value: Info{ - Id: id, - Artist: info.Artist, - Title: info.Title, - FileSize: estTotal, - Progress: Progress{ - Pct: pct, + if startTime.IsZero() { + startTime = time.Now() + } + if info.Extension == "mp3" { + mp3FI, err := os.Stat(strings.TrimSuffix(filename, filepath.Ext(filename)) + ".mp3") + if err == nil { + // Opus compression ratio from MP3 approximately 1:4 + estTotal := mp3FI.Size() / 4 + pct := (float32(opusFI.Size()) / float32(estTotal)) * 100 + diff := time.Since(startTime) + etaStr := "" + if pct > 0 { + eta := time.Duration((float32(diff) / pct) * (100 - pct)).Round(time.Second) + etaStr = eta.String() + } + m := websocket.Msg{ + Key: KeyInfo, + Value: Info{ + Id: id, + Artist: info.Artist, + Title: info.Title, FileSize: estTotal, - ETA: etaStr, + Progress: Progress{ + Pct: pct, + FileSize: estTotal, + ETA: etaStr, + }, }, - }, + } + outCh <- m } - outCh <- m } + m := websocket.Msg{ + Key: KeyUnknown, + Value: Misc{ + Id: id, + Msg: fmt.Sprintf("opus file size %.2f MB\n", float32(opusFI.Size())*1e-6), + }, + } + outCh <- m } - m := websocket.Msg{ - Key: KeyUnknown, - Value: Misc{ - Id: id, - Msg: fmt.Sprintf("opus file size %.2f MB\n", float32(opusFI.Size())*1e-6), - }, - } - outCh <- m - time.Sleep(3 * time.Second) } } diff --git a/main.go b/main.go index 2cdec28..d1989cb 100644 --- a/main.go +++ b/main.go @@ -10,7 +10,6 @@ import ( "os" "os/signal" "path/filepath" - "syscall" "time" "github.com/porjo/ytdl-web/internal/jobs" @@ -63,17 +62,6 @@ func main() { ctx, cancel := context.WithCancel(context.Background()) - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - slog.Info("shutting down...") - cancel() - // wait a bit for things to shut down - time.Sleep(time.Second) - os.Exit(1) - }() - dl := ytworker.NewDownload(ctx, *webRoot, *outPath, *sponsorBlock, *sponsorBlockCats, *ytCmd, *maxProcessTime) dispatcher := jobs.NewDispatcher(dl, 10) go func() { @@ -103,9 +91,26 @@ func main() { WriteTimeout: HTTPWriteTimeout, } - err = srv.ListenAndServe() - if err != nil { - slog.Error(err.Error()) - os.Exit(1) + go func() { + if err := srv.ListenAndServe(); err != nil { + slog.Error("http server listen", "error", err) + } + }() + + // Setting up signal capturing + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt) + + // Waiting for SIGINT (kill -2) + <-stop + slog.Info("shutting down") + cancel() + + if err := srv.Shutdown(ctx); err != nil { + slog.Error("http server shutdown", "error", err) } + + // wait a bit for things to shut down + time.Sleep(time.Second) + }