Skip to content

Commit

Permalink
Add some more timeout settings in STOMP connection
Browse files Browse the repository at this point in the history
  • Loading branch information
joecorall committed Dec 16, 2024
1 parent 15ab343 commit 70d82c8
Showing 1 changed file with 49 additions and 44 deletions.
93 changes: 49 additions & 44 deletions stomp.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,58 +77,63 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew
slog.Error("Cannot set keepalive period", "err", err.Error())
return err
}

conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second))
if err != nil {
slog.Error("Cannot connect to STOMP server", "err", err.Error())
return err
}
defer func() {
err := conn.Disconnect()
for {
conn, err := stomp.Connect(tcpConn,
stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second),
stomp.ConnOpt.HeartBeatGracePeriodMultiplier(1.5),
stomp.ConnOpt.HeartBeatError(60*time.Second),
)
if err != nil {
slog.Error("Problem disconnecting from STOMP server", "err", err)
slog.Error("Cannot connect to STOMP server", "err", err.Error())
return err
}
}()
sub, err := conn.Subscribe(queueName, stomp.AckClient)
if err != nil {
slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}
defer func() {
if !sub.Active() {
return
}
err := sub.Unsubscribe()
defer func() {
err := conn.Disconnect()
if err != nil {
slog.Error("Problem disconnecting from STOMP server", "err", err)
}
}()
sub, err := conn.Subscribe(queueName, stomp.AckClient)
if err != nil {
slog.Error("Problem unsubscribing", "err", err)
}
}()
slog.Info("Subscribed to queue", "queue", queueName)

// Process one message at a time
for {
// Wait for the next message (blocks if the channel is empty)
msg, ok := <-sub.C
if !ok {
// Subscription is no longer active
return fmt.Errorf("subscription to %s is closed", queueName)
slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error())
return err
}

// Check for an empty or nil message
if msg == nil || len(msg.Body) == 0 {
defer func() {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
return
}
err := sub.Unsubscribe()
if err != nil {
slog.Error("Problem unsubscribing", "err", err)
}
}()
slog.Info("Subscribed to queue", "queue", queueName)

// Process one message at a time
for {
// Wait for the next message (blocks if the channel is empty)
msg, ok := <-sub.C
if !ok {
// Subscription is no longer active
return fmt.Errorf("subscription to %s is closed", queueName)
}
continue
}

// Process the message synchronously
handleMessage(msg, middleware)
// Check for an empty or nil message
if msg == nil || len(msg.Body) == 0 {
if !sub.Active() {
return fmt.Errorf("no longer subscribed to %s", queueName)
}
continue
}

// Acknowledge the message after successful processing
err := msg.Conn.Ack(msg)
if err != nil {
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
// Process the message synchronously
handleMessage(msg, middleware)

// Acknowledge the message after successful processing
err := msg.Conn.Ack(msg)
if err != nil {
slog.Error("Failed to acknowledge message", "queue", queueName, "error", err)
}
}
}

Expand Down

0 comments on commit 70d82c8

Please sign in to comment.