From 70d82c80b833930ed3894a47152f923c12ce8722 Mon Sep 17 00:00:00 2001 From: Joe Corall Date: Mon, 16 Dec 2024 09:19:37 -0500 Subject: [PATCH] Add some more timeout settings in STOMP connection --- stomp.go | 93 +++++++++++++++++++++++++++++--------------------------- 1 file changed, 49 insertions(+), 44 deletions(-) diff --git a/stomp.go b/stomp.go index 3e3903d..ff6af3c 100644 --- a/stomp.go +++ b/stomp.go @@ -77,58 +77,63 @@ func RecvAndProcessMessage(queueName string, middleware scyllaridae.QueueMiddlew slog.Error("Cannot set keepalive period", "err", err.Error()) return err } - - conn, err := stomp.Connect(tcpConn, stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second)) - if err != nil { - slog.Error("Cannot connect to STOMP server", "err", err.Error()) - return err - } - defer func() { - err := conn.Disconnect() + for { + conn, err := stomp.Connect(tcpConn, + stomp.ConnOpt.HeartBeat(10*time.Second, 10*time.Second), + stomp.ConnOpt.HeartBeatGracePeriodMultiplier(1.5), + stomp.ConnOpt.HeartBeatError(60*time.Second), + ) if err != nil { - slog.Error("Problem disconnecting from STOMP server", "err", err) + slog.Error("Cannot connect to STOMP server", "err", err.Error()) + return err } - }() - sub, err := conn.Subscribe(queueName, stomp.AckClient) - if err != nil { - slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error()) - return err - } - defer func() { - if !sub.Active() { - return - } - err := sub.Unsubscribe() + defer func() { + err := conn.Disconnect() + if err != nil { + slog.Error("Problem disconnecting from STOMP server", "err", err) + } + }() + sub, err := conn.Subscribe(queueName, stomp.AckClient) if err != nil { - slog.Error("Problem unsubscribing", "err", err) - } - }() - slog.Info("Subscribed to queue", "queue", queueName) - - // Process one message at a time - for { - // Wait for the next message (blocks if the channel is empty) - msg, ok := <-sub.C - if !ok { - // Subscription is no longer active - return fmt.Errorf("subscription to %s is closed", queueName) + slog.Error("Cannot subscribe to queue", "queue", queueName, "err", err.Error()) + return err } - - // Check for an empty or nil message - if msg == nil || len(msg.Body) == 0 { + defer func() { if !sub.Active() { - return fmt.Errorf("no longer subscribed to %s", queueName) + return + } + err := sub.Unsubscribe() + if err != nil { + slog.Error("Problem unsubscribing", "err", err) + } + }() + slog.Info("Subscribed to queue", "queue", queueName) + + // Process one message at a time + for { + // Wait for the next message (blocks if the channel is empty) + msg, ok := <-sub.C + if !ok { + // Subscription is no longer active + return fmt.Errorf("subscription to %s is closed", queueName) } - continue - } - // Process the message synchronously - handleMessage(msg, middleware) + // Check for an empty or nil message + if msg == nil || len(msg.Body) == 0 { + if !sub.Active() { + return fmt.Errorf("no longer subscribed to %s", queueName) + } + continue + } - // Acknowledge the message after successful processing - err := msg.Conn.Ack(msg) - if err != nil { - slog.Error("Failed to acknowledge message", "queue", queueName, "error", err) + // Process the message synchronously + handleMessage(msg, middleware) + + // Acknowledge the message after successful processing + err := msg.Conn.Ack(msg) + if err != nil { + slog.Error("Failed to acknowledge message", "queue", queueName, "error", err) + } } }