Skip to content

Commit

Permalink
Implemented reconnection logic in queue-worker
Browse files Browse the repository at this point in the history
How it was tested:
I have testd it by deploying it to my local cluster with and without
persistence store in nats (mysql) and I tried to simulate
disconnections from nats server by killing the containers with it,
and trying to async invoke functions

Signed-off-by: Bart Smykla <[email protected]>
  • Loading branch information
Bart Smykla committed Jan 16, 2019
1 parent 4d38388 commit 0c37ef2
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 20 deletions.
173 changes: 153 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"time"

"net/http"
Expand Down Expand Up @@ -49,17 +50,134 @@ func makeClient() http.Client {
return proxyClient
}

type NatsQueue struct {
clusterID string
clientID string
natsURL string

maxReconnect int
reconnectDelay time.Duration
conn stan.Conn
connMutex *sync.RWMutex
quitCh chan struct{}

subject string
qgroup string
durable string
ackWait time.Duration
messageHandler func(*stan.Msg)
startOption stan.SubscriptionOption
maxInFlight stan.SubscriptionOption
subscription stan.Subscription
}

func (q *NatsQueue) init() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()

log.Printf("Connecting to: %s\n", q.natsURL)

sc, err := stan.Connect(
q.clusterID,
q.clientID,
stan.NatsURL(q.natsURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.natsURL)

q.reconnect()
}),
)
if err != nil {
return fmt.Errorf("Can't connect to %s: %v\n", q.natsURL, err)
}

q.conn = sc

log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
log.Println("Wait for ", q.ackWait)

subscription, err := q.conn.QueueSubscribe(
q.subject,
q.qgroup,
q.messageHandler,
stan.DurableName(q.durable),
stan.AckWait(q.ackWait),
q.startOption,
q.maxInFlight,
)
if err != nil {
return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v\n", q.subject, q.natsURL, err)
}

log.Printf(
"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
q.subject,
q.clientID,
q.qgroup,
q.durable,
)

q.subscription = subscription

return nil
}

func (q *NatsQueue) reconnect() {
for i := 0; i < q.maxReconnect; i++ {
select {
case <-time.After(time.Duration(i) * q.reconnectDelay):
if err := q.init(); err == nil {
log.Printf("Reconnection (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)

return
}

nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()

log.Printf("Reconnection (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
log.Printf("Waiting %s before next try", nextTryIn)
case <-q.quitCh:
log.Println("Received signal to stop reconnecting...")

return
}
}

log.Printf("Reconnection limit (%d) reached\n", q.maxReconnect)
}

func (q *NatsQueue) unsubscribe() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()

if q.subscription != nil {
return fmt.Errorf("q.subscription is nil")
}

return q.subscription.Unsubscribe()
}

func (q *NatsQueue) closeConnection() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()

if q.conn == nil {
return fmt.Errorf("q.conn is nil")
}

close(q.quitCh)

return q.conn.Close()
}

func main() {
readConfig := ReadConfig{}
config := readConfig.Read()
log.SetFlags(0)

clusterID := "faas-cluster"
val, _ := os.Hostname()
clientID := "faas-worker-" + nats.GetClientID(val)
hostname, _ := os.Hostname()

var durable string
var qgroup string
var unsubscribe bool
var credentials *auth.BasicAuthCredentials
var err error
Expand All @@ -73,12 +191,6 @@ func main() {
}

client := makeClient()
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222"))
if err != nil {
log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", err)
}

startOpt := stan.StartWithLastReceived()

i := 0
mcb := func(msg *stan.Msg) {
Expand Down Expand Up @@ -196,16 +308,28 @@ func main() {
}
}

subj := "faas-request"
qgroup = "faas"

log.Println("Wait for ", config.AckWait)
sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait))
if err != nil {
log.Panicln(err)
natsQueue := NatsQueue{
clusterID: "faas-cluster",
clientID: "faas-worker-" + nats.GetClientID(hostname),
natsURL: "nats://" + config.NatsAddress + ":4222",

connMutex: &sync.RWMutex{},
maxReconnect: config.MaxReconnect,
reconnectDelay: config.ReconnectDelay,
quitCh: make(chan struct{}),

subject: "faas-request",
qgroup: "faas",
durable: durable,
messageHandler: mcb,
startOption: stan.StartWithLastReceived(),
maxInFlight: stan.MaxInflight(config.MaxInflight),
ackWait: config.AckWait,
}

log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
if initErr := natsQueue.init(); initErr != nil {
log.Panic(initErr)
}

// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
// Run cleanup when signal is received
Expand All @@ -217,9 +341,18 @@ func main() {
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
// Do not unsubscribe a durable on exit, except if asked to.
if durable == "" || unsubscribe {
sub.Unsubscribe()
if err := natsQueue.unsubscribe(); err != nil {
log.Panicf(
"Cannot unsubscribe subject: %s from %s because of an error: %v",
natsQueue.subject,
natsQueue.natsURL,
err,
)
}
}
if err := natsQueue.closeConnection(); err != nil {
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
}
sc.Close()
cleanupDone <- true
}
}()
Expand Down
22 changes: 22 additions & 0 deletions readconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ func (ReadConfig) Read() QueueWorkerConfig {
}
}

if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
val, err := strconv.Atoi(value)

if err != nil {
log.Println("converting faas_max_reconnect to int error:", err)
} else {
cfg.MaxReconnect = val
}
}

if value, exists := os.LookupEnv("faas_reconnect_delay"); exists {
reconnectDelayVal, durationErr := time.ParseDuration(value)

if durationErr != nil {
log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
} else {
cfg.ReconnectDelay = reconnectDelayVal
}
}

if val, exists := os.LookupEnv("ack_wait"); exists {
ackWaitVal, durationErr := time.ParseDuration(val)
if durationErr != nil {
Expand All @@ -78,4 +98,6 @@ type QueueWorkerConfig struct {
WriteDebug bool
MaxInflight int
AckWait time.Duration
MaxReconnect int
ReconnectDelay time.Duration
}

0 comments on commit 0c37ef2

Please sign in to comment.