-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathbroker.go
110 lines (97 loc) · 2.89 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package magina
import (
"crypto/tls"
"log"
"net"
"time"
"github.com/streadway/amqp"
)
// Callback signatures that let the embedding application hook into the broker.
type (
	// AuthenticateFunc is the authentication callback. It receives the client
	// together with the username and password it supplied and reports the
	// decision as a bool (presumably true accepts the client; confirm at the
	// call site).
	AuthenticateFunc func(client *Client, username, password string) bool

	// AuthorizePublishFunc is the authorization callback for publishing: it
	// receives the client and the topic and reports the decision as a bool.
	AuthorizePublishFunc func(client *Client, topic string) bool

	// AuthorizeSubscribeFunc is the authorization callback for subscribing: it
	// receives the client and the topic and reports the decision as a bool.
	AuthorizeSubscribeFunc func(client *Client, topic string) bool

	// OnClientOnlineCB is the callback invoked when a client connects
	// successfully.
	OnClientOnlineCB func(client *Client)

	// OnClientOfflineCB is the callback invoked when the heartbeat from a
	// client is lost.
	OnClientOfflineCB func(client *Client)

	// OnClientHeartbeatCB is the callback invoked when a heartbeat is received
	// from a client.
	OnClientHeartbeatCB func(client *Client)
)
// Broker is the main MQTT service. It holds the listen address, the RabbitMQ
// connection settings, optional TLS configuration and the application
// callbacks.
type Broker struct {
// Addr is the TCP address the broker listens on; ListenAndServe passes it to
// net.Listen or tls.Listen.
Addr string
// RabbitURI is the RabbitMQ URI that InitRabbitConn dials.
RabbitURI string
// SuportRPC extends the broker to support RPC.
// WARNING: this is not a standard MQTT feature.
SuportRPC bool
// RabbitConnection is the RabbitMQ connection. InitRabbitConn dials a new one
// only when this field is nil, and replaces it after a reconnect.
RabbitConnection *amqp.Connection
// TLSConfig enables MQTT over TLS (mqtts): when it is non-nil, ListenAndServe
// listens with tls.Listen instead of net.Listen.
TLSConfig *tls.Config
// Callbacks supplied by the application: authentication, publish and
// subscribe authorization, and client online/offline/heartbeat notifications.
Authenticate AuthenticateFunc
AuthorizePublish AuthorizePublishFunc
AuthorizeSubscribe AuthorizeSubscribeFunc
OnClientOnline OnClientOnlineCB
OnClientOffline OnClientOfflineCB
OnClientHeartbeat OnClientHeartbeatCB
}
// InitRabbitConn makes sure the broker has a RabbitMQ connection and starts a
// background watcher that re-establishes it whenever it is closed.
//
// When b.RabbitConnection is nil, b.RabbitURI is dialed once; if that dial
// fails the method sleeps for three seconds and hands the error to
// failOnError (defined elsewhere in the package). Every call starts a new
// watcher goroutine, and that goroutine never returns.
//
// NOTE(review): the watcher reassigns b.RabbitConnection without any
// synchronization visible here; confirm that concurrent readers tolerate it.
func (b *Broker) InitRabbitConn() {
	if b.RabbitConnection == nil {
		established, dialErr := amqp.Dial(b.RabbitURI)
		if dialErr != nil {
			time.Sleep(time.Second * 3)
			failOnError(dialErr)
		}
		b.RabbitConnection = established
	}

	// Watch for disconnection and reconnect.
	go func() {
		for {
			// Block until the current connection reports that it has closed.
			closed := b.RabbitConnection.NotifyClose(make(chan *amqp.Error))
			log.Printf("connection closing: %s\n", <-closed)

			// Redial every three seconds until a dial succeeds.
			for {
				fresh, dialErr := amqp.Dial(b.RabbitURI)
				if dialErr == nil {
					log.Printf("reconnect rabbitmq success! \n")
					b.RabbitConnection = fresh
					break
				}
				log.Printf("reconnect error: %v, try again after 3 seconds...\n", dialErr)
				time.Sleep(time.Second * 3)
			}
		}
	}()
}
// handleConnection wraps an accepted network connection in a Client bound to
// this broker and runs the client's Serve method on it.
func (b *Broker) handleConnection(conn net.Conn) {
	c := new(Client)
	c.Conn = conn
	c.Broker = b
	c.Serve()
}
// ListenAndServe initializes the RabbitMQ connection, opens a TCP listener on
// b.Addr (wrapped in TLS when b.TLSConfig is non-nil) and serves each accepted
// MQTT connection in its own goroutine.
//
// A listen error is handed to failOnError (defined elsewhere in the package).
// The accept loop never returns: a failed Accept is logged and skipped rather
// than being passed to handleConnection, and consecutive failures are spaced
// out with a capped backoff so a persistent error does not spin the loop.
func (b *Broker) ListenAndServe() {
	b.InitRabbitConn()
	log.Println("listen and serve mqtt broker on " + b.Addr)

	var listener net.Listener
	var err error
	if b.TLSConfig != nil {
		listener, err = tls.Listen("tcp", b.Addr, b.TLSConfig)
	} else {
		listener, err = net.Listen("tcp", b.Addr)
	}
	failOnError(err)

	const (
		minAcceptBackoff = 5 * time.Millisecond
		maxAcceptBackoff = time.Second
	)
	var backoff time.Duration
	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Println("error accepting new connection: " + err.Error())
			// conn is not usable here; wait briefly and try again instead of
			// starting a handler on a failed accept.
			if backoff == 0 {
				backoff = minAcceptBackoff
			} else {
				backoff *= 2
			}
			if backoff > maxAcceptBackoff {
				backoff = maxAcceptBackoff
			}
			time.Sleep(backoff)
			continue
		}
		backoff = 0
		go b.handleConnection(conn)
	}
}