diff --git a/go.mod b/go.mod index f8befd7f..74184621 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,7 @@ require ( github.com/golang/protobuf v1.5.4 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/mochi-mqtt/server/v2 v2.4.6 + github.com/mochi-mqtt/server/v2 v2.6.5 github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.32.0 github.com/openshift/build-machinery-go v0.0.0-20240419090851-af9c868bcf52 @@ -63,7 +63,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.26.0 // indirect - golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 // indirect golang.org/x/net v0.23.0 // indirect golang.org/x/sync v0.6.0 // indirect golang.org/x/sys v0.18.0 // indirect diff --git a/go.sum b/go.sum index 79f8a687..8d91ac1f 100644 --- a/go.sum +++ b/go.sum @@ -130,8 +130,8 @@ github.com/moby/sys/mountinfo v0.6.2 h1:BzJjoreD5BMFNmD9Rus6gdd1pLuecOFPt8wC+Vyg github.com/moby/sys/mountinfo v0.6.2/go.mod h1:IJb6JQeOklcdMU9F5xQ8ZALD+CUr5VlGpwtX+VE0rpI= github.com/moby/term v0.0.0-20221205130635-1aeaba878587 h1:HfkjXDfhgVaN5rmueG8cL8KKeFNecRCXFhaJ2qZ5SKA= github.com/moby/term v0.0.0-20221205130635-1aeaba878587/go.mod h1:8FzsFHVUBGZdbDsJw/ot+X+d5HLUbvklYLJ9uGfcI3Y= -github.com/mochi-mqtt/server/v2 v2.4.6 h1:3iaQLG4hD/2vSh0Rwu4+h//KUcWR2zAKQIxhJuoJmCg= -github.com/mochi-mqtt/server/v2 v2.4.6/go.mod h1:M1lZnLbyowXUyQBIlHYlX1wasxXqv/qFWwQxAzfphwA= +github.com/mochi-mqtt/server/v2 v2.6.5 h1:9PiQ6EJt/Dx0ut0Fuuir4F6WinO/5Bpz9szujNwm+q8= +github.com/mochi-mqtt/server/v2 v2.6.5/go.mod h1:TqztjKGO0/ArOjJt9x9idk0kqPT3CVN8Pb+l+PS5Gdo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= @@ -214,8 +214,8 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 h1:k/i9J1pBpvlfR+9QsetwPyERsqu1GIbi967PQMq3Ivc= -golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1/go.mod h1:V1LtkGg67GoY2N1AnLN78QLrzxkLyJw7RJb1gzOOz9w= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df h1:UA2aFVmmsIlefxMk29Dp2juaUSth8Pyn3Tq5Y5mJGME= +golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df/go.mod h1:FXUEEKJgO7OQYeo8N01OfiKP8RXMtf6e8aTskBGqWdc= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= diff --git a/test/integration/cloudevents/suite_test.go b/test/integration/cloudevents/suite_test.go index aeec22d8..a93eeee1 100644 --- a/test/integration/cloudevents/suite_test.go +++ b/test/integration/cloudevents/suite_test.go @@ -69,7 +69,10 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) { err := mqttBroker.AddHook(new(util.AllowHook), nil) gomega.Expect(err).ToNot(gomega.HaveOccurred()) - err = mqttBroker.AddListener(listeners.NewTCP("mqtt-test-broker", mqttBrokerHost, nil)) + err = mqttBroker.AddListener(listeners.NewTCP(listeners.Config{ + ID: "mqtt-test-broker", + Address: mqttBrokerHost, + })) gomega.Expect(err).ToNot(gomega.HaveOccurred()) serverCertPairs, err = util.NewServerCertPairs() @@ -78,13 +81,16 @@ var _ = ginkgo.BeforeSuite(func(done ginkgo.Done) { certPool, err = util.AppendCAToCertPool(serverCertPairs.CA) gomega.Expect(err).ToNot(gomega.HaveOccurred()) - err = mqttBroker.AddListener(listeners.NewTCP("mqtt-tls-test-broker", mqttTLSBrokerHost, &listeners.Config{ - TLSConfig: &tls.Config{ - ClientCAs: certPool, - ClientAuth: tls.RequireAndVerifyClientCert, - Certificates: []tls.Certificate{serverCertPairs.ServerTLSCert}, - }, - })) + err = mqttBroker.AddListener(listeners.NewTCP( + listeners.Config{ + ID: "mqtt-tls-test-broker", + Address: mqttBrokerHost, + TLSConfig: &tls.Config{ + ClientCAs: certPool, + ClientAuth: tls.RequireAndVerifyClientCert, + Certificates: []tls.Certificate{serverCertPairs.ServerTLSCert}, + }, + })) gomega.Expect(err).ToNot(gomega.HaveOccurred()) go func() { diff --git a/vendor/github.com/mochi-mqtt/server/v2/Dockerfile b/vendor/github.com/mochi-mqtt/server/v2/Dockerfile index dc3d2bd8..7f643c15 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/Dockerfile +++ b/vendor/github.com/mochi-mqtt/server/v2/Dockerfile @@ -11,21 +11,12 @@ RUN go mod download COPY . ./ -RUN go build -o /app/mochi ./cmd - +RUN go build -o /app/mochi ./cmd/docker FROM alpine WORKDIR / COPY --from=builder /app/mochi . -# tcp -EXPOSE 1883 - -# websockets -EXPOSE 1882 - -# dashboard -EXPOSE 8080 - ENTRYPOINT [ "/mochi" ] +CMD ["/cmd/docker", "--config", "config.yaml"] \ No newline at end of file diff --git a/vendor/github.com/mochi-mqtt/server/v2/README-CN.md b/vendor/github.com/mochi-mqtt/server/v2/README-CN.md index 76807a8d..fa869163 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/README-CN.md +++ b/vendor/github.com/mochi-mqtt/server/v2/README-CN.md @@ -45,7 +45,7 @@ MQTT 代表 MQ Telemetry Transport。它是一种发布/订阅、非常简单和 - 通过所有 [Paho互操作性测试](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability)(MQTT v5 和 MQTT v3)。 - 超过一千多个经过仔细考虑的单元测试场景。 - 支持 TCP、Websocket(包括 SSL/TLS)和$SYS 服务状态监控。 -- 内置 基于Redis、Badger 和 Bolt 的持久化(使用Hook钩子,你也可以自己创建)。 +- 内置 基于Redis、Badger、Pebble 和 Bolt 的持久化(使用Hook钩子,你也可以自己创建)。 - 内置基于规则的认证和 ACL 权限管理(使用Hook钩子,你也可以自己创建)。 ### 兼容性说明(Compatibility Notes) @@ -60,7 +60,6 @@ MQTT 代表 MQ Telemetry Transport。它是一种发布/订阅、非常简单和 - 请[提出问题](https://github.com/mochi-mqtt/server/issues)来请求新功能或新的hook钩子接口! - 集群支持。 - 统计度量支持。 -- 配置文件支持(支持 Docker)。 ## 快速开始(Quick Start) ### 使用 Go 运行服务端 @@ -77,18 +76,49 @@ go build -o mqtt && ./mqtt ```sh docker pull mochimqtt/server 或者 -docker run mochimqtt/server +docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server ``` -我们还在积极完善这部分的工作,现在正在实现使用[配置文件的启动](https://github.com/orgs/mochi-mqtt/projects/2)方式。更多关于 Docker 的支持正在[这里](https://github.com/orgs/mochi-mqtt/discussions/281#discussion-5544545)和[这里](https://github.com/orgs/mochi-mqtt/discussions/209)进行讨论。如果你有在这个场景下使用 Mochi-MQTT,也可以参与到讨论中来。 - -我们提供了一个简单的 Dockerfile,用于运行 cmd/main.go 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个服务监听: +一般情况下,您可以使用基于文件的方式来配置服务端,只需指定一个有效的 yaml 或 json 配置文件。 +我们提供了一个简单的 Dockerfile,用于运行 [cmd/main.go](cmd/main.go) 中的 Websocket(:1882)、TCP(:1883) 和服务端状态信息(:8080)这三个网络服务,它使用了一个 allow-all 的鉴权策略(Hook)。 ```sh docker build -t mochi:latest . -docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest +docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest +``` + +### 基于文件的配置 +你可以使用基于文件的配置与 Docker 镜像(上节所述)一起使用,或者通过运行编译好的可执行文件并使用 `--config=config.yaml` 或 `--config=config.json` 指定配置文件。 + +配置文件使得服务端更易于管理和维护。你可以启用和配置内置的钩子(hooks)和监听器(listeners),并指定服务器的一些选项(options)和能力(compatibilities): + +```yaml +listeners: + - type: "tcp" + id: "tcp12" + address: ":1883" + - type: "ws" + id: "ws1" + address: ":1882" + - type: "sysinfo" + id: "stats" + address: ":1880" +hooks: + auth: + allow_all: true +options: + inline_client: true ``` +你可以参考请 [examples/config](examples/config) 中的示例,以了解所有可用的配置。 +有一些需要注意的地方: + +1. 如果你使用基于文件的配置,现在支持配置的hook类型只有auth、storage、debug这三种,每种类型的钩子只能有一个。 +2. 你只能在基于文件的配置中使用内置钩子(mochi-mqtt里面默认已经存在的hook,你自己创建的不算),因为钩子的配置需要先跟conf.toml的结构匹配。 +3. 你只能使用内置监听器(listeners),原因同上。 + +如果你需要实现自定义的钩子(Hooks)或监听器(listeners),请使用 [cmd/main.go](cmd/main.go) 中那样的传统方式来实现。 + ## 使用 Mochi MQTT 进行开发 ### 将Mochi MQTT作为包导入使用 @@ -119,7 +149,7 @@ func main() { _ = server.AddHook(new(auth.AllowHook), nil) // 在标1883端口上创建一个 TCP 服务端。 - tcp := listeners.NewTCP("t1", ":1883", nil) + tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"}) err := server.AddListener(tcp) if err != nil { log.Fatal(err) @@ -167,6 +197,7 @@ func main() { server := mqtt.New(&mqtt.Options{ Capabilities: mqtt.Capabilities{ MaximumSessionExpiryInterval: 3600, + MaximumClientWritesPending: 3, Compatibilities: mqtt.Compatibilities{ ObscureNotAuthorized: true, }, @@ -177,7 +208,7 @@ server := mqtt.New(&mqtt.Options{ InlineClient: false, }) ``` -请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。 +请参考 mqtt.Options、mqtt.Capabilities 和 mqtt.Compatibilities 结构体,以查看完整的所有服务端选项。 ClientNetWriteBufferSize 和 ClientNetReadBufferSize 可以根据你的需求配置调整每个客户端的内存使用状况。其中 Capabilities.MaximumClientWritesPending 的大小会影响服务器运行内存占用,如果 IoT 设备同时在线的数量比较多,设置的值很大,尽管没有收发数据,服务器运行内存占用也会增加很多,默认该数值为 1024*8 ,可以根据实际情况调整该参数。 ### 默认配置说明(Default Configuration Notes) @@ -197,6 +228,7 @@ server := mqtt.New(&mqtt.Options{ | 访问控制 | [mochi-mqtt/server/hooks/auth . Auth](hooks/auth/auth.go) | 基于规则的访问权限控制。 | | 数据持久性 | [mochi-mqtt/server/hooks/storage/bolt](hooks/storage/bolt/bolt.go) | 使用 [BoltDB](https://dbdb.io/db/boltdb) 进行持久性存储(已弃用)。 | | 数据持久性 | [mochi-mqtt/server/hooks/storage/badger](hooks/storage/badger/badger.go) | 使用 [BadgerDB](https://github.com/dgraph-io/badger) 进行持久性存储。 | +| 数据持久性 | [mochi-mqtt/server/hooks/storage/pebble](hooks/storage/pebble/pebble.go) | 使用 [PebbleDB](https://github.com/cockroachdb/pebble) 进行持久性存储。 | | 数据持久性 | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | 使用 [Redis](https://redis.io) 进行持久性存储。 | | 调试跟踪 | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | 调试输出以查看数据包在服务端的链路追踪。 | @@ -299,9 +331,25 @@ if err != nil { ``` 有关 Redis 钩子的工作原理或如何使用它的更多信息,请参阅 [examples/persistence/redis/main.go](examples/persistence/redis/main.go) 或 [hooks/storage/redis](hooks/storage/redis) 。 +#### Pebble DB + +如果您更喜欢基于文件的存储,还有一个 PebbleDB 存储钩子(Hook)可用。它可以以与其他钩子大致相同的方式添加和配置(具有较少的选项)。 + +```go +err := server.AddHook(new(pebble.Hook), &pebble.Options{ + Path: pebblePath, + Mode: pebble.NoSync, +}) +if err != nil { + log.Fatal(err) +} +``` + +有关 pebble 钩子(Hook)的工作原理或如何使用它的更多信息,请参阅 [examples/persistence/pebble/main.go](examples/persistence/pebble/main.go) 或 [hooks/storage/pebble](hooks/storage/pebble)。 + #### Badger DB -如果您更喜欢基于文件的存储,还有一个 BadgerDB 存储钩子(Hook)可用。它可以以与其他钩子大致相同的方式添加和配置(具有较少的选项)。 +同样是基于文件的存储,还有一个 BadgerDB 存储钩子(Hook)可用。它可以以与其他钩子大致相同的方式添加和配置。 ```go err := server.AddHook(new(badger.Hook), &badger.Options{ @@ -367,7 +415,7 @@ if err != nil { ### 内联客户端 (Inline Client v2.4.0+支持) -现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用: +现在可以通过使用内联客户端功能直接在服务端上订阅主题和发布消息。目前,内联客户端暂时还不支持共享订阅。内联客户端是内置在服务端中的特殊的客户端,可以在服务端的配置中启用: ```go server := mqtt.New(&mqtt.Options{ diff --git a/vendor/github.com/mochi-mqtt/server/v2/README-JP.md b/vendor/github.com/mochi-mqtt/server/v2/README-JP.md index 3cb9d868..5b8b27cc 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/README-JP.md +++ b/vendor/github.com/mochi-mqtt/server/v2/README-JP.md @@ -119,7 +119,7 @@ func main() { _ = server.AddHook(new(auth.AllowHook), nil) // Create a TCP listener on a standard port. - tcp := listeners.NewTCP("t1", ":1883", nil) + tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"}) err := server.AddListener(tcp) if err != nil { log.Fatal(err) @@ -168,6 +168,7 @@ TLSを設定するには`*listeners.Config`を渡すことができます。 server := mqtt.New(&mqtt.Options{ Capabilities: mqtt.Capabilities{ MaximumSessionExpiryInterval: 3600, + MaximumClientWritesPending: 3, Compatibilities: mqtt.Compatibilities{ ObscureNotAuthorized: true, }, @@ -181,6 +182,7 @@ server := mqtt.New(&mqtt.Options{ mqtt.Options、mqtt.Capabilities、mqtt.Compatibilitiesの構造体はオプションの理解に役立ちます。 必要に応じて`ClientNetWriteBufferSize`と`ClientNetReadBufferSize`はクライアントの使用するメモリに合わせて設定できます。 +`Capabilities.MaximumClientWritesPending`のサイズは、サーバーのメモリ使用量に影響を与えます。IoTデバイスが同時にオンラインで多数存在する場合、また設定値が非常に大きい場合、データの送受信がなくても、サーバーのメモリ使用量は大幅に増加します。デフォルト値は1024*8で、実際の状況に応じてこのパラメータを調整することができます。 ### デフォルト設定に関する注意事項 diff --git a/vendor/github.com/mochi-mqtt/server/v2/README.md b/vendor/github.com/mochi-mqtt/server/v2/README.md index 41ef4be2..b3d0ec8b 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/README.md +++ b/vendor/github.com/mochi-mqtt/server/v2/README.md @@ -45,7 +45,7 @@ MQTT stands for [MQ Telemetry Transport](https://en.wikipedia.org/wiki/MQTT). It - Passes all [Paho Interoperability Tests](https://github.com/eclipse/paho.mqtt.testing/tree/master/interoperability) for MQTT v5 and MQTT v3. - Over a thousand carefully considered unit test scenarios. - TCP, Websocket (including SSL/TLS), and $SYS Dashboard listeners. -- Built-in Redis, Badger, and Bolt Persistence using Hooks (but you can also make your own). +- Built-in Redis, Badger, Pebble and Bolt Persistence using Hooks (but you can also make your own). - Built-in Rule-based Authentication and ACL Ledger using Hooks (also make your own). ### Compatibility Notes @@ -60,7 +60,6 @@ Unless it's a critical issue, new releases typically go out over the weekend. - Please [open an issue](https://github.com/mochi-mqtt/server/issues) to request new features or event hooks! - Cluster support. - Enhanced Metrics support. -- File-based server configuration (supporting docker). ## Quick Start ### Running the Broker with Go @@ -77,18 +76,50 @@ You can now pull and run the [official Mochi MQTT image](https://hub.docker.com/ ```sh docker pull mochimqtt/server or -docker run mochimqtt/server +docker run -v $(pwd)/config.yaml:/config.yaml mochimqtt/server ``` -This is a work in progress, and a [file-based configuration](https://github.com/orgs/mochi-mqtt/projects/2) is being developed to better support this implementation. _More substantial docker support is being discussed [here](https://github.com/orgs/mochi-mqtt/discussions/281#discussion-5544545) and [here](https://github.com/orgs/mochi-mqtt/discussions/209). Please join the discussion if you use Mochi-MQTT in this environment._ +For most use cases, you can use File Based Configuration to configure the server, by specifying a valid `yaml` or `json` config file. -A simple Dockerfile is provided for running the [cmd/main.go](cmd/main.go) Websocket, TCP, and Stats server: +A simple Dockerfile is provided for running the [cmd/main.go](cmd/main.go) Websocket, TCP, and Stats server, using the `allow-all` auth hook. ```sh docker build -t mochi:latest . -docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 mochi:latest +docker run -p 1883:1883 -p 1882:1882 -p 8080:8080 -v $(pwd)/config.yaml:/config.yaml mochi:latest ``` +### File Based Configuration +You can use File Based Configuration with either the Docker image (described above), or by running the build binary with the `--config=config.yaml` or `--config=config.json` parameter. + +Configuration files provide a convenient mechanism for easily preparing a server with the most common configurations. You can enable and configure built-in hooks and listeners, and specify server options and compatibilities: + +```yaml +listeners: + - type: "tcp" + id: "tcp12" + address: ":1883" + - type: "ws" + id: "ws1" + address: ":1882" + - type: "sysinfo" + id: "stats" + address: ":1880" +hooks: + auth: + allow_all: true +options: + inline_client: true +``` + +Please review the examples found in [examples/config](examples/config) for all available configuration options. + +There are a few conditions to note: +1. If you use file-based configuration, the supported hook types for configuration are currently limited to auth, storage, and debug. Each type of hook can only have one instance. +2. You can only use built in hooks with file-based configuration, as the type and configuration structure needs to be known by the server in order for it to be applied. +3. You can only use built in listeners, for the reasons above. + +If you need to implement custom hooks or listeners, please do so using the traditional manner indicated in [cmd/main.go](cmd/main.go). + ## Developing with Mochi MQTT ### Importing as a package Importing Mochi MQTT as a package requires just a few lines of code to get started. @@ -118,7 +149,7 @@ func main() { _ = server.AddHook(new(auth.AllowHook), nil) // Create a TCP listener on a standard port. - tcp := listeners.NewTCP("t1", ":1883", nil) + tcp := listeners.NewTCP(listeners.Config{ID: "t1", Address: ":1883"}) err := server.AddListener(tcp) if err != nil { log.Fatal(err) @@ -167,6 +198,7 @@ A number of configurable options are available which can be used to alter the be server := mqtt.New(&mqtt.Options{ Capabilities: mqtt.Capabilities{ MaximumSessionExpiryInterval: 3600, + MaximumClientWritesPending: 3, Compatibilities: mqtt.Compatibilities{ ObscureNotAuthorized: true, }, @@ -178,7 +210,7 @@ server := mqtt.New(&mqtt.Options{ }) ``` -Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs. +Review the mqtt.Options, mqtt.Capabilities, and mqtt.Compatibilities structs for a comprehensive list of options. `ClientNetWriteBufferSize` and `ClientNetReadBufferSize` can be configured to adjust memory usage per client, based on your needs. The size of `Capabilities.MaximumClientWritesPending` will affect the memory usage of the server. If the number of IoT devices online at the same time is large, and the set value is very large, even if there is no data transmission, the memory usage of the server will increase a lot. The default value is 1024*8, and this parameter can be adjusted according to the actual situation. ### Default Configuration Notes @@ -197,6 +229,7 @@ Hooks are stackable - you can add multiple hooks to a server, and they will be r | Access Control | [mochi-mqtt/server/hooks/auth . Auth](hooks/auth/auth.go) | Rule-based access control ledger. | | Persistence | [mochi-mqtt/server/hooks/storage/bolt](hooks/storage/bolt/bolt.go) | Persistent storage using [BoltDB](https://dbdb.io/db/boltdb) (deprecated). | | Persistence | [mochi-mqtt/server/hooks/storage/badger](hooks/storage/badger/badger.go) | Persistent storage using [BadgerDB](https://github.com/dgraph-io/badger). | +| Persistence | [mochi-mqtt/server/hooks/storage/pebble](hooks/storage/pebble/pebble.go) | Persistent storage using [PebbleDB](https://github.com/cockroachdb/pebble). | | Persistence | [mochi-mqtt/server/hooks/storage/redis](hooks/storage/redis/redis.go) | Persistent storage using [Redis](https://redis.io). | | Debugging | [mochi-mqtt/server/hooks/debug](hooks/debug/debug.go) | Additional debugging output to visualise packet flow. | @@ -291,8 +324,21 @@ if err != nil { ``` For more information on how the redis hook works, or how to use it, see the [examples/persistence/redis/main.go](examples/persistence/redis/main.go) or [hooks/storage/redis](hooks/storage/redis) code. +#### Pebble DB +There's also a Pebble Db storage hook if you prefer file-based storage. It can be added and configured in much the same way as the other hooks (with somewhat less options). +```go +err := server.AddHook(new(pebble.Hook), &pebble.Options{ + Path: pebblePath, + Mode: pebble.NoSync, +}) +if err != nil { + log.Fatal(err) +} +``` +For more information on how the pebble hook works, or how to use it, see the [examples/persistence/pebble/main.go](examples/persistence/pebble/main.go) or [hooks/storage/pebble](hooks/storage/pebble) code. + #### Badger DB -There's also a BadgerDB storage hook if you prefer file based storage. It can be added and configured in much the same way as the other hooks (with somewhat less options). +Similarly, for file-based storage, there is also a BadgerDB storage hook available. It can be added and configured in much the same way as the other hooks. ```go err := server.AddHook(new(badger.Hook), &badger.Options{ Path: badgerPath, @@ -354,7 +400,7 @@ The function signatures for all the hooks and `mqtt.Hook` interface can be found If you are building a persistent storage hook, see the existing persistent hooks for inspiration and patterns. If you are building an auth hook, you will need `OnACLCheck` and `OnConnectAuthenticate`. ### Inline Client (v2.4.0+) -It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options: +It's now possible to subscribe and publish to topics directly from the embedding code, by using the `inline client` feature. Currently, the inline client does not support shared subscriptions. The Inline Client is an embedded client which operates as part of the server, and can be enabled in the server options: ```go server := mqtt.New(&mqtt.Options{ InlineClient: true, diff --git a/vendor/github.com/mochi-mqtt/server/v2/clients.go b/vendor/github.com/mochi-mqtt/server/v2/clients.go index dca82254..dacff7ed 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/clients.go +++ b/vendor/github.com/mochi-mqtt/server/v2/clients.go @@ -417,6 +417,11 @@ func (cl *Client) StopCause() error { return cl.State.stopCause.Load().(error) } +// StopTime returns the the time the client disconnected in unix time, else zero. +func (cl *Client) StopTime() int64 { + return atomic.LoadInt64(&cl.State.disconnected) +} + // Closed returns true if client connection is closed. func (cl *Client) Closed() bool { return cl.State.open == nil || cl.State.open.Err() != nil diff --git a/vendor/github.com/mochi-mqtt/server/v2/config.yaml b/vendor/github.com/mochi-mqtt/server/v2/config.yaml new file mode 100644 index 00000000..9bd37983 --- /dev/null +++ b/vendor/github.com/mochi-mqtt/server/v2/config.yaml @@ -0,0 +1,15 @@ +listeners: + - type: "tcp" + id: "tcp1" + address: ":1883" + - type: "ws" + id: "ws1" + address: ":1882" + - type: "sysinfo" + id: "stats" + address: ":1880" +hooks: + auth: + allow_all: true +options: + inline_client: true diff --git a/vendor/github.com/mochi-mqtt/server/v2/hooks.go b/vendor/github.com/mochi-mqtt/server/v2/hooks.go index 25b625dc..4da709f7 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/hooks.go +++ b/vendor/github.com/mochi-mqtt/server/v2/hooks.go @@ -62,6 +62,12 @@ var ( ErrInvalidConfigType = errors.New("invalid config type provided") ) +// HookLoadConfig contains the hook and configuration as loaded from a configuration (usually file). +type HookLoadConfig struct { + Hook Hook + Config any +} + // Hook provides an interface of handlers for different events which occur // during the lifecycle of the broker. type Hook interface { @@ -70,6 +76,7 @@ type Hook interface { Init(config any) error Stop() error SetOpts(l *slog.Logger, o *HookOptions) + OnStarted() OnStopped() OnConnectAuthenticate(cl *Client, pk packets.Packet) bool diff --git a/vendor/github.com/mochi-mqtt/server/v2/hooks/storage/storage.go b/vendor/github.com/mochi-mqtt/server/v2/hooks/storage/storage.go index eb57508f..c8b7d5df 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/hooks/storage/storage.go +++ b/vendor/github.com/mochi-mqtt/server/v2/hooks/storage/storage.go @@ -25,6 +25,12 @@ var ( ErrDBFileNotOpen = errors.New("db file not open") ) +// Serializable is an interface for objects that can be serialized and deserialized. +type Serializable interface { + UnmarshalBinary([]byte) error + MarshalBinary() (data []byte, err error) +} + // Client is a storable representation of an MQTT client. type Client struct { Will ClientWill `json:"will"` // will topic and payload data if applicable @@ -40,28 +46,28 @@ type Client struct { // ClientProperties contains a limited set of the mqtt v5 properties specific to a client connection. type ClientProperties struct { - AuthenticationData []byte `json:"authenticationData"` - User []packets.UserProperty `json:"user"` - AuthenticationMethod string `json:"authenticationMethod"` - SessionExpiryInterval uint32 `json:"sessionExpiryInterval"` - MaximumPacketSize uint32 `json:"maximumPacketSize"` - ReceiveMaximum uint16 `json:"receiveMaximum"` - TopicAliasMaximum uint16 `json:"topicAliasMaximum"` - SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag"` - RequestProblemInfo byte `json:"requestProblemInfo"` - RequestProblemInfoFlag bool `json:"requestProblemInfoFlag"` - RequestResponseInfo byte `json:"requestResponseInfo"` + AuthenticationData []byte `json:"authenticationData,omitempty"` + User []packets.UserProperty `json:"user,omitempty"` + AuthenticationMethod string `json:"authenticationMethod,omitempty"` + SessionExpiryInterval uint32 `json:"sessionExpiryInterval,omitempty"` + MaximumPacketSize uint32 `json:"maximumPacketSize,omitempty"` + ReceiveMaximum uint16 `json:"receiveMaximum,omitempty"` + TopicAliasMaximum uint16 `json:"topicAliasMaximum,omitempty"` + SessionExpiryIntervalFlag bool `json:"sessionExpiryIntervalFlag,omitempty"` + RequestProblemInfo byte `json:"requestProblemInfo,omitempty"` + RequestProblemInfoFlag bool `json:"requestProblemInfoFlag,omitempty"` + RequestResponseInfo byte `json:"requestResponseInfo,omitempty"` } // ClientWill contains a will message for a client, and limited mqtt v5 properties. type ClientWill struct { - Payload []byte `json:"payload"` - User []packets.UserProperty `json:"user"` - TopicName string `json:"topicName"` - Flag uint32 `json:"flag"` - WillDelayInterval uint32 `json:"willDelayInterval"` - Qos byte `json:"qos"` - Retain bool `json:"retain"` + Payload []byte `json:"payload,omitempty"` + User []packets.UserProperty `json:"user,omitempty"` + TopicName string `json:"topicName,omitempty"` + Flag uint32 `json:"flag,omitempty"` + WillDelayInterval uint32 `json:"willDelayInterval,omitempty"` + Qos byte `json:"qos,omitempty"` + Retain bool `json:"retain,omitempty"` } // MarshalBinary encodes the values into a json string. @@ -79,29 +85,29 @@ func (d *Client) UnmarshalBinary(data []byte) error { // Message is a storable representation of an MQTT message (specifically publish). type Message struct { - Properties MessageProperties `json:"properties"` // - - Payload []byte `json:"payload"` // the message payload (if retained) - T string `json:"t"` // the data type - ID string `json:"id" storm:"id"` // the storage key - Origin string `json:"origin"` // the id of the client who sent the message - TopicName string `json:"topic_name"` // the topic the message was sent to (if retained) - FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message - Created int64 `json:"created"` // the time the message was created in unixtime - Sent int64 `json:"sent"` // the last time the message was sent (for retries) in unixtime (if inflight) - PacketID uint16 `json:"packet_id"` // the unique id of the packet (if inflight) + Properties MessageProperties `json:"properties"` // - + Payload []byte `json:"payload"` // the message payload (if retained) + T string `json:"t,omitempty"` // the data type + ID string `json:"id,omitempty" storm:"id"` // the storage key + Origin string `json:"origin,omitempty"` // the id of the client who sent the message + TopicName string `json:"topic_name,omitempty"` // the topic the message was sent to (if retained) + FixedHeader packets.FixedHeader `json:"fixedheader"` // the header properties of the message + Created int64 `json:"created,omitempty"` // the time the message was created in unixtime + Sent int64 `json:"sent,omitempty"` // the last time the message was sent (for retries) in unixtime (if inflight) + PacketID uint16 `json:"packet_id,omitempty"` // the unique id of the packet (if inflight) } // MessageProperties contains a limited subset of mqtt v5 properties specific to publish messages. type MessageProperties struct { - CorrelationData []byte `json:"correlationData"` - SubscriptionIdentifier []int `json:"subscriptionIdentifier"` - User []packets.UserProperty `json:"user"` - ContentType string `json:"contentType"` - ResponseTopic string `json:"responseTopic"` - MessageExpiryInterval uint32 `json:"messageExpiry"` - TopicAlias uint16 `json:"topicAlias"` - PayloadFormat byte `json:"payloadFormat"` - PayloadFormatFlag bool `json:"payloadFormatFlag"` + CorrelationData []byte `json:"correlationData,omitempty"` + SubscriptionIdentifier []int `json:"subscriptionIdentifier,omitempty"` + User []packets.UserProperty `json:"user,omitempty"` + ContentType string `json:"contentType,omitempty"` + ResponseTopic string `json:"responseTopic,omitempty"` + MessageExpiryInterval uint32 `json:"messageExpiry,omitempty"` + TopicAlias uint16 `json:"topicAlias,omitempty"` + PayloadFormat byte `json:"payloadFormat,omitempty"` + PayloadFormatFlag bool `json:"payloadFormatFlag,omitempty"` } // MarshalBinary encodes the values into a json string. @@ -149,15 +155,15 @@ func (d *Message) ToPacket() packets.Packet { // Subscription is a storable representation of an MQTT subscription. type Subscription struct { - T string `json:"t"` - ID string `json:"id" storm:"id"` - Client string `json:"client"` + T string `json:"t,omitempty"` + ID string `json:"id,omitempty" storm:"id"` + Client string `json:"client,omitempty"` Filter string `json:"filter"` - Identifier int `json:"identifier"` - RetainHandling byte `json:"retain_handling"` + Identifier int `json:"identifier,omitempty"` + RetainHandling byte `json:"retain_handling,omitempty"` Qos byte `json:"qos"` - RetainAsPublished bool `json:"retain_as_pub"` - NoLocal bool `json:"no_local"` + RetainAsPublished bool `json:"retain_as_pub,omitempty"` + NoLocal bool `json:"no_local,omitempty"` } // MarshalBinary encodes the values into a json string. diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/http_healthcheck.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/http_healthcheck.go index a82e2e36..fc0f13dc 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/http_healthcheck.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/http_healthcheck.go @@ -13,24 +13,23 @@ import ( "time" ) +const TypeHealthCheck = "healthcheck" + // HTTPHealthCheck is a listener for providing an HTTP healthcheck endpoint. type HTTPHealthCheck struct { sync.RWMutex id string // the internal id of the listener address string // the network address to bind to - config *Config // configuration values for the listener + config Config // configuration values for the listener listen *http.Server // the http server end uint32 // ensure the close methods are only called once } -// NewHTTPHealthCheck initialises and returns a new HTTP listener, listening on an address. -func NewHTTPHealthCheck(id, address string, config *Config) *HTTPHealthCheck { - if config == nil { - config = new(Config) - } +// NewHTTPHealthCheck initializes and returns a new HTTP listener, listening on an address. +func NewHTTPHealthCheck(config Config) *HTTPHealthCheck { return &HTTPHealthCheck{ - id: id, - address: address, + id: config.ID, + address: config.Address, config: config, } } diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/http_sysinfo.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/http_sysinfo.go index 0e064d57..f1860634 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/http_sysinfo.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/http_sysinfo.go @@ -17,27 +17,26 @@ import ( "github.com/mochi-mqtt/server/v2/system" ) +const TypeSysInfo = "sysinfo" + // HTTPStats is a listener for presenting the server $SYS stats on a JSON http endpoint. type HTTPStats struct { sync.RWMutex id string // the internal id of the listener address string // the network address to bind to - config *Config // configuration values for the listener + config Config // configuration values for the listener listen *http.Server // the http server sysInfo *system.Info // pointers to the server data log *slog.Logger // server logger end uint32 // ensure the close methods are only called once } -// NewHTTPStats initialises and returns a new HTTP listener, listening on an address. -func NewHTTPStats(id, address string, config *Config, sysInfo *system.Info) *HTTPStats { - if config == nil { - config = new(Config) - } +// NewHTTPStats initializes and returns a new HTTP listener, listening on an address. +func NewHTTPStats(config Config, sysInfo *system.Info) *HTTPStats { return &HTTPStats{ - id: id, - address: address, sysInfo: sysInfo, + id: config.ID, + address: config.Address, config: config, } } diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/listeners.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/listeners.go index 301dd56e..ded7c370 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/listeners.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/listeners.go @@ -14,8 +14,10 @@ import ( // Config contains configuration values for a listener. type Config struct { - // TLSConfig is a tls.Config configuration to be used with the listener. - // See examples folder for basic and mutual-tls use. + Type string + ID string + Address string + // TLSConfig is a tls.Config configuration to be used with the listener. See examples folder for basic and mutual-tls use. TLSConfig *tls.Config } diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/mock.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/mock.go index 826f80c3..1a67d896 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/mock.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/mock.go @@ -12,6 +12,8 @@ import ( "log/slog" ) +const TypeMock = "mock" + // MockEstablisher is a function signature which can be used in testing. func MockEstablisher(id string, c net.Conn) error { return nil diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/tcp.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/tcp.go index 60bd44a2..014a1822 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/tcp.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/tcp.go @@ -13,26 +13,24 @@ import ( "log/slog" ) +const TypeTCP = "tcp" + // TCP is a listener for establishing client connections on basic TCP protocol. type TCP struct { // [MQTT-4.2.0-1] sync.RWMutex id string // the internal id of the listener address string // the network address to bind to listen net.Listener // a net.Listener which will listen for new clients - config *Config // configuration values for the listener + config Config // configuration values for the listener log *slog.Logger // server logger end uint32 // ensure the close methods are only called once } -// NewTCP initialises and returns a new TCP listener, listening on an address. -func NewTCP(id, address string, config *Config) *TCP { - if config == nil { - config = new(Config) - } - +// NewTCP initializes and returns a new TCP listener, listening on an address. +func NewTCP(config Config) *TCP { return &TCP{ - id: id, - address: address, + id: config.ID, + address: config.Address, config: config, } } diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/unixsock.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/unixsock.go index 5892fc94..23df1304 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/unixsock.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/unixsock.go @@ -13,21 +13,25 @@ import ( "log/slog" ) +const TypeUnix = "unix" + // UnixSock is a listener for establishing client connections on basic UnixSock protocol. type UnixSock struct { sync.RWMutex id string // the internal id of the listener. address string // the network address to bind to. + config Config // configuration values for the listener listen net.Listener // a net.Listener which will listen for new clients. log *slog.Logger // server logger end uint32 // ensure the close methods are only called once. } -// NewUnixSock initialises and returns a new UnixSock listener, listening on an address. -func NewUnixSock(id, address string) *UnixSock { +// NewUnixSock initializes and returns a new UnixSock listener, listening on an address. +func NewUnixSock(config Config) *UnixSock { return &UnixSock{ - id: id, - address: address, + id: config.ID, + address: config.Address, + config: config, } } diff --git a/vendor/github.com/mochi-mqtt/server/v2/listeners/websocket.go b/vendor/github.com/mochi-mqtt/server/v2/listeners/websocket.go index 58bf74c8..267daf6b 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/listeners/websocket.go +++ b/vendor/github.com/mochi-mqtt/server/v2/listeners/websocket.go @@ -19,6 +19,8 @@ import ( "github.com/gorilla/websocket" ) +const TypeWS = "ws" + var ( // ErrInvalidMessage indicates that a message payload was not valid. ErrInvalidMessage = errors.New("message type not binary") @@ -29,7 +31,7 @@ type Websocket struct { // [MQTT-4.2.0-1] sync.RWMutex id string // the internal id of the listener address string // the network address to bind to - config *Config // configuration values for the listener + config Config // configuration values for the listener listen *http.Server // a http server for serving websocket connections log *slog.Logger // server logger establish EstablishFn // the server's establish connection handler @@ -37,15 +39,11 @@ type Websocket struct { // [MQTT-4.2.0-1] end uint32 // ensure the close methods are only called once } -// NewWebsocket initialises and returns a new Websocket listener, listening on an address. -func NewWebsocket(id, address string, config *Config) *Websocket { - if config == nil { - config = new(Config) - } - +// NewWebsocket initializes and returns a new Websocket listener, listening on an address. +func NewWebsocket(config Config) *Websocket { return &Websocket{ - id: id, - address: address, + id: config.ID, + address: config.Address, config: config, upgrader: &websocket.Upgrader{ Subprotocols: []string{"mqtt"}, diff --git a/vendor/github.com/mochi-mqtt/server/v2/packets/tpackets.go b/vendor/github.com/mochi-mqtt/server/v2/packets/tpackets.go index 267721ee..79a6f583 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/packets/tpackets.go +++ b/vendor/github.com/mochi-mqtt/server/v2/packets/tpackets.go @@ -201,6 +201,7 @@ const ( TDisconnect TDisconnectTakeover TDisconnectMqtt5 + TDisconnectMqtt5DisconnectWithWillMessage TDisconnectSecondConnect TDisconnectReceiveMaximum TDisconnectDropProperties @@ -3781,6 +3782,31 @@ var TPacketData = map[byte]TPacketCases{ }, }, }, + { + Case: TDisconnectMqtt5DisconnectWithWillMessage, + Desc: "mqtt5 disconnect with will message", + Primary: true, + RawBytes: append([]byte{ + Disconnect << 4, 38, // fixed header + CodeDisconnectWillMessage.Code, // Reason Code + 36, // Properties Length + 17, 0, 0, 0, 120, // Session Expiry Interval (17) + 31, 0, 28, // Reason String (31) + }, []byte(CodeDisconnectWillMessage.Reason)...), + Packet: &Packet{ + ProtocolVersion: 5, + FixedHeader: FixedHeader{ + Type: Disconnect, + Remaining: 22, + }, + ReasonCode: CodeDisconnectWillMessage.Code, + Properties: Properties{ + ReasonString: CodeDisconnectWillMessage.Reason, + SessionExpiryInterval: 120, + SessionExpiryIntervalFlag: true, + }, + }, + }, { Case: TDisconnectSecondConnect, Desc: "second connect packet mqtt5", diff --git a/vendor/github.com/mochi-mqtt/server/v2/server.go b/vendor/github.com/mochi-mqtt/server/v2/server.go index e525b161..ffffc5ca 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/server.go +++ b/vendor/github.com/mochi-mqtt/server/v2/server.go @@ -14,6 +14,7 @@ import ( "runtime" "sort" "strconv" + "strings" "sync/atomic" "time" @@ -26,7 +27,7 @@ import ( ) const ( - Version = "2.4.6" // the current server version. + Version = "2.6.5" // the current server version. defaultSysTopicInterval int64 = 1 // the interval between $SYS topic publishes LocalListener = "local" InlineClientId = "inline" @@ -39,30 +40,33 @@ var ( ErrListenerIDExists = errors.New("listener id already exists") // a listener with the same id already exists ErrConnectionClosed = errors.New("connection not open") // connection is closed ErrInlineClientNotEnabled = errors.New("please set Options.InlineClient=true to use this feature") // inline client is not enabled by default + ErrOptionsUnreadable = errors.New("unable to read options from bytes") ) // Capabilities indicates the capabilities and features provided by the server. type Capabilities struct { - MaximumMessageExpiryInterval int64 // maximum message expiry if message expiry is 0 or over - MaximumClientWritesPending int32 // maximum number of pending message writes for a client - MaximumSessionExpiryInterval uint32 // maximum number of seconds to keep disconnected sessions - MaximumPacketSize uint32 // maximum packet size, no limit if 0 - maximumPacketID uint32 // unexported, used for testing only - ReceiveMaximum uint16 // maximum number of concurrent qos messages per client - MaximumInflight uint16 // maximum number of qos > 0 messages can be stored, 0(=8192)-65535 - TopicAliasMaximum uint16 // maximum topic alias value - SharedSubAvailable byte // support of shared subscriptions - MinimumProtocolVersion byte // minimum supported mqtt version - Compatibilities Compatibilities - MaximumQos byte // maximum qos value available to clients - RetainAvailable byte // support of retain messages - WildcardSubAvailable byte // support of wildcard subscriptions - SubIDAvailable byte // support of subscription identifiers + MaximumClients int64 `yaml:"maximum_clients" json:"maximum_clients"` // maximum number of connected clients + MaximumMessageExpiryInterval int64 `yaml:"maximum_message_expiry_interval" json:"maximum_message_expiry_interval"` // maximum message expiry if message expiry is 0 or over + MaximumClientWritesPending int32 `yaml:"maximum_client_writes_pending" json:"maximum_client_writes_pending"` // maximum number of pending message writes for a client + MaximumSessionExpiryInterval uint32 `yaml:"maximum_session_expiry_interval" json:"maximum_session_expiry_interval"` // maximum number of seconds to keep disconnected sessions + MaximumPacketSize uint32 `yaml:"maximum_packet_size" json:"maximum_packet_size"` // maximum packet size, no limit if 0 + maximumPacketID uint32 // unexported, used for testing only + ReceiveMaximum uint16 `yaml:"receive_maximum" json:"receive_maximum"` // maximum number of concurrent qos messages per client + MaximumInflight uint16 `yaml:"maximum_inflight" json:"maximum_inflight"` // maximum number of qos > 0 messages can be stored, 0(=8192)-65535 + TopicAliasMaximum uint16 `yaml:"topic_alias_maximum" json:"topic_alias_maximum"` // maximum topic alias value + SharedSubAvailable byte `yaml:"shared_sub_available" json:"shared_sub_available"` // support of shared subscriptions + MinimumProtocolVersion byte `yaml:"minimum_protocol_version" json:"minimum_protocol_version"` // minimum supported mqtt version + Compatibilities Compatibilities `yaml:"compatibilities" json:"compatibilities"` // version compatibilities the server provides + MaximumQos byte `yaml:"maximum_qos" json:"maximum_qos"` // maximum qos value available to clients + RetainAvailable byte `yaml:"retain_available" json:"retain_available"` // support of retain messages + WildcardSubAvailable byte `yaml:"wildcard_sub_available" json:"wildcard_sub_available"` // support of wildcard subscriptions + SubIDAvailable byte `yaml:"sub_id_available" json:"sub_id_available"` // support of subscription identifiers } // NewDefaultServerCapabilities defines the default features and capabilities provided by the server. func NewDefaultServerCapabilities() *Capabilities { return &Capabilities{ + MaximumClients: math.MaxInt64, // maximum number of connected clients MaximumMessageExpiryInterval: 60 * 60 * 24, // maximum message expiry if message expiry is 0 or over MaximumClientWritesPending: 1024 * 8, // maximum number of pending message writes for a client MaximumSessionExpiryInterval: math.MaxUint32, // maximum number of seconds to keep disconnected sessions @@ -82,43 +86,49 @@ func NewDefaultServerCapabilities() *Capabilities { // Compatibilities provides flags for using compatibility modes. type Compatibilities struct { - ObscureNotAuthorized bool // return unspecified errors instead of not authorized - PassiveClientDisconnect bool // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation) - AlwaysReturnResponseInfo bool // always return response info (useful for testing) - RestoreSysInfoOnRestart bool // restore system info from store as if server never stopped - NoInheritedPropertiesOnAck bool // don't allow inherited user properties on ack (paho - spec violation) + ObscureNotAuthorized bool `yaml:"obscure_not_authorized" json:"obscure_not_authorized"` // return unspecified errors instead of not authorized + PassiveClientDisconnect bool `yaml:"passive_client_disconnect" json:"passive_client_disconnect"` // don't disconnect the client forcefully after sending disconnect packet (paho - spec violation) + AlwaysReturnResponseInfo bool `yaml:"always_return_response_info" json:"always_return_response_info"` // always return response info (useful for testing) + RestoreSysInfoOnRestart bool `yaml:"restore_sys_info_on_restart" json:"restore_sys_info_on_restart"` // restore system info from store as if server never stopped + NoInheritedPropertiesOnAck bool `yaml:"no_inherited_properties_on_ack" json:"no_inherited_properties_on_ack"` // don't allow inherited user properties on ack (paho - spec violation) } // Options contains configurable options for the server. type Options struct { + // Listeners specifies any listeners which should be dynamically added on serve. Used when setting listeners by config. + Listeners []listeners.Config `yaml:"listeners" json:"listeners"` + + // Hooks specifies any hooks which should be dynamically added on serve. Used when setting hooks by config. + Hooks []HookLoadConfig `yaml:"hooks" json:"hooks"` + // Capabilities defines the server features and behaviour. If you only wish to modify // several of these values, set them explicitly - e.g. // server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024 - Capabilities *Capabilities + Capabilities *Capabilities `yaml:"capabilities" json:"capabilities"` // ClientNetWriteBufferSize specifies the size of the client *bufio.Writer write buffer. - ClientNetWriteBufferSize int + ClientNetWriteBufferSize int `yaml:"client_net_write_buffer_size" json:"client_net_write_buffer_size"` // ClientNetReadBufferSize specifies the size of the client *bufio.Reader read buffer. - ClientNetReadBufferSize int + ClientNetReadBufferSize int `yaml:"client_net_read_buffer_size" json:"client_net_read_buffer_size"` - // Logger specifies a custom configured implementation of zerolog to override + // Logger specifies a custom configured implementation of log/slog to override // the servers default logger configuration. If you wish to change the log level, - // of the default logger, you can do so by setting - // server := mqtt.New(nil) + // of the default logger, you can do so by setting: + // server := mqtt.New(nil) // level := new(slog.LevelVar) // server.Slog = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ // Level: level, // })) // level.Set(slog.LevelDebug) - Logger *slog.Logger + Logger *slog.Logger `yaml:"-" json:"-"` // SysTopicResendInterval specifies the interval between $SYS topic updates in seconds. - SysTopicResendInterval int64 + SysTopicResendInterval int64 `yaml:"sys_topic_resend_interval" json:"sys_topic_resend_interval"` // Enable Inline client to allow direct subscribing and publishing from the parent codebase, // with negligible performance difference (disabled by default to prevent confusion in statistics). - InlineClient bool + InlineClient bool `yaml:"inline_client" json:"inline_client"` } // Server is an MQTT broker server. It should be created with server.New() @@ -262,6 +272,17 @@ func (s *Server) AddHook(hook Hook, config any) error { return s.hooks.Add(hook, config) } +// AddHooksFromConfig adds hooks to the server which were specified in the hooks config (usually from a config file). +// New built-in hooks should be added to this list. +func (s *Server) AddHooksFromConfig(hooks []HookLoadConfig) error { + for _, h := range hooks { + if err := s.AddHook(h.Hook, h.Config); err != nil { + return err + } + } + return nil +} + // AddListener adds a new network listener to the server, for receiving incoming client connections. func (s *Server) AddListener(l listeners.Listener) error { if _, ok := s.Listeners.Get(l.ID()); ok { @@ -280,12 +301,55 @@ func (s *Server) AddListener(l listeners.Listener) error { return nil } +// AddListenersFromConfig adds listeners to the server which were specified in the listeners config (usually from a config file). +// New built-in listeners should be added to this list. +func (s *Server) AddListenersFromConfig(configs []listeners.Config) error { + for _, conf := range configs { + var l listeners.Listener + switch strings.ToLower(conf.Type) { + case listeners.TypeTCP: + l = listeners.NewTCP(conf) + case listeners.TypeWS: + l = listeners.NewWebsocket(conf) + case listeners.TypeUnix: + l = listeners.NewUnixSock(conf) + case listeners.TypeHealthCheck: + l = listeners.NewHTTPHealthCheck(conf) + case listeners.TypeSysInfo: + l = listeners.NewHTTPStats(conf, s.Info) + case listeners.TypeMock: + l = listeners.NewMockListener(conf.ID, conf.Address) + default: + s.Log.Error("listener type unavailable by config", "listener", conf.Type) + continue + } + if err := s.AddListener(l); err != nil { + return err + } + } + return nil +} + // Serve starts the event loops responsible for establishing client connections // on all attached listeners, publishing the system topics, and starting all hooks. func (s *Server) Serve() error { s.Log.Info("mochi mqtt starting", "version", Version) defer s.Log.Info("mochi mqtt server started") + if len(s.Options.Listeners) > 0 { + err := s.AddListenersFromConfig(s.Options.Listeners) + if err != nil { + return err + } + } + + if len(s.Options.Hooks) > 0 { + err := s.AddHooksFromConfig(s.Options.Hooks) + if err != nil { + return err + } + } + if s.hooks.Provides( StoredClients, StoredInflightMessages, @@ -352,6 +416,16 @@ func (s *Server) attachClient(cl *Client, listener string) error { } cl.ParseConnect(listener, pk) + if atomic.LoadInt64(&s.Info.ClientsConnected) >= s.Options.Capabilities.MaximumClients { + if cl.Properties.ProtocolVersion < 5 { + s.SendConnack(cl, packets.ErrServerUnavailable, false, nil) + } else { + s.SendConnack(cl, packets.ErrServerBusy, false, nil) + } + + return packets.ErrServerBusy + } + code := s.validateConnect(cl, pk) // [MQTT-3.1.4-1] [MQTT-3.1.4-2] if code != packets.CodeSuccess { if err := s.SendConnack(cl, code, false, nil); err != nil { @@ -486,7 +560,7 @@ func (s *Server) validateConnect(cl *Client, pk packets.Packet) packets.Code { // connection ID. If clean is true, the state of any previously existing client // session is abandoned. func (s *Server) inheritClientSession(pk packets.Packet, cl *Client) bool { - if existing, ok := s.Clients.Get(pk.Connect.ClientIdentifier); ok { + if existing, ok := s.Clients.Get(cl.ID); ok { _ = s.DisconnectClient(existing, packets.ErrSessionTakenOver) // [MQTT-3.1.4-3] if pk.Connect.Clean || (existing.Properties.Clean && existing.Properties.ProtocolVersion < 5) { // [MQTT-3.1.2-4] [MQTT-3.1.4-4] s.UnsubscribeClient(existing) @@ -1319,6 +1393,10 @@ func (s *Server) processDisconnect(cl *Client, pk packets.Packet) error { cl.Properties.Props.SessionExpiryIntervalFlag = true } + if pk.ReasonCode == packets.CodeDisconnectWillMessage.Code { // [MQTT-3.1.2.5] Non-normative comment + return packets.CodeDisconnectWillMessage + } + s.loop.willDelayed.Delete(cl.ID) // [MQTT-3.1.3-9] [MQTT-3.1.2-8] cl.Stop(packets.CodeDisconnect) // [MQTT-3.14.4-2] @@ -1611,7 +1689,7 @@ func (s *Server) loadRetained(v []storage.Message) { // than their given expiry intervals. func (s *Server) clearExpiredClients(dt int64) { for id, client := range s.Clients.GetAll() { - disconnected := atomic.LoadInt64(&client.State.disconnected) + disconnected := client.StopTime() if disconnected == 0 { continue } diff --git a/vendor/github.com/mochi-mqtt/server/v2/topics.go b/vendor/github.com/mochi-mqtt/server/v2/topics.go index 63f704d2..5d599e00 100644 --- a/vendor/github.com/mochi-mqtt/server/v2/topics.go +++ b/vendor/github.com/mochi-mqtt/server/v2/topics.go @@ -514,7 +514,7 @@ func (x *TopicsIndex) seek(filter string, d int) *particle { // trim removes empty filter particles from the index. func (x *TopicsIndex) trim(n *particle) { - for n.parent != nil && n.retainPath == "" && n.particles.len()+n.subscriptions.Len()+n.shared.Len() == 0 { + for n.parent != nil && n.retainPath == "" && n.particles.len()+n.subscriptions.Len()+n.shared.Len()+n.inlineSubscriptions.Len() == 0 { key := n.key n = n.parent n.particles.delete(key) diff --git a/vendor/modules.txt b/vendor/modules.txt index 9861df79..65c5cb51 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -122,7 +122,7 @@ github.com/mailru/easyjson/jlexer github.com/mailru/easyjson/jwriter # github.com/moby/term v0.0.0-20221205130635-1aeaba878587 ## explicit; go 1.18 -# github.com/mochi-mqtt/server/v2 v2.4.6 +# github.com/mochi-mqtt/server/v2 v2.6.5 ## explicit; go 1.21 github.com/mochi-mqtt/server/v2 github.com/mochi-mqtt/server/v2/hooks/storage @@ -224,8 +224,6 @@ go.uber.org/zap/internal/exit go.uber.org/zap/internal/pool go.uber.org/zap/internal/stacktrace go.uber.org/zap/zapcore -# golang.org/x/exp v0.0.0-20230522175609-2e198f4a06a1 -## explicit; go 1.20 # golang.org/x/net v0.23.0 ## explicit; go 1.18 golang.org/x/net/html