Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implemented reconnection logic in queue-worker #52

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func AddBasicAuth(req *http.Request) error {

credentials, err := reader.Read()
if err != nil {
return fmt.Errorf("Unable to read basic auth: %s", err.Error())
return fmt.Errorf("unable to read basic auth: %s", err.Error())
}

req.SetBasicAuth(credentials.User, credentials.Password)
Expand All @@ -37,7 +37,7 @@ func LoadCredentials() (*auth.BasicAuthCredentials, error) {

credentials, err := reader.Read()
if err != nil {
return nil, fmt.Errorf("Unable to read basic auth: %s", err.Error())
return nil, fmt.Errorf("unable to read basic auth: %s", err.Error())
}
return credentials, nil
}
177 changes: 157 additions & 20 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"time"

"github.com/nats-io/go-nats-streaming"
Expand Down Expand Up @@ -48,17 +49,134 @@ func makeClient() http.Client {
return proxyClient
}

type NATSQueue struct {
clusterID string
clientID string
natsURL string

maxReconnect int
reconnectDelay time.Duration
conn stan.Conn
connMutex *sync.RWMutex
quitCh chan struct{}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If quitCh is for a graceful shutdown and we're editing the file how about shutdownCh?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we even need a suffix of ch in that instance?

Copy link
Contributor Author

@bartsmykla bartsmykla Jan 29, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If there is ch suffix it's easier to understand looking into to code. At least for me and I saw that pattern a lot in Go code. If you want I can remove it but I will stand behind the idea to leave the suffix.


subject string
qgroup string
durable string
ackWait time.Duration
messageHandler func(*stan.Msg)
startOption stan.SubscriptionOption
maxInFlight stan.SubscriptionOption
subscription stan.Subscription
}

func (q *NATSQueue) init() error {
log.Printf("Connecting to: %s\n", q.natsURL)

sc, err := stan.Connect(
q.clusterID,
q.clientID,
stan.NatsURL(q.natsURL),
stan.SetConnectionLostHandler(func(conn stan.Conn, err error) {
log.Printf("Disconnected from %s\n", q.natsURL)

q.reconnect()
}),
)
if err != nil {
return fmt.Errorf("can't connect to %s: %v\n", q.natsURL, err)
}

q.connMutex.Lock()
defer q.connMutex.Unlock()

q.conn = sc
bartsmykla marked this conversation as resolved.
Show resolved Hide resolved

log.Printf("Subscribing to: %s at %s\n", q.subject, q.natsURL)
log.Println("Wait for ", q.ackWait)

subscription, err := q.conn.QueueSubscribe(
q.subject,
q.qgroup,
q.messageHandler,
stan.DurableName(q.durable),
stan.AckWait(q.ackWait),
q.startOption,
q.maxInFlight,
)
if err != nil {
return fmt.Errorf("couldn't subscribe to %s at %s. Error: %v\n", q.subject, q.natsURL, err)
}

log.Printf(
"Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n",
q.subject,
q.clientID,
q.qgroup,
q.durable,
)

q.subscription = subscription

return nil
}

func (q *NATSQueue) reconnect() {
for i := 0; i < q.maxReconnect; i++ {
bartsmykla marked this conversation as resolved.
Show resolved Hide resolved
select {
case <-time.After(time.Duration(i) * q.reconnectDelay):
if err := q.init(); err == nil {
log.Printf("Reconnecting (%d/%d) to %s succeeded\n", i+1, q.maxReconnect, q.natsURL)

return
}

nextTryIn := (time.Duration(i+1) * q.reconnectDelay).String()

log.Printf("Reconnecting (%d/%d) to %s failed\n", i+1, q.maxReconnect, q.natsURL)
log.Printf("Waiting %s before next try", nextTryIn)
case <-q.quitCh:
log.Println("Received signal to stop reconnecting...")

return
}
}

log.Printf("Reconnecting limit (%d) reached\n", q.maxReconnect)
}

func (q *NATSQueue) unsubscribe() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()

if q.subscription != nil {
return fmt.Errorf("q.subscription is nil")
}

return q.subscription.Unsubscribe()
}

func (q *NATSQueue) closeConnection() error {
q.connMutex.Lock()
defer q.connMutex.Unlock()

if q.conn == nil {
return fmt.Errorf("q.conn is nil")
}

close(q.quitCh)

return q.conn.Close()
}

func main() {
readConfig := ReadConfig{}
config := readConfig.Read()
log.SetFlags(0)

clusterID := "faas-cluster"
val, _ := os.Hostname()
clientID := "faas-worker-" + nats.GetClientID(val)
hostname, _ := os.Hostname()

var durable string
var qgroup string
var unsubscribe bool
var credentials *auth.BasicAuthCredentials
var err error
Expand All @@ -72,15 +190,9 @@ func main() {
}

client := makeClient()
sc, err := stan.Connect(clusterID, clientID, stan.NatsURL("nats://"+config.NatsAddress+":4222"))
if err != nil {
log.Fatalf("Can't connect to %s: %v\n", "nats://"+config.NatsAddress+":4222", err)
}

startOpt := stan.StartWithLastReceived()

i := 0
mcb := func(msg *stan.Msg) {
messageHandler := func(msg *stan.Msg) {
i++

printMsg(msg, i)
Expand Down Expand Up @@ -195,16 +307,32 @@ func main() {
}
}

subj := "faas-request"
qgroup = "faas"
natsURL := "nats://" + config.NatsAddress + ":4222"

log.Printf("AckWait set to: %s", config.AckWait.String())
sub, err := sc.QueueSubscribe(subj, qgroup, mcb, startOpt, stan.DurableName(durable), stan.MaxInflight(config.MaxInflight), stan.AckWait(config.AckWait))
if err != nil {
log.Panicln(err)
go nats.Init("http://" + config.NatsAddress + ":8222")

natsQueue := NATSQueue{
clusterID: "faas-cluster",
clientID: "faas-worker-" + nats.GetClientID(hostname),
natsURL: natsURL,

connMutex: &sync.RWMutex{},
maxReconnect: config.MaxReconnect,
reconnectDelay: config.ReconnectDelay,
quitCh: make(chan struct{}),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whilst making changes for the other comments, can struct{} be exchanged for bool or int?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I strongly disagree with that one. This is common pattern to do signalling channels when you are not interested in the value being sent to the channel to use struct{} because it is not allocating any memory for that.


subject: "faas-request",
qgroup: "faas",
durable: durable,
messageHandler: messageHandler,
startOption: stan.StartWithLastReceived(),
maxInFlight: stan.MaxInflight(config.MaxInflight),
ackWait: config.AckWait,
}

log.Printf("Listening on [%s], clientID=[%s], qgroup=[%s] durable=[%s]\n", subj, clientID, qgroup, durable)
if initErr := natsQueue.init(); initErr != nil {
log.Panic(initErr)
}

// Wait for a SIGINT (perhaps triggered by user with CTRL-C)
// Run cleanup when signal is received
Expand All @@ -216,9 +344,18 @@ func main() {
fmt.Printf("\nReceived an interrupt, unsubscribing and closing connection...\n\n")
// Do not unsubscribe a durable on exit, except if asked to.
if durable == "" || unsubscribe {
sub.Unsubscribe()
if err := natsQueue.unsubscribe(); err != nil {
log.Panicf(
"Cannot unsubscribe subject: %s from %s because of an error: %v",
natsQueue.subject,
natsQueue.natsURL,
err,
)
}
}
if err := natsQueue.closeConnection(); err != nil {
log.Panicf("Cannot close connection to %s because of an error: %v\n", natsQueue.natsURL, err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a panic or just logged?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Panic should be fine, because it'll still unwind the stack, and it's after we received signal to close everything

}
sc.Close()
cleanupDone <- true
}
}()
Expand Down
22 changes: 22 additions & 0 deletions readconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,26 @@ func (ReadConfig) Read() QueueWorkerConfig {
}
}

if value, exists := os.LookupEnv("faas_max_reconnect"); exists {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure we should have the faas prefix here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, do you want me to remove it or not?

val, err := strconv.Atoi(value)

if err != nil {
log.Println("converting faas_max_reconnect to int error:", err)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So what is the default otherwise? 0?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

} else {
cfg.MaxReconnect = val
}
}

if value, exists := os.LookupEnv("faas_reconnect_delay"); exists {
reconnectDelayVal, durationErr := time.ParseDuration(value)

if durationErr != nil {
log.Println("parse env var: faas_reconnect_delay as time.Duration error:", durationErr)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the default?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

0

} else {
cfg.ReconnectDelay = reconnectDelayVal
}
}

if val, exists := os.LookupEnv("ack_wait"); exists {
ackWaitVal, durationErr := time.ParseDuration(val)
if durationErr != nil {
Expand All @@ -78,4 +98,6 @@ type QueueWorkerConfig struct {
WriteDebug bool
MaxInflight int
AckWait time.Duration
MaxReconnect int
ReconnectDelay time.Duration
}