Skip to content

Commit

Permalink
Try recovering from timeouts better
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall committed May 7, 2024
1 parent 81a1bd1 commit 7e26188
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 15 deletions.
9 changes: 5 additions & 4 deletions examples/cache-warmer/cmd.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,17 +91,18 @@ done
rm -f links.xml

# now that the sitemap is warm, get all the IIIF paged content manifests warm
curl -v "$DRUPAL_URL/api/v1/paged-content" > pc.json
curl -s "$DRUPAL_URL/api/v1/paged-content" > pc.json
mapfile -t NIDS < <(jq -r '.[]' pc.json)
for NID in "${NIDS[@]}"; do
for ((i = 0; i < PARALLEL_EXECUTIONS; i++)); do
array_length=${#URLS[@]}
array_length=${#NIDS[@]}
if [ "$array_length" -gt 0 ]; then
URL="${URLS[$((array_length-1))]}"
unset "URLS[$((array_length-1))]"
NID="${NIDS[$((array_length-1))]}"
unset "NIDS[$((array_length-1))]"
else
break
fi
echo "Crawling: $DRUPAL_URL/node/$NID/book-manifest"
curl -s -o /dev/null "$DRUPAL_URL/node/$NID/book-manifest?cache-warmer=1" &
job_ids+=($!)
done
Expand Down
42 changes: 35 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ package main
import (
"bufio"
"bytes"
"fmt"
"log/slog"
"math/rand"
"net/http"
"os"
"os/exec"
Expand Down Expand Up @@ -44,7 +46,7 @@ func main() {
slog.Info("Subscription to queue successful")
case <-stopChan:
slog.Info("Received stop signal, exiting")
os.Exit(0)
return
}

<-stopChan
Expand Down Expand Up @@ -138,15 +140,28 @@ func MessageHandler(w http.ResponseWriter, r *http.Request) {

func RecvStompMessages(queueName string, subscribed chan bool) {
defer close(subscribed)
attempt := 0
maxAttempts := 30
for attempt = 0; attempt < maxAttempts; attempt += 1 {
if err := connectAndSubscribe(queueName, subscribed); err != nil {
slog.Error("Failed to connect or subscribe, retrying...", "error", err)
if err := retryWithExponentialBackoff(attempt, maxAttempts); err != nil {
slog.Error("Failed subscribing after too many failed attempts", "attempts", attempt)
return
}
}
}
}

func connectAndSubscribe(queueName string, subscribed chan bool) error {
addr := os.Getenv("STOMP_SERVER_ADDR")
if addr == "" {
addr = "activemq:61613"
}
conn, err := stomp.Dial("tcp", addr, stomp.ConnOpt.Host("/"))
if err != nil {
slog.Error("cannot connect to server", "err", err.Error())
return
return err
}
defer func() {
err := conn.Disconnect()
Expand All @@ -158,7 +173,7 @@ func RecvStompMessages(queueName string, subscribed chan bool) {
sub, err := conn.Subscribe(queueName, stomp.AckAuto)
if err != nil {
slog.Error("cannot subscribe to queue", "queue", queueName, "err", err.Error())
return
return err
}
defer func() {
err := sub.Unsubscribe()
Expand All @@ -167,12 +182,16 @@ func RecvStompMessages(queueName string, subscribed chan bool) {
}
}()
slog.Info("Server subscribed to", "queue", queueName)
// Notify main goroutine that subscription is successful
subscribed <- true

for msg := range sub.C {
if msg == nil || len(msg.Body) == 0 {
time.Sleep(time.Second * 5)
for {
msg, err := sub.Read()
if err != nil || msg == nil || len(msg.Body) == 0 {
// if the subscription isn't active return so we can try reconnecting
if !sub.Active() {
return err
}
// else just try reading again. There's probably just no messages in the queue
continue
}
handleStompMessage(msg)
Expand Down Expand Up @@ -224,3 +243,12 @@ func runCommand(cmd *exec.Cmd) {
slog.Error("command finished with error", "err", stdErr.String())
}
}

func retryWithExponentialBackoff(attempt int, maxAttempts int) error {
if attempt >= maxAttempts {
return fmt.Errorf("maximum retry attempts reached")
}
wait := time.Duration(rand.Intn(1<<attempt)) * time.Second
time.Sleep(wait)
return nil
}
5 changes: 1 addition & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,7 @@ cmdByMimeType:

// make sure the command ran
f := "/tmp/stomp.success"
_, err = os.Stat(f)
if err != nil && os.IsNotExist(err) {
t.Errorf("The stomp subscriber not create the expected file %s", f)
}
assert.FileExists(t, f)
}

type Test struct {
Expand Down

0 comments on commit 7e26188

Please sign in to comment.