-
Notifications
You must be signed in to change notification settings - Fork 0
/
beaver.go
125 lines (108 loc) · 3.26 KB
/
beaver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package main
import (
"crypto/tls"
"encoding/json"
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
"log"
"time"
"github.com/confluentinc/confluent-kafka-go/kafka"
irc "github.com/fluffle/goirc/client"
)
type conf struct {
Server string `yaml:"server"`
Port int64 `yaml:"port"`
UseSSL bool `yaml:"useSSL"`
Channels []string `yaml:"channels"`
Nick string `yaml:"nick"`
BootstrapServers string `yaml:"bootstrapServers"` // Connection string for Kafka cluster
TopicPrefix string `yaml:"topicPrefix"` // Topic prefix; topics will be made in the form {prefix}_{channel}
}
type message struct {
Channel string
Nick string
Text string
Time int64
}
var p *kafka.Producer
func (c *conf) getConf() *conf {
yamlFile, err := ioutil.ReadFile("conf.yaml")
if err != nil {
log.Printf("yamlFile.Get err #%v ", err)
}
err = yaml.Unmarshal(yamlFile, c)
if err != nil {
log.Fatalf("Unmarshal: %v", err)
}
return c
}
func bootstrapKafkaProducer(cfg *kafka.ConfigMap) {
if p == nil {
cfg.Set("client.id=beaver")
var err error
p, err = kafka.NewProducer(cfg)
if err != nil {
panic(err)
}
}
}
func main() {
var beaverConf conf
beaverConf.getConf()
log.Printf("Loaded configuration for Beaver!")
blob, _ := yaml.Marshal(beaverConf)
log.Printf("Using configuration: \n%s", string(blob))
log.Printf("Starting Kafka producer")
bootstrapKafkaProducer(&kafka.ConfigMap{"bootstrap.servers": beaverConf.BootstrapServers})
defer p.Close()
log.Printf("Created Kafka producer")
cfg := irc.NewConfig(beaverConf.Nick)
cfg.SSL = beaverConf.UseSSL
cfg.SSLConfig = &tls.Config{ServerName: beaverConf.Server}
cfg.Server = fmt.Sprintf("%s:%d", beaverConf.Server, beaverConf.Port)
cfg.NewNick = func(n string) string { return n + "^" }
cfg.PingFreq = 60 * time.Second // Server's ping timeout thingy on irc.ocf is 120s
c := irc.Client(cfg)
c.HandleFunc(irc.CONNECTED, func(conn *irc.Conn, line *irc.Line) {
for _, channel := range beaverConf.Channels {
log.Printf("Joining channel %s", channel)
c.Join(channel)
}
})
deliveryChan := make(chan kafka.Event)
c.HandleFunc(irc.PRIVMSG, func(conn *irc.Conn, line *irc.Line) {
if line.Public() {
message := message{
Channel: line.Target(),
Nick: line.Nick,
Text: line.Text(),
Time: line.Time.Unix(),
}
blob, _ := json.Marshal(message)
log.Printf("Received message: %s", blob)
topic := fmt.Sprintf("%s_%s", beaverConf.TopicPrefix, line.Target()[1:])
log.Printf("Attempting to produce message to topic %s", topic)
p.Produce(&kafka.Message{
TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
Value: []byte(blob),
}, deliveryChan)
e := <-deliveryChan
m := e.(*kafka.Message)
if m.TopicPartition.Error != nil {
log.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
} else {
log.Printf("Delivered message to topic %s [%d] at offset %v\n",
*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
}
}
})
quit := make(chan bool)
c.HandleFunc(irc.DISCONNECTED, func(conn *irc.Conn, line *irc.Line) {
quit <- true
})
if err := c.Connect(); err != nil {
log.Printf("Connection error: %s\n", err.Error())
}
<-quit
}