From 7fc9b2de39fc9b611fe330821326029c3d105979 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Mon, 26 Feb 2018 13:38:51 +0100 Subject: [PATCH] Make mqtt topics configurable. --- cmd/lora-gateway-bridge/cmd/configfile.go | 14 ++++ cmd/lora-gateway-bridge/cmd/root.go | 34 ++++++++-- docs/config.toml | 2 +- docs/content/install/config.md | 14 ++++ docs/content/overview/changelog.md | 7 ++ internal/backend/mqttpubsub/backend.go | 72 ++++++++++++++++----- internal/backend/mqttpubsub/backend_test.go | 12 +++- internal/backend/mqttpubsub/base_test.go | 12 +++- internal/config/config.go | 16 +++-- 9 files changed, 149 insertions(+), 34 deletions(-) diff --git a/cmd/lora-gateway-bridge/cmd/configfile.go b/cmd/lora-gateway-bridge/cmd/configfile.go index f5b9f1ce..8c6b3567 100644 --- a/cmd/lora-gateway-bridge/cmd/configfile.go +++ b/cmd/lora-gateway-bridge/cmd/configfile.go @@ -34,6 +34,20 @@ skip_crc_check = {{ .PacketForwarder.SkipCRCCheck }} # Configuration for the MQTT backend. [backend.mqtt] +# MQTT topic templates for the different MQTT topics. +# +# The meaning of these topics are documented at: +# https://docs.loraserver.io/lora-gateway-bridge/use/data/ +# +# The default values match the default expected configuration of the +# LoRa Server MQTT backend. Therefore only change these values when +# absolutely needed. +# Use "{{ "{{ .MAC }}" }}" as an substitution for the LoRa gateway MAC. +uplink_topic_template="gateway/{{ "{{ .MAC }}" }}/rx" +downlink_topic_template="gateway/{{ "{{ .MAC }}" }}/tx" +stats_topic_template="gateway/{{ "{{ .MAC }}" }}/stats" +ack_topic_template="gateway/{{ "{{ .MAC }}" }}/ack" + # MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws) server="{{ .Backend.MQTT.Server }}" diff --git a/cmd/lora-gateway-bridge/cmd/root.go b/cmd/lora-gateway-bridge/cmd/root.go index 06b7d389..0f8d7089 100644 --- a/cmd/lora-gateway-bridge/cmd/root.go +++ b/cmd/lora-gateway-bridge/cmd/root.go @@ -37,14 +37,14 @@ func init() { rootCmd.PersistentFlags().Int("log-level", 4, "debug=5, info=4, error=2, fatal=1, panic=0") // for backwards compatibility - rootCmd.PersistentFlags().String("udp-bind", "0.0.0.0:1700", "ip:port to bind the UDP listener to") - rootCmd.PersistentFlags().String("mqtt-server", "tcp://127.0.0.1:1883", "mqtt server (e.g. scheme://host:port where scheme is tcp, ssl or ws)") - rootCmd.PersistentFlags().String("mqtt-username", "", "mqtt server username (optional)") - rootCmd.PersistentFlags().String("mqtt-password", "", "mqtt server password (optional)") - rootCmd.PersistentFlags().String("mqtt-ca-cert", "", "mqtt CA certificate file (optional)") + rootCmd.PersistentFlags().String("udp-bind", "", "") + rootCmd.PersistentFlags().String("mqtt-server", "", "") + rootCmd.PersistentFlags().String("mqtt-username", "", "") + rootCmd.PersistentFlags().String("mqtt-password", "", "") + rootCmd.PersistentFlags().String("mqtt-ca-cert", "", "") rootCmd.PersistentFlags().String("mqtt-tls-cert", "", "") rootCmd.PersistentFlags().String("mqtt-tls-key", "", "") - rootCmd.PersistentFlags().Bool("skip-crc-check", false, "skip the CRC status-check of received packets") + rootCmd.PersistentFlags().Bool("skip-crc-check", false, "") rootCmd.PersistentFlags().MarkHidden("udp-bind") rootCmd.PersistentFlags().MarkHidden("mqtt-server") rootCmd.PersistentFlags().MarkHidden("mqtt-username") @@ -76,6 +76,15 @@ func init() { viper.BindPFlag("backend.mqtt.tls_cert", rootCmd.PersistentFlags().Lookup("mqtt-tls-cert")) viper.BindPFlag("backend.mqtt.tls_key", rootCmd.PersistentFlags().Lookup("mqtt-tls-key")) + // default values + viper.SetDefault("packet_forwarder.udp_bind", "0.0.0.0:1700") + + viper.SetDefault("backend.mqtt.uplink_topic_template", "gateway/{{ .MAC }}/rx") + viper.SetDefault("backend.mqtt.downlink_topic_template", "gateway/{{ .MAC }}/tx") + viper.SetDefault("backend.mqtt.stats_topic_template", "gateway/{{ .MAC }}/stats") + viper.SetDefault("backend.mqtt.ack_topic_template", "gateway/{{ .MAC }}/ack") + viper.SetDefault("backend.mqtt.server", "tcp://127.0.0.1:1883") + rootCmd.AddCommand(versionCmd) rootCmd.AddCommand(configCmd) } @@ -99,7 +108,18 @@ func run(cmd *cobra.Command, args []string) error { var pubsub *mqttpubsub.Backend for { var err error - pubsub, err = mqttpubsub.NewBackend(config.C.Backend.MQTT.Server, config.C.Backend.MQTT.Username, config.C.Backend.MQTT.Password, config.C.Backend.MQTT.CACert, config.C.Backend.MQTT.TLSCert, config.C.Backend.MQTT.TLSKey) + pubsub, err = mqttpubsub.NewBackend( + config.C.Backend.MQTT.Server, + config.C.Backend.MQTT.Username, + config.C.Backend.MQTT.Password, + config.C.Backend.MQTT.CACert, + config.C.Backend.MQTT.TLSCert, + config.C.Backend.MQTT.TLSKey, + config.C.Backend.MQTT.UplinkTopicTemplate, + config.C.Backend.MQTT.DownlinkTopicTemplate, + config.C.Backend.MQTT.StatsTopicTemplate, + config.C.Backend.MQTT.AckTopicTemplate, + ) if err == nil { break } diff --git a/docs/config.toml b/docs/config.toml index af7f1951..d1b48858 100644 --- a/docs/config.toml +++ b/docs/config.toml @@ -32,4 +32,4 @@ googleAnalytics = "UA-3512995-9" weight = 4 [params] - version = "2.3.0" + version = "2.3.1" diff --git a/docs/content/install/config.md b/docs/content/install/config.md index c5631176..612b3a89 100644 --- a/docs/content/install/config.md +++ b/docs/content/install/config.md @@ -97,6 +97,20 @@ skip_crc_check = false # Configuration for the MQTT backend. [backend.mqtt] +# MQTT topic templates for the different MQTT topic. +# +# The meaning of these topics are documented at: +# https://docs.loraserver.io/lora-gateway-bridge/use/data/ +# +# The default values match the default expected configuration of the +# LoRa Server MQTT backend. Therefore only change these values when +# absolutely needed. +# Use "{{ .MAC }}" as an substitution for the LoRa gateway MAC. +uplink_topic_template="gateway/{{ .MAC }}/rx" +downlink_topic_template="gateway/{{ .MAC }}/tx" +stats_topic_template="gateway/{{ .MAC }}/stats" +ack_topic_template="gateway/{{ .MAC }}/ack" + # MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws) server="tcp://127.0.0.1:1883" diff --git a/docs/content/overview/changelog.md b/docs/content/overview/changelog.md index 3e3f21dc..d7f48f01 100644 --- a/docs/content/overview/changelog.md +++ b/docs/content/overview/changelog.md @@ -8,6 +8,13 @@ menu: ## Changelog +### 2.3.1 + +**Improvements:** + +* MQTT topics are now configurable through the configuration file. + See [Configuration](https://docs.loraserver.io/lora-gateway-bridge/install/config/). + ### 2.3.0 **Features:** diff --git a/internal/backend/mqttpubsub/backend.go b/internal/backend/mqttpubsub/backend.go index 9a79a0a5..ba5f5b70 100644 --- a/internal/backend/mqttpubsub/backend.go +++ b/internal/backend/mqttpubsub/backend.go @@ -1,17 +1,20 @@ package mqttpubsub import ( + "bytes" "crypto/tls" "crypto/x509" "encoding/json" "fmt" "io/ioutil" "sync" + "text/template" "time" "github.com/brocaar/loraserver/api/gw" "github.com/brocaar/lorawan" "github.com/eclipse/paho.mqtt.golang" + "github.com/pkg/errors" log "github.com/sirupsen/logrus" ) @@ -21,15 +24,42 @@ type Backend struct { txPacketChan chan gw.TXPacketBytes gateways map[lorawan.EUI64]struct{} mutex sync.RWMutex + + UplinkTemplate *template.Template + DownlinkTemplate *template.Template + StatsTemplate *template.Template + AckTemplate *template.Template } // NewBackend creates a new Backend. -func NewBackend(server, username, password, cafile, certFile, certKeyFile string) (*Backend, error) { +func NewBackend(server, username, password, cafile, certFile, certKeyFile, uplinkTopic, downlinkTopic, statsTopic, ackTopic string) (*Backend, error) { + var err error + b := Backend{ txPacketChan: make(chan gw.TXPacketBytes), gateways: make(map[lorawan.EUI64]struct{}), } + b.UplinkTemplate, err = template.New("uplink").Parse(uplinkTopic) + if err != nil { + return nil, errors.Wrap(err, "parse uplink template error") + } + + b.DownlinkTemplate, err = template.New("downlink").Parse(downlinkTopic) + if err != nil { + return nil, errors.Wrap(err, "parse downlink template error") + } + + b.StatsTemplate, err = template.New("stats").Parse(statsTopic) + if err != nil { + return nil, errors.Wrap(err, "parse stats template error") + } + + b.AckTemplate, err = template.New("ack").Parse(ackTopic) + if err != nil { + return nil, errors.Wrap(err, "parse ack template error") + } + opts := mqtt.NewClientOptions() opts.AddBroker(server) opts.SetUsername(username) @@ -114,9 +144,13 @@ func (b *Backend) SubscribeGatewayTX(mac lorawan.EUI64) error { defer b.mutex.Unlock() b.mutex.Lock() - topic := fmt.Sprintf("gateway/%s/tx", mac.String()) - log.WithField("topic", topic).Info("backend: subscribing to topic") - if token := b.conn.Subscribe(topic, 0, b.txPacketHandler); token.Wait() && token.Error() != nil { + topic := bytes.NewBuffer(nil) + if err := b.DownlinkTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil { + return errors.Wrap(err, "execute uplink template error") + } + + log.WithField("topic", topic.String()).Info("backend: subscribing to topic") + if token := b.conn.Subscribe(topic.String(), 0, b.txPacketHandler); token.Wait() && token.Error() != nil { return token.Error() } b.gateways[mac] = struct{}{} @@ -129,9 +163,13 @@ func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error { defer b.mutex.Unlock() b.mutex.Lock() - topic := fmt.Sprintf("gateway/%s/tx", mac.String()) - log.WithField("topic", topic).Info("backend: unsubscribing from topic") - if token := b.conn.Unsubscribe(topic); token.Wait() && token.Error() != nil { + topic := bytes.NewBuffer(nil) + if err := b.DownlinkTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil { + return errors.Wrap(err, "execute uplink template error") + } + + log.WithField("topic", topic.String()).Info("backend: unsubscribing from topic") + if token := b.conn.Unsubscribe(topic.String()); token.Wait() && token.Error() != nil { return token.Error() } delete(b.gateways, mac) @@ -140,29 +178,31 @@ func (b *Backend) UnSubscribeGatewayTX(mac lorawan.EUI64) error { // PublishGatewayRX publishes a RX packet to the MQTT broker. func (b *Backend) PublishGatewayRX(mac lorawan.EUI64, rxPacket gw.RXPacketBytes) error { - topic := fmt.Sprintf("gateway/%s/rx", mac.String()) - return b.publish(topic, rxPacket) + return b.publish(mac, b.UplinkTemplate, rxPacket) } // PublishGatewayStats publishes a GatewayStatsPacket to the MQTT broker. func (b *Backend) PublishGatewayStats(mac lorawan.EUI64, stats gw.GatewayStatsPacket) error { - topic := fmt.Sprintf("gateway/%s/stats", mac.String()) - return b.publish(topic, stats) + return b.publish(mac, b.StatsTemplate, stats) } // PublishGatewayTXAck publishes a TX ack to the MQTT broker. func (b *Backend) PublishGatewayTXAck(mac lorawan.EUI64, ack gw.TXAck) error { - topic := fmt.Sprintf("gateway/%s/ack", mac.String()) - return b.publish(topic, ack) + return b.publish(mac, b.AckTemplate, ack) } -func (b *Backend) publish(topic string, v interface{}) error { +func (b *Backend) publish(mac lorawan.EUI64, topicTemplate *template.Template, v interface{}) error { + topic := bytes.NewBuffer(nil) + if err := topicTemplate.Execute(topic, struct{ MAC lorawan.EUI64 }{mac}); err != nil { + return errors.Wrap(err, "execute template error") + } + bytes, err := json.Marshal(v) if err != nil { return err } - log.WithField("topic", topic).Info("backend: publishing packet") - if token := b.conn.Publish(topic, 0, false, bytes); token.Wait() && token.Error() != nil { + log.WithField("topic", topic.String()).Info("backend: publishing packet") + if token := b.conn.Publish(topic.String(), 0, false, bytes); token.Wait() && token.Error() != nil { return token.Error() } return nil diff --git a/internal/backend/mqttpubsub/backend_test.go b/internal/backend/mqttpubsub/backend_test.go index 4ce9efb8..97a27765 100644 --- a/internal/backend/mqttpubsub/backend_test.go +++ b/internal/backend/mqttpubsub/backend_test.go @@ -5,6 +5,7 @@ import ( "testing" "time" + "github.com/brocaar/lora-gateway-bridge/internal/config" "github.com/brocaar/loraserver/api/gw" "github.com/eclipse/paho.mqtt.golang" . "github.com/smartystreets/goconvey/convey" @@ -22,7 +23,16 @@ func TestBackend(t *testing.T) { defer c.Disconnect(0) Convey("Given a new Backend", func() { - backend, err := NewBackend(conf.Server, conf.Username, conf.Password, "", "", "") + backend, err := NewBackend( + conf.Server, + conf.Username, + conf.Password, + "", "", "", + config.C.Backend.MQTT.UplinkTopicTemplate, + config.C.Backend.MQTT.DownlinkTopicTemplate, + config.C.Backend.MQTT.StatsTopicTemplate, + config.C.Backend.MQTT.AckTopicTemplate, + ) So(err, ShouldBeNil) defer backend.Close() diff --git a/internal/backend/mqttpubsub/base_test.go b/internal/backend/mqttpubsub/base_test.go index d9f1a79c..edd70df7 100644 --- a/internal/backend/mqttpubsub/base_test.go +++ b/internal/backend/mqttpubsub/base_test.go @@ -3,6 +3,7 @@ package mqttpubsub import ( "os" + "github.com/brocaar/lora-gateway-bridge/internal/config" log "github.com/sirupsen/logrus" ) @@ -10,14 +11,19 @@ func init() { log.SetLevel(log.ErrorLevel) } -type config struct { +type conf struct { Server string Username string Password string } -func getConfig() *config { - c := &config{ +func getConfig() *conf { + config.C.Backend.MQTT.DownlinkTopicTemplate = "gateway/{{ .MAC }}/tx" + config.C.Backend.MQTT.UplinkTopicTemplate = "gateway/{{ .MAC }}/rx" + config.C.Backend.MQTT.StatsTopicTemplate = "gateway/{{ .MAC }}/stats" + config.C.Backend.MQTT.AckTopicTemplate = "gateway/{{ .MAC }}/ack" + + c := &conf{ Server: "tcp://127.0.0.1:1883", } diff --git a/internal/config/config.go b/internal/config/config.go index 0b1ff9b8..60190400 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -13,12 +13,16 @@ type Config struct { Backend struct { MQTT struct { - Server string - Username string - Password string - CACert string `mapstructure:"ca_cert"` - TLSCert string `mapstructure:"tls_cert"` - TLSKey string `mapstructure:"tls_key"` + Server string + Username string + Password string + CACert string `mapstructure:"ca_cert"` + TLSCert string `mapstructure:"tls_cert"` + TLSKey string `mapstructure:"tls_key"` + UplinkTopicTemplate string `mapstructure:"uplink_topic_template"` + DownlinkTopicTemplate string `mapstructure:"downlink_topic_template"` + StatsTopicTemplate string `mapstructure:"stats_topic_template"` + AckTopicTemplate string `mapstructure:"ack_topic_template"` } } }