Skip to content

Commit

Permalink
Better channel management
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall committed May 7, 2024
1 parent 8a3b4e4 commit d30ed2f
Showing 1 changed file with 77 additions and 57 deletions.
134 changes: 77 additions & 57 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ package main
import (
"bufio"
"bytes"
"io"
"log/slog"
"net/http"
"os"
"os/exec"
"os/signal"
"syscall"

stomp "github.com/go-stomp/stomp/v3"
scyllaridae "github.com/lehigh-university-libraries/scyllaridae/internal/config"
Expand All @@ -32,12 +33,21 @@ func main() {
// either subscribe to activemq directly
if config.QueueName != "" {
subscribed := make(chan bool)
stopChan := make(chan os.Signal, 1)
signal.Notify(stopChan, os.Interrupt, syscall.SIGTERM)

go RecvStompMessages(config.QueueName, subscribed)
<-subscribed

// wait for messages
stop := make(chan os.Signal, 1)
<-stop
select {
case <-subscribed:
slog.Info("Subscription to queue successful")
case <-stopChan:
slog.Info("Received stop signal, exiting")
os.Exit(0)
}

<-stopChan
slog.Info("Shutting down message listener")
} else {
// or make this an available API ala crayfish
http.HandleFunc("/", MessageHandler)
Expand Down Expand Up @@ -126,82 +136,92 @@ func MessageHandler(w http.ResponseWriter, r *http.Request) {
}

func RecvStompMessages(queueName string, subscribed chan bool) {
var options []func(*stomp.Conn) error = []func(*stomp.Conn) error{
// stomp.ConnOpt.Login("guest", "guest"),
stomp.ConnOpt.Host("/"),
}

addr := os.Getenv("STOMP_SERVER_ADDR")
if addr == "" {
addr = "activemq:61613"
}
conn, err := stomp.Dial("tcp", addr, options...)

conn, err := stomp.Dial("tcp", addr, stomp.ConnOpt.Host("/"))
if err != nil {
slog.Error("cannot connect to server", "err", err.Error())
close(subscribed)
return
}
defer func() {
err := conn.Disconnect()
if err != nil {
slog.Error("problem disconnecting from stomp server", "err", err)
}
}()

sub, err := conn.Subscribe(queueName, stomp.AckAuto)
if err != nil {
slog.Error("cannot subscribe to", queueName, err.Error())
slog.Error("cannot subscribe to queue", "queue", queueName, "err", err.Error())
close(subscribed)
return
}
defer func() {
err := sub.Unsubscribe()
if err != nil {
slog.Error("problem disconnecting from stomp server", "err", err)
}
}()
slog.Info("Server subscribed to", "queue", queueName)
// Notify main goroutine that subscription is successful
close(subscribed)

for msg := range sub.C {
if msg == nil {
break
}
handleStompMessage(msg, conn)
}
}

message, err := api.DecodeEventMessage(msg.Body)
if err != nil {
slog.Error("could not read the event message", "err", err, "msg", string(msg.Body))
continue
}
cmdArgs := map[string]string{
"sourceMimeType": message.Attachment.Content.SourceMimeType,
"destinationMimeType": message.Attachment.Content.DestinationMimeType,
"addtlArgs": message.Attachment.Content.Args,
"target": message.Target,
}

cmd, err := scyllaridae.BuildExecCommand(cmdArgs, config)
if err != nil {
slog.Error("Error building command", "err", err)
continue
}
func handleStompMessage(msg *stomp.Message, conn *stomp.Conn) {
message, err := api.DecodeEventMessage(msg.Body)
if err != nil {
slog.Error("could not read the event message", "err", err, "msg", string(msg.Body))
return
}

// log stdout for the command as it prints
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("error creating stdout pipe", "err", err)
continue
}
cmdArgs := map[string]string{
"sourceMimeType": message.Attachment.Content.SourceMimeType,
"destinationMimeType": message.Attachment.Content.DestinationMimeType,
"addtlArgs": message.Attachment.Content.Args,
"target": message.Target,
}
cmd, err := scyllaridae.BuildExecCommand(cmdArgs, config)
if err != nil {
slog.Error("Error building command", "err", err)
return
}

// Create a buffer to stream the error output of the command
var stdErr bytes.Buffer
cmd.Stderr = &stdErr
messageID := msg.Header.Get("message-id")
runCommand(cmd, msg)
if err := msg.Conn.Ack(msg); err != nil {
slog.Error("could not ack message", "err", err)
}
}

slog.Info("Running command", "message-id", messageID, "cmd", cmd.String())
if err := cmd.Start(); err != nil {
slog.Error("Error starting command", "cmd", cmd.String(), "err", stdErr.String())
func runCommand(cmd *exec.Cmd, msg *stomp.Message) {
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("error creating stdout pipe", "err", err)
return
}
scanner := bufio.NewScanner(stdout)
go func() {
for scanner.Scan() {
slog.Info("cmd output", "stdout", scanner.Text())
}
}()

go func(cmd *exec.Cmd, stdout io.ReadCloser, messageID string) {
scanner := bufio.NewScanner(stdout)
for scanner.Scan() {
slog.Info("cmd output", "message-id", messageID, "stdout", scanner.Text())
}

if err := cmd.Wait(); err != nil {
slog.Error("command finished with error", "message-id", messageID, "err", stdErr.String())
}
slog.Info("Great success!")
}(cmd, stdout, messageID)
if err := msg.Conn.Ack(msg); err != nil {
slog.Error("could not ack msg", "message-id", messageID, "err", stdErr.String())
}
var stdErr bytes.Buffer
cmd.Stderr = &stdErr
if err := cmd.Start(); err != nil {
slog.Error("Error starting command", "cmd", cmd.String(), "err", stdErr.String())
return
}
if err := cmd.Wait(); err != nil {
slog.Error("command finished with error", "err", stdErr.String())
}
}

0 comments on commit d30ed2f

Please sign in to comment.