Patch summary: move the NATS Streaming connect/subscribe logic of main.go into
a NatsQueue type whose connection-lost handler retries init() up to
maxReconnect times, and read the faas_max_reconnect / faas_reconnect_delay
environment variables in readconfig.go.

Review notes:
  * NatsQueue.unsubscribe originally tested `q.subscription != nil` before
    returning the "q.subscription is nil" error. That rejected every live
    subscription and dereferenced a nil one. The guard below is corrected to
    `== nil`, mirroring the `q.conn == nil` check in closeConnection.
  * This patch had lost its line breaks. Tab indentation, gofmt column
    alignment and blank context lines were rebuilt from the hunk line counts.
    The last blank context line of hunk "-73,12 +191,6" is an assumption;
    verify it against main.go before applying.

diff --git a/main.go b/main.go
index 6a1b024..0c1036e 100644
--- a/main.go
+++ b/main.go
@@ -11,6 +11,7 @@ import (
 	"os"
 	"os/signal"
 	"strings"
+	"sync"
 	"time"
 
 	"net/http"
@@ -49,17 +50,134 @@ func makeClient() http.Client {
 	return proxyClient
 }
 
+type NatsQueue struct {
+	clusterID string
+	clientID  string
+	natsURL   string
+
+	maxReconnect   int
+	reconnectDelay time.Duration
+	conn           stan.Conn
+	connMutex      *sync.RWMutex
+	quitCh         chan struct{}
+
+	subject        string
+	qgroup         string
+	durable        string
+	ackWait        time.Duration
+	messageHandler func(*stan.Msg)
+	startOption    stan.SubscriptionOption
+	maxInFlight    stan.SubscriptionOption
+	subscription   stan.Subscription
+}
+
+func (q *NatsQueue) init() error {
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	log.Printf("Connecting to: %s\n", q.natsURL)
+
+	sc, err := stan.Connect(
+		q.clusterID,
+		q.clientID,
+		stan.NatsURL(q.natsURL),
+		stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
+			log.Printf("Disconnected from %s\n", q.natsURL)
+
+			q.reconnect()
+		}),
+	)
+	if err != nil {
+		return fmt.Errorf("Can't connect to %s: %v\n", q.natsURL, err)
+	}
+
+	q.conn = sc
+
+	log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
+	log.Println("Wait for ", q.ackWait)
+
+	subscription, err := q.conn.QueueSubscribe(
+		q.subject,
+		q.qgroup,
+		q.messageHandler,
+		stan.DurableName(q.durable),
+		stan.AckWait(q.ackWait),
+		q.startOption,
+		q.maxInFlight,
+	)
+	if err != nil {
+		return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v\n", q.subject, q.natsURL, err)
+	}
+
+	log.Printf(
+		"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
+		q.subject,
+		q.clientID,
+		q.qgroup,
+		q.durable,
+	)
+
+	q.subscription = subscription
+
+	return nil
+}
+
+func (q *NatsQueue) reconnect() {
+	for i := 0; i < q.maxReconnect; i++ {
+		select {
+		case <-time.After(time.Duration(i) * q.reconnectDelay):
+			if err := q.init(); err == nil {
+				log.Printf("Reconnection (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)
+
+				return
+			}
+
+			nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()
+
+			log.Printf("Reconnection (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
+			log.Printf("Waiting %s before next try", nextTryIn)
+		case <-q.quitCh:
+			log.Println("Received signal to stop reconnecting...")
+
+			return
+		}
+	}
+
+	log.Printf("Reconnection limit (%d) reached\n", q.maxReconnect)
+}
+
+func (q *NatsQueue) unsubscribe() error {
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	if q.subscription == nil {
+		return fmt.Errorf("q.subscription is nil")
+	}
+
+	return q.subscription.Unsubscribe()
+}
+
+func (q *NatsQueue) closeConnection() error {
+	q.connMutex.Lock()
+	defer q.connMutex.Unlock()
+
+	if q.conn == nil {
+		return fmt.Errorf("q.conn is nil")
+	}
+
+	close(q.quitCh)
+
+	return q.conn.Close()
+}
+
 func main() {
 	readConfig := ReadConfig{}
 	config := readConfig.Read()
 	log.SetFlags(0)
 
-	clusterID := "faas-cluster"
-	val, _ := os.Hostname()
-	clientID := "faas-worker-" + nats.GetClientID(val)
+	hostname, _ := os.Hostname()
 
 	var durable string
-	var qgroup string
 	var unsubscribe bool
 	var credentials *auth.BasicAuthCredentials
 	var err error
@@ -73,12 +191,6 @@ func main() {
 	}
 
 	client := makeClient()
-	sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222"))
-	if err != nil {
-		log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", err)
-	}
-
-	startOpt := stan.StartWithLastReceived()
 	i := 0
 	mcb := func(msg *stan.Msg) {
 
@@ -196,16 +308,28 @@ func main() {
 		}
 	}
 
-	subj := "faas-request"
-	qgroup = "faas"
-
-	log.Println("Wait for ", config.AckWait)
-	sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait))
-	if err != nil {
-		log.Panicln(err)
+	natsQueue := NatsQueue{
+		clusterID: "faas-cluster",
+		clientID:  "faas-worker-" + nats.GetClientID(hostname),
+		natsURL:   "nats://" + config.NatsAddress + ":4222",
+
+		connMutex:      &sync.RWMutex{},
+		maxReconnect:   config.MaxReconnect,
+		reconnectDelay: config.ReconnectDelay,
+		quitCh:         make(chan struct{}),
+
+		subject:        "faas-request",
+		qgroup:         "faas",
+		durable:        durable,
+		messageHandler: mcb,
+		startOption:    stan.StartWithLastReceived(),
+		maxInFlight:    stan.MaxInflight(config.MaxInflight),
+		ackWait:        config.AckWait,
 	}
 
-	log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
+	if initErr := natsQueue.init(); initErr != nil {
+		log.Panic(initErr)
+	}
 
 	// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
 	// Run cleanup when signal is received
@@ -217,9 +341,18 @@ func main() {
 			fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
 			// Do not unsubscribe a durable on exit, except if asked to.
 			if durable == "" || unsubscribe {
-				sub.Unsubscribe()
+				if err := natsQueue.unsubscribe(); err != nil {
+					log.Panicf(
+						"Cannot unsubscribe subject: %s from %s because of an error: %v",
+						natsQueue.subject,
+						natsQueue.natsURL,
+						err,
+					)
+				}
+			}
+			if err := natsQueue.closeConnection(); err != nil {
+				log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
 			}
-			sc.Close()
 			cleanupDone <- true
 		}
 	}()
diff --git a/readconfig.go b/readconfig.go
index 2473e9a..5c1fd62 100644
--- a/readconfig.go
+++ b/readconfig.go
@@ -58,6 +58,26 @@ func (ReadConfig) Read() QueueWorkerConfig {
 		}
 	}
 
+	if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
+		val, err := strconv.Atoi(value)
+
+		if err != nil {
+			log.Println("converting faas_max_reconnect to int error:", err)
+		} else {
+			cfg.MaxReconnect = val
+		}
+	}
+
+	if value, exists := os.LookupEnv("faas_reconnect_delay"); exists {
+		reconnectDelayVal, durationErr := time.ParseDuration(value)
+
+		if durationErr != nil {
+			log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
+		} else {
+			cfg.ReconnectDelay = reconnectDelayVal
+		}
+	}
+
 	if val, exists := os.LookupEnv("ack_wait"); exists {
 		ackWaitVal, durationErr := time.ParseDuration(val)
 		if durationErr != nil {
@@ -78,4 +98,6 @@ type QueueWorkerConfig struct {
 	WriteDebug     bool
 	MaxInflight    int
 	AckWait        time.Duration
+	MaxReconnect   int
+	ReconnectDelay time.Duration
 }