Skip to content

Commit

Permalink
DGW/mqtt: fixed reconnection to the broker (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
krylovsk committed Sep 23, 2014
1 parent 4029a16 commit 580a38f
Showing 1 changed file with 30 additions and 21 deletions.
51 changes: 30 additions & 21 deletions cmd/device-gateway/mqtt.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
)

type MQTTPublisher struct {
config *MqttProtocol
client *MQTT.MqttClient
dataCh chan AgentResponse
config *MqttProtocol
clientId string
client *MQTT.MqttClient
dataCh chan AgentResponse
}

func newMQTTPublisher(conf *Config) *MQTTPublisher {
Expand Down Expand Up @@ -45,17 +46,19 @@ func newMQTTPublisher(conf *Config) *MQTTPublisher {
return nil
}

// Prepare MQTT connection opts
broker := fmt.Sprintf("tcp://%s:%v", config.Host, config.Port)
clientId := conf.Id
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(clientId).SetCleanSession(true).SetOnConnectionLost(onConnectionLost)

// Create and return publisher
publisher := &MQTTPublisher{
config: &config,
client: MQTT.NewClient(connOpts),
dataCh: make(chan AgentResponse),
config: &config,
clientId: conf.Id,
dataCh: make(chan AgentResponse),
}

// Prepare MQTT connection opts
broker := fmt.Sprintf("tcp://%s:%v", config.Host, config.Port)
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(publisher.clientId).
SetCleanSession(true).SetOnConnectionLost(publisher.onConnectionLost)

publisher.client = MQTT.NewClient(connOpts)
return publisher
}

Expand All @@ -67,7 +70,7 @@ func (self *MQTTPublisher) start() {
log.Println("MQTTPublisher.start()")
// start the connection routine
log.Printf("MQTTPublisher: Will connect to the broker tcp://%s:%v", self.config.Host, self.config.Port)
go connect(self.client, 0)
go self.connect(0)

qos := 1
prefix := self.config.Prefix
Expand Down Expand Up @@ -95,30 +98,36 @@ func (self *MQTTPublisher) stop() {
}
}

func connect(client *MQTT.MqttClient, backOff int) {
log.Println("MQTTPublisher.connect() with backOff (sec): ", backOff)
func (self *MQTTPublisher) connect(backOff int) {
log.Printf("MQTTPublisher: connecting to the broker %s:%v, backOff: %v sec\n", self.config.Host, self.config.Port, backOff)
// sleep for backOff seconds
time.Sleep(time.Duration(backOff) * time.Second)
_, err := client.Start()
_, err := self.client.Start()

if err != nil {
log.Printf("Failed to connected to MQTT broker: %v\n", err.Error())
log.Printf("MQTTPublisher: failed to connect: %v\n", err.Error())
// intial backOff 10 sec, every further retry backOff*2 unless <= 10 min
if backOff == 0 {
backOff = 10
} else if backOff <= 600 {
backOff *= 2
}
go connect(client, backOff)
go self.connect(backOff)
return
}

log.Printf("MQTTPublisher: Connected to the broker")
log.Printf("MQTTPublisher: connected to the broker %s:%v", self.config.Host, self.config.Port)
return
}

func onConnectionLost(client *MQTT.MqttClient, reason error) {
func (self *MQTTPublisher) onConnectionLost(client *MQTT.MqttClient, reason error) {
log.Println("MQTTPulbisher: lost connection to the broker: ", reason.Error())
// FIXME: bug in mqtt library (panic on reconnect)?
// go connect(client, 0)

// Initialize a new client and reconnect
broker := fmt.Sprintf("tcp://%s:%v", self.config.Host, self.config.Port)
connOpts := MQTT.NewClientOptions().AddBroker(broker).SetClientId(self.clientId).
SetCleanSession(true).SetOnConnectionLost(self.onConnectionLost)

self.client = MQTT.NewClient(connOpts)
go self.connect(0)
}

0 comments on commit 580a38f

Please sign in to comment.