Skip to content

Commit

Permalink
Make mqtt topics configurable.
Browse files Browse the repository at this point in the history
  • Loading branch information
brocaar committed Feb 26, 2018
1 parent 359742a commit 7fc9b2d
Show file tree
Hide file tree
Showing 9 changed files with 149 additions and 34 deletions.
14 changes: 14 additions & 0 deletions cmd/lora-gateway-bridge/cmd/configfile.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,20 @@ skip_crc_check = {{ .PacketForwarder.SkipCRCCheck }}
# Configuration for the MQTT backend.
[backend.mqtt]
# MQTT topic templates for the different MQTT topics.
#
# The meaning of these topics are documented at:
# https://docs.loraserver.io/lora-gateway-bridge/use/data/
#
# The default values match the default expected configuration of the
# LoRa Server MQTT backend. Therefore only change these values when
# absolutely needed.
# Use "{{ "{{ .MAC }}" }}" as an substitution for the LoRa gateway MAC.
uplink_topic_template="gateway/{{ "{{ .MAC }}" }}/rx"
downlink_topic_template="gateway/{{ "{{ .MAC }}" }}/tx"
stats_topic_template="gateway/{{ "{{ .MAC }}" }}/stats"
ack_topic_template="gateway/{{ "{{ .MAC }}" }}/ack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="{{ .Backend.MQTT.Server }}"
Expand Down
34 changes: 27 additions & 7 deletions cmd/lora-gateway-bridge/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,14 +37,14 @@ func init() {
rootCmd.PersistentFlags().Int("log-level", 4, "debug=5, info=4, error=2, fatal=1, panic=0")

// for backwards compatibility
rootCmd.PersistentFlags().String("udp-bind", "0.0.0.0:1700", "ip:port to bind the UDP listener to")
rootCmd.PersistentFlags().String("mqtt-server", "tcp://127.0.0.1:1883", "mqtt server (e.g. scheme://host:port where scheme is tcp, ssl or ws)")
rootCmd.PersistentFlags().String("mqtt-username", "", "mqtt server username (optional)")
rootCmd.PersistentFlags().String("mqtt-password", "", "mqtt server password (optional)")
rootCmd.PersistentFlags().String("mqtt-ca-cert", "", "mqtt CA certificate file (optional)")
rootCmd.PersistentFlags().String("udp-bind", "", "")
rootCmd.PersistentFlags().String("mqtt-server", "", "")
rootCmd.PersistentFlags().String("mqtt-username", "", "")
rootCmd.PersistentFlags().String("mqtt-password", "", "")
rootCmd.PersistentFlags().String("mqtt-ca-cert", "", "")
rootCmd.PersistentFlags().String("mqtt-tls-cert", "", "")
rootCmd.PersistentFlags().String("mqtt-tls-key", "", "")
rootCmd.PersistentFlags().Bool("skip-crc-check", false, "skip the CRC status-check of received packets")
rootCmd.PersistentFlags().Bool("skip-crc-check", false, "")
rootCmd.PersistentFlags().MarkHidden("udp-bind")
rootCmd.PersistentFlags().MarkHidden("mqtt-server")
rootCmd.PersistentFlags().MarkHidden("mqtt-username")
Expand Down Expand Up @@ -76,6 +76,15 @@ func init() {
viper.BindPFlag("backend.mqtt.tls_cert", rootCmd.PersistentFlags().Lookup("mqtt-tls-cert"))
viper.BindPFlag("backend.mqtt.tls_key", rootCmd.PersistentFlags().Lookup("mqtt-tls-key"))

// default values
viper.SetDefault("packet_forwarder.udp_bind", "0.0.0.0:1700")

viper.SetDefault("backend.mqtt.uplink_topic_template", "gateway/{{ .MAC }}/rx")
viper.SetDefault("backend.mqtt.downlink_topic_template", "gateway/{{ .MAC }}/tx")
viper.SetDefault("backend.mqtt.stats_topic_template", "gateway/{{ .MAC }}/stats")
viper.SetDefault("backend.mqtt.ack_topic_template", "gateway/{{ .MAC }}/ack")
viper.SetDefault("backend.mqtt.server", "tcp://127.0.0.1:1883")

rootCmd.AddCommand(versionCmd)
rootCmd.AddCommand(configCmd)
}
Expand All @@ -99,7 +108,18 @@ func run(cmd *cobra.Command, args []string) error {
var pubsub *mqttpubsub.Backend
for {
var err error
pubsub, err = mqttpubsub.NewBackend(config.C.Backend.MQTT.Server, config.C.Backend.MQTT.Username, config.C.Backend.MQTT.Password, config.C.Backend.MQTT.CACert, config.C.Backend.MQTT.TLSCert, config.C.Backend.MQTT.TLSKey)
pubsub, err = mqttpubsub.NewBackend(
config.C.Backend.MQTT.Server,
config.C.Backend.MQTT.Username,
config.C.Backend.MQTT.Password,
config.C.Backend.MQTT.CACert,
config.C.Backend.MQTT.TLSCert,
config.C.Backend.MQTT.TLSKey,
config.C.Backend.MQTT.UplinkTopicTemplate,
config.C.Backend.MQTT.DownlinkTopicTemplate,
config.C.Backend.MQTT.StatsTopicTemplate,
config.C.Backend.MQTT.AckTopicTemplate,
)
if err == nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion docs/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,4 +32,4 @@ googleAnalytics = "UA-3512995-9"
weight = 4

[params]
version = "2.3.0"
version = "2.3.1"
14 changes: 14 additions & 0 deletions docs/content/install/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,20 @@ skip_crc_check = false

# Configuration for the MQTT backend.
[backend.mqtt]
# MQTT topic templates for the different MQTT topic.
#
# The meaning of these topics are documented at:
# https://docs.loraserver.io/lora-gateway-bridge/use/data/
#
# The default values match the default expected configuration of the
# LoRa Server MQTT backend. Therefore only change these values when
# absolutely needed.
# Use "{{ .MAC }}" as an substitution for the LoRa gateway MAC.
uplink_topic_template="gateway/{{ .MAC }}/rx"
downlink_topic_template="gateway/{{ .MAC }}/tx"
stats_topic_template="gateway/{{ .MAC }}/stats"
ack_topic_template="gateway/{{ .MAC }}/ack"

# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://127.0.0.1:1883"

Expand Down
7 changes: 7 additions & 0 deletions docs/content/overview/changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,13 @@ menu:

## Changelog

### 2.3.1

**Improvements:**

* MQTT topics are now configurable through the configuration file.
See [Configuration](https://docs.loraserver.io/lora-gateway-bridge/install/config/).

### 2.3.0

**Features:**
Expand Down
72 changes: 56 additions & 16 deletions internal/backend/mqttpubsub/backend.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package mqttpubsub

import (
"bytes"
"crypto/tls"
"crypto/x509"
"encoding/json"
"fmt"
"io/ioutil"
"sync"
"text/template"
"time"

"github.com/brocaar/loraserver/api/gw"
"github.com/brocaar/lorawan"
"github.com/eclipse/paho.mqtt.golang"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)

Expand All @@ -21,15 +24,42 @@ type Backend struct {
txPacketChan chan gw.TXPacketBytes
gateways map[lorawan.EUI64]struct{}
mutex sync.RWMutex

UplinkTemplate *template.Template
DownlinkTemplate *template.Template
StatsTemplate *template.Template
AckTemplate *template.Template
}

// NewBackend creates a new Backend.
func NewBackend(server, username, password, cafile, certFile, certKeyFile string) (*Backend, error) {
func NewBackend(server, username, password, cafile, certFile, certKeyFile, uplinkTopic, downlinkTopic, statsTopic, ackTopic string) (*Backend, error) {
var err error

b := Backend{
txPacketChan: make(chan gw.TXPacketBytes),
gateways: make(map[lorawan.EUI64]struct{}),
}

b.UplinkTemplate, err = template.New("uplink").Parse(uplinkTopic)
if err != nil {
return nil, errors.Wrap(err, "parse uplink template error")
}

b.DownlinkTemplate, err = template.New("downlink").Parse(downlinkTopic)
if err != nil {
return nil, errors.Wrap(err, "parse downlink template error")
}

b.StatsTemplate, err = template.New("stats").Parse(statsTopic)
if err != nil {
return nil, errors.Wrap(err, "parse stats template error")
}

b.AckTemplate, err = template.New("ack").Parse(ackTopic)
if err != nil {
return nil, errors.Wrap(err, "parse ack template error")
}

opts := mqtt.NewClientOptions()
opts.AddBroker(server)
opts.SetUsername(username)
Expand Down Expand Up @@ -114,9 +144,13 @@ func (b *Backend) SubscribeGatewayTX(mac lorawan.EUI64) error {
defer b.mutex.Unlock()
b.mutex.Lock()

topic := fmt.Sprintf("gateway/%s/tx", mac.String())
log.WithField("topic", topic).Info("backend: subscribing to topic")
if token := b.conn.Subscribe(topic, 0, b.txPacketHandler); token.Wait() && token.Error() != nil {
topic := bytes.NewBuffer(nil)
if err := b.DownlinkTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil {
return errors.Wrap(err, "execute uplink template error")
}

log.WithField("topic", topic.String()).Info("backend: subscribing to topic")
if token := b.conn.Subscribe(topic.String(), 0, b.txPacketHandler); token.Wait() && token.Error() != nil {
return token.Error()
}
b.gateways[mac] = struct{}{}
Expand All @@ -129,9 +163,13 @@ func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error {
defer b.mutex.Unlock()
b.mutex.Lock()

topic := fmt.Sprintf("gateway/%s/tx", mac.String())
log.WithField("topic", topic).Info("backend: unsubscribing from topic")
if token := b.conn.Unsubscribe(topic); token.Wait() && token.Error() != nil {
topic := bytes.NewBuffer(nil)
if err := b.DownlinkTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil {
return errors.Wrap(err, "execute uplink template error")
}

log.WithField("topic", topic.String()).Info("backend: unsubscribing from topic")
if token := b.conn.Unsubscribe(topic.String()); token.Wait() && token.Error() != nil {
return token.Error()
}
delete(b.gateways, mac)
Expand All @@ -140,29 +178,31 @@ func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error {

// PublishGatewayRX publishes a RX packet to the MQTT broker.
func (b *Backend) PublishGatewayRX(mac lorawan.EUI64, rxPacket gw.RXPacketBytes) error {
topic := fmt.Sprintf("gateway/%s/rx", mac.String())
return b.publish(topic, rxPacket)
return b.publish(mac, b.UplinkTemplate, rxPacket)
}

// PublishGatewayStats publishes a GatewayStatsPacket to the MQTT broker.
func (b *Backend) PublishGatewayStats(mac lorawan.EUI64, stats gw.GatewayStatsPacket) error {
topic := fmt.Sprintf("gateway/%s/stats", mac.String())
return b.publish(topic, stats)
return b.publish(mac, b.StatsTemplate, stats)
}

// PublishGatewayTXAck publishes a TX ack to the MQTT broker.
func (b *Backend) PublishGatewayTXAck(mac lorawan.EUI64, ack gw.TXAck) error {
topic := fmt.Sprintf("gateway/%s/ack", mac.String())
return b.publish(topic, ack)
return b.publish(mac, b.AckTemplate, ack)
}

func (b *Backend) publish(topic string, v interface{}) error {
func (b *Backend) publish(mac lorawan.EUI64, topicTemplate *template.Template, v interface{}) error {
topic := bytes.NewBuffer(nil)
if err := topicTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil {
return errors.Wrap(err, "execute template error")
}

bytes, err := json.Marshal(v)
if err != nil {
return err
}
log.WithField("topic", topic).Info("backend: publishing packet")
if token := b.conn.Publish(topic, 0, false, bytes); token.Wait() && token.Error() != nil {
log.WithField("topic", topic.String()).Info("backend: publishing packet")
if token := b.conn.Publish(topic.String(), 0, false, bytes); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
Expand Down
12 changes: 11 additions & 1 deletion internal/backend/mqttpubsub/backend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"testing"
"time"

"github.com/brocaar/lora-gateway-bridge/internal/config"
"github.com/brocaar/loraserver/api/gw"
"github.com/eclipse/paho.mqtt.golang"
. "github.com/smartystreets/goconvey/convey"
Expand All @@ -22,7 +23,16 @@ func TestBackend(t *testing.T) {
defer c.Disconnect(0)

Convey("Given a new Backend", func() {
backend, err := NewBackend(conf.Server, conf.Username, conf.Password, "", "", "")
backend, err := NewBackend(
conf.Server,
conf.Username,
conf.Password,
"", "", "",
config.C.Backend.MQTT.UplinkTopicTemplate,
config.C.Backend.MQTT.DownlinkTopicTemplate,
config.C.Backend.MQTT.StatsTopicTemplate,
config.C.Backend.MQTT.AckTopicTemplate,
)
So(err, ShouldBeNil)
defer backend.Close()

Expand Down
12 changes: 9 additions & 3 deletions internal/backend/mqttpubsub/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,27 @@ package mqttpubsub
import (
"os"

"github.com/brocaar/lora-gateway-bridge/internal/config"
log "github.com/sirupsen/logrus"
)

func init() {
log.SetLevel(log.ErrorLevel)
}

type config struct {
type conf struct {
Server string
Username string
Password string
}

func getConfig() *config {
c := &config{
func getConfig() *conf {
config.C.Backend.MQTT.DownlinkTopicTemplate = "gateway/{{ .MAC }}/tx"
config.C.Backend.MQTT.UplinkTopicTemplate = "gateway/{{ .MAC }}/rx"
config.C.Backend.MQTT.StatsTopicTemplate = "gateway/{{ .MAC }}/stats"
config.C.Backend.MQTT.AckTopicTemplate = "gateway/{{ .MAC }}/ack"

c := &conf{
Server: "tcp://127.0.0.1:1883",
}

Expand Down
16 changes: 10 additions & 6 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,16 @@ type Config struct {

Backend struct {
MQTT struct {
Server string
Username string
Password string
CACert string `mapstructure:"ca_cert"`
TLSCert string `mapstructure:"tls_cert"`
TLSKey string `mapstructure:"tls_key"`
Server string
Username string
Password string
CACert string `mapstructure:"ca_cert"`
TLSCert string `mapstructure:"tls_cert"`
TLSKey string `mapstructure:"tls_key"`
UplinkTopicTemplate string `mapstructure:"uplink_topic_template"`
DownlinkTopicTemplate string `mapstructure:"downlink_topic_template"`
StatsTopicTemplate string `mapstructure:"stats_topic_template"`
AckTopicTemplate string `mapstructure:"ack_topic_template"`
}
}
}
Expand Down

0 comments on commit 7fc9b2d

Please sign in to comment.