Skip to content

Commit

Permalink
Added reconnection logic when NATS is disconnected
Browse files Browse the repository at this point in the history
How was it tested:
I deployed this version of nats-queue-worker and then simulated
NATS disconnection

Signed-off-by: Bart Smykla <[email protected]>
  • Loading branch information
Bart Smykla committed Jan 14, 2019
1 parent 4d38388 commit e110413
Showing 1 changed file with 45 additions and 2 deletions.
47 changes: 45 additions & 2 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,17 @@ import (
"encoding/json"
"fmt"
"log"
"time"

"github.com/nats-io/go-nats-streaming"
"github.com/openfaas/faas/gateway/queue"
)

var (
maxReconnect = 5
delayBetweenReconnect = 2
)

// NatsQueue queue for work
type NatsQueue struct {
nc stan.Conn
Expand All @@ -18,6 +24,43 @@ type NatsQueue struct {
Topic string
}

func (q *NatsQueue) connect() error {
nc, err := stan.Connect(
q.ClusterID,
q.ClientID,
stan.NatsURL(q.NATSURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.NATSURL)

q.reconnect(0)
}),
)

if err != nil {
return err
}

q.nc = nc

return nil
}

func (q *NatsQueue) reconnect(iteration int) {
log.Printf("Trying to reconnect (%d) to %s\n", iteration, q.NATSURL)

if iteration < maxReconnect {
time.Sleep(time.Second * time.Duration(iteration * delayBetweenReconnect))

if err := q.connect(); err != nil {
log.Printf("Reconnection (%d) to %s failed", iteration, q.NATSURL)

q.reconnect(iteration + 1)
} else {
log.Printf("Reconnection (%d) to %s succed", iteration, q.NATSURL)
}
}
}

// CreateNatsQueue ready for asynchronous processing
func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQueue, error) {
var err error
Expand All @@ -27,15 +70,15 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu
clientID := clientConfig.GetClientID()
clusterID := "faas-cluster"

nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
queue1 := NatsQueue{
nc: nc,
ClientID: clientID,
ClusterID: clusterID,
NATSURL: natsURL,
Topic: "faas-request",
}

err = queue1.connect()

return &queue1, err
}

Expand Down

0 comments on commit e110413

Please sign in to comment.