Skip to content

Commit

Permalink
Added reconnection logic when NATS is disconnected
Browse files Browse the repository at this point in the history
How was it tested:
I deployed this version of nats-queue-worker and then simulated
NATS disconnection

Signed-off-by: Bart Smykla <[email protected]>
  • Loading branch information
Bart Smykla committed Jan 14, 2019
1 parent 4d38388 commit 8ea739e
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 11 deletions.
68 changes: 57 additions & 11 deletions handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,63 @@ import (
"encoding/json"
"fmt"
"log"
"sync"
"time"

"github.com/nats-io/go-nats-streaming"
"github.com/openfaas/faas/gateway/queue"
)

// NatsQueue queue for work
type NatsQueue struct {
nc stan.Conn
ClientID string
ClusterID string
NATSURL string
Topic string
nc stan.Conn
ncMutex *sync.RWMutex
maxReconnect int
delayBetweenReconnect time.Duration

ClientID string
ClusterID string
NATSURL string
Topic string
}

func (q *NatsQueue) connect() error {
nc, err := stan.Connect(
q.ClusterID,
q.ClientID,
stan.NatsURL(q.NATSURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.NATSURL)

q.reconnect(0)
}),
)

if err != nil {
return err
}

q.ncMutex.Lock()
q.nc = nc
q.ncMutex.Unlock()

return nil
}

func (q *NatsQueue) reconnect(iteration int) {
if iteration < q.maxReconnect {
time.Sleep(time.Second * time.Duration(iteration) * q.delayBetweenReconnect)

if err := q.connect(); err != nil {
log.Printf("Reconnection (%d) to %s failed", iteration, q.NATSURL)

q.reconnect(iteration + 1)
} else {
log.Printf("Reconnection (%d) to %s succed", iteration, q.NATSURL)
}
} else {
log.Printf("Reconnection limit (%d) reached\n", q.maxReconnect)
}
}

// CreateNatsQueue ready for asynchronous processing
Expand All @@ -27,30 +72,31 @@ func CreateNatsQueue(address string, port int, clientConfig NatsConfig) (*NatsQu
clientID := clientConfig.GetClientID()
clusterID := "faas-cluster"

nc, err := stan.Connect(clusterID, clientID, stan.NatsURL(natsURL))
queue1 := NatsQueue{
nc: nc,
ClientID: clientID,
ClusterID: clusterID,
NATSURL: natsURL,
Topic: "faas-request",
ncMutex: &sync.RWMutex{},
}

err = queue1.connect()

return &queue1, err
}

// Queue request for processing
func (q *NatsQueue) Queue(req *queue.Request) error {
var err error

fmt.Printf("NatsQueue - submitting request: %s.\n", req.Function)

out, err := json.Marshal(req)
if err != nil {
log.Println(err)
}

err = q.nc.Publish(q.Topic, out)
q.ncMutex.RLock()
nc := q.nc
q.ncMutex.RUnlock()

return err
return nc.Publish(q.Topic, out)
}
20 changes: 20 additions & 0 deletions handler/nats_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,27 @@ package handler

import (
"os"
"time"

"github.com/openfaas/nats-queue-worker/nats"
)

type NatsConfig interface {
GetClientID() string
GetMaxReconnect() int
GetDelayBetweenReconnect() time.Duration
}

type DefaultNatsConfig struct {
maxReconnect int
delayBetweenReconnect time.Duration
}

func NewDefaultNatsConfig() DefaultNatsConfig {
return DefaultNatsConfig{
maxReconnect: 5,
delayBetweenReconnect: time.Second * 2,
}
}

// GetClientID returns the ClientID assigned to this producer/consumer.
Expand All @@ -19,6 +31,14 @@ func (DefaultNatsConfig) GetClientID() string {
return getClientID(val)
}

func (c *DefaultNatsConfig) GetMaxReconnect() int {
return c.maxReconnect
}

func (c *DefaultNatsConfig) GetDelayBetweenReconnect() time.Duration {
return c.delayBetweenReconnect
}

func getClientID(hostname string) string {
return "faas-publisher-" + nats.GetClientID(hostname)
}

0 comments on commit 8ea739e

Please sign in to comment.