Skip to content

Commit

Permalink
Add a basic end to end test for subscribing to stomp message queues a…
Browse files Browse the repository at this point in the history
…nd running commands
  • Loading branch information
joecorall committed May 7, 2024
1 parent 421b231 commit d98acc9
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 4 deletions.
2 changes: 1 addition & 1 deletion internal/config/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func ReadConfig(yp string) (*ServerConfig, error) {
}

func BuildExecCommand(replacements map[string]string, c *ServerConfig) (*exec.Cmd, error) {
if !IsAllowedMimeType(replacements["sourceMimeType"], c.AllowedMimeTypes) {
if replacements["sourceMimeType"] != "" && !IsAllowedMimeType(replacements["sourceMimeType"], c.AllowedMimeTypes) {
return nil, fmt.Errorf("undefined sourceMimeType: %s", replacements["sourceMimeType"])
}

Expand Down
8 changes: 5 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,24 +154,26 @@ func RecvStompMessages(queueName string, subscribed chan bool) {
message, err := api.DecodeEventMessage(msg.Body)
if err != nil {
slog.Error("could not read the event message", "err", err, "msg", string(msg.Body))
continue
}
cmdArgs := map[string]string{
"sourceMimeType": message.Attachment.Content.SourceMimeType,
"destinationMimeType": message.Attachment.Content.DestinationMimeType,
"addtlArgs": message.Attachment.Content.Args,
"target": message.Target,
}

cmd, err := scyllaridae.BuildExecCommand(cmdArgs, config)
if err != nil {
slog.Error("Error building command", "err", err)
return
continue
}

// log stdout for the command as it prints
stdout, err := cmd.StdoutPipe()
if err != nil {
slog.Error("error creating stdout pipe", "err", err)
return
continue
}

// Create a buffer to stream the error output of the command
Expand All @@ -181,7 +183,7 @@ func RecvStompMessages(queueName string, subscribed chan bool) {
slog.Info("Running command", "cmd", cmd.String())
if err := cmd.Start(); err != nil {
slog.Error("Error starting command", "cmd", cmd.String(), "err", stdErr.String())
return
continue
}

go func() {
Expand Down
116 changes: 116 additions & 0 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,131 @@ package main
import (
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

stomp "github.com/go-stomp/stomp/v3"
ss "github.com/go-stomp/stomp/v3/server"
scyllaridae "github.com/lehigh-university-libraries/scyllaridae/internal/config"
"github.com/stretchr/testify/assert"
)

func TestRecvStompMessages(t *testing.T) {
addr := "127.0.0.1:61613"
os.Setenv("STOMP_SERVER_ADDR", addr)

l, err := net.Listen("tcp", addr)
if err != nil {
t.Fatalf("Could not start stomp mock server: %v", err)
}
defer func() { l.Close() }()
go func() {
err := ss.Serve(l)
if err != nil {
slog.Error("Error starting mock stomp server", "err", err)
os.Exit(1)
}
}()

os.Setenv("SCYLLARIDAE_YML", `
queueName: "test-queue"
allowedMimeTypes:
- "*"
cmdByMimeType:
default:
cmd: touch
args:
- "%target"
`)
config, err = scyllaridae.ReadConfig("")
if err != nil {
t.Fatalf("Could not read YML: %v", err)
os.Exit(1)
}

subscribed := make(chan bool, 1)
go RecvStompMessages("test-queue", subscribed)
<-subscribed

conn, err := stomp.Dial("tcp", addr,
stomp.ConnOpt.AcceptVersion(stomp.V11),
stomp.ConnOpt.AcceptVersion(stomp.V12),
stomp.ConnOpt.Host("dragon"),
stomp.ConnOpt.Header("nonce", "B256B26D320A"))

if err != nil {
t.Fatalf("Could not connection to stomp mock server: %v", err)
}

err = conn.Send(
"test-queue",
"text/plain",
[]byte(`{
"@context": "https://www.w3.org/ns/activitystreams",
"actor": {
"type": "Person",
"id": "urn:uuid:b3f0a1ba-fd0c-4977-a123-3faf470374f2",
"url": [
{
"name": "Canonical",
"type": "Link",
"href": "https://islandora.dev/user/1",
"mediaType": "text/html",
"rel": "canonical"
}
]
},
"object": {
"id": "urn:uuid:abcdef01-2345-6789-abcd-ef0123456789",
"url": [
{
"name": "Canonical",
"type": "Link",
"href": "https://islandora.dev/node/1",
"mediaType": "text/html",
"rel": "canonical"
},
{
"name": "JSON",
"type": "Link",
"href": "https://islandora.dev/node/1?_format=json",
"mediaType": "application/json",
"rel": "alternate"
},
{
"name": "JSONLD",
"type": "Link",
"href": "https://islandora.dev/node/1?_format=jsonld",
"mediaType": "application/ld+json",
"rel": "alternate"
}
],
"isNewVersion": true
},
"target": "/tmp/stomp.success",
"type": "Update",
"summary": "Update a Node"
}`))
if err != nil {
t.Fatalf("Could not send test stomp message: %v", err)
}

// give the command some time to finish
time.Sleep(time.Second * 5)

// make sure the command ran
f := "/tmp/stomp.success"
_, err = os.Stat(f)
if err != nil && os.IsNotExist(err) {
t.Errorf("The stomp subscriber not create the expected file %s", f)
}
}

type Test struct {
name string
authHeader string
Expand Down

0 comments on commit d98acc9

Please sign in to comment.