forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 2
/
client.go
140 lines (125 loc) · 3.06 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
package amqp
import (
"crypto/tls"
"errors"
"fmt"
"log"
"math/rand"
"net"
"time"
"github.com/streadway/amqp"
)
type ClientConfig struct {
brokers []string
exchange string
exchangeType string
exchangePassive bool
exchangeDurable bool
exchangeArguments amqp.Table
encoding string
headers amqp.Table
deliveryMode uint8
tlsConfig *tls.Config
timeout time.Duration
auth []amqp.Authentication
}
type client struct {
conn *amqp.Connection
channel *amqp.Channel
config *ClientConfig
}
// Connect opens a connection to one of the brokers at random
func Connect(config *ClientConfig) (*client, error) {
client := &client{
config: config,
}
p := rand.Perm(len(config.brokers))
for _, n := range p {
broker := config.brokers[n]
log.Printf("D! Output [amqp] connecting to %q", broker)
conn, err := amqp.DialConfig(
broker, amqp.Config{
TLSClientConfig: config.tlsConfig,
SASL: config.auth, // if nil, it will be PLAIN taken from url
Dial: func(network, addr string) (net.Conn, error) {
return net.DialTimeout(network, addr, config.timeout)
},
})
if err == nil {
client.conn = conn
log.Printf("D! Output [amqp] connected to %q", broker)
break
}
log.Printf("D! Output [amqp] error connecting to %q - %s", broker, err.Error())
}
if client.conn == nil {
return nil, errors.New("could not connect to any broker")
}
channel, err := client.conn.Channel()
if err != nil {
return nil, fmt.Errorf("error opening channel: %v", err)
}
client.channel = channel
err = client.DeclareExchange()
if err != nil {
return nil, err
}
return client, nil
}
func (c *client) DeclareExchange() error {
if c.config.exchange == "" {
return nil
}
var err error
if c.config.exchangePassive {
err = c.channel.ExchangeDeclarePassive(
c.config.exchange,
c.config.exchangeType,
c.config.exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
c.config.exchangeArguments,
)
} else {
err = c.channel.ExchangeDeclare(
c.config.exchange,
c.config.exchangeType,
c.config.exchangeDurable,
false, // delete when unused
false, // internal
false, // no-wait
c.config.exchangeArguments,
)
}
if err != nil {
return fmt.Errorf("error declaring exchange: %v", err)
}
return nil
}
func (c *client) Publish(key string, body []byte) error {
// Note that since the channel is not in confirm mode, the absence of
// an error does not indicate successful delivery.
return c.channel.Publish(
c.config.exchange, // exchange
key, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Headers: c.config.headers,
ContentType: "text/plain",
ContentEncoding: c.config.encoding,
Body: body,
DeliveryMode: c.config.deliveryMode,
})
}
func (c *client) Close() error {
if c.conn == nil {
return nil
}
err := c.conn.Close()
if err != nil && err != amqp.ErrClosed {
return err
}
return nil
}