-
Notifications
You must be signed in to change notification settings - Fork 0
/
natslog-server.go
115 lines (103 loc) · 2.57 KB
/
natslog-server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
package main
import (
	"io"
	"log"
	"net/http"
	"os"
	"os/signal"
	"regexp"
	"strings"
	"sync"
	"syscall"

	stan "github.com/nats-io/go-nats-streaming"
)
// getEnv returns the value of the environment variable named by key, or
// fallback when the variable is not set at all. A variable that is set to the
// empty string counts as set, so "" is returned in that case.
func getEnv(key, fallback string) string {
	if v, ok := os.LookupEnv(key); ok {
		return v
	}
	return fallback
}
// connectionCloser closes c and logs the error, if any, rather than returning
// it. Having no return value makes it convenient to use in defer statements.
func connectionCloser(c io.Closer) {
	err := c.Close()
	if err == nil {
		return
	}
	log.Printf("close error: %s", err)
}
// main optionally starts an HTTP file server that exposes /var/log and then
// runs the NATS Streaming log collector until run returns.
//
// Environment variables:
//   - HTTP_ENABLED: the file server is started when this equals "true"
//     (case-insensitive); defaults to "true".
//   - HTTP_PORT: port the file server listens on; defaults to "80".
func main() {
	log.Println("Started natslog")
	httpEnabled := getEnv("HTTP_ENABLED", "true")
	httpPort := getEnv("HTTP_PORT", "80")

	if strings.EqualFold(httpEnabled, "true") {
		// Only announce the server when it is actually going to be started.
		log.Println("Starting httpd server")
		go func() {
			// ListenAndServe always returns a non-nil error (for example when
			// the port is already in use). Log it instead of dropping it, but
			// keep the collector running: the file server is auxiliary.
			err := http.ListenAndServe(":"+httpPort, http.FileServer(http.Dir("/var/log")))
			if err != nil {
				log.Printf("httpd server error: %v", err)
			}
		}()
	}

	if err := run(); err != nil {
		log.Fatal(err)
	}
}
var (
	// lastProcessed maps a subject to the sequence number of the last message
	// that was appended to that subject's log file. It is initialised in run.
	lastProcessed map[string]uint64

	// lastProcessedMu guards lastProcessed. messageHandle is registered as the
	// callback of one subscription per subject, and those callbacks may run
	// concurrently (NOTE(review): the NATS client delivers each subscription
	// on its own goroutine - confirm), so unsynchronised map access would be
	// a data race.
	lastProcessedMu sync.Mutex

	// conn is the shared NATS Streaming connection, set by run.
	conn stan.Conn
)

// messageHandle appends the payload of msg to /var/log/<subject>.log and then
// acknowledges the message.
//
// A message whose sequence number is not greater than the last one recorded
// for its subject is treated as a redelivery: it is acknowledged but not
// written again. Failing to open or write the log file terminates the process
// via log.Fatalf, as before; close and ack failures are only logged.
func messageHandle(msg *stan.Msg) {
	lastProcessedMu.Lock()
	last := lastProcessed[msg.Subject]
	lastProcessedMu.Unlock()

	if msg.Sequence > last {
		fileflags := os.O_WRONLY | os.O_APPEND | os.O_CREATE
		f, err := os.OpenFile("/var/log/"+msg.Subject+".log", fileflags, 0660)
		if err != nil {
			log.Fatalf("error opening file: %v", err)
		}

		_, werr := f.Write(msg.Data)
		// Close explicitly rather than with defer: deferred calls do not run
		// when log.Fatalf exits, and a close error on a file opened for
		// writing should not go unnoticed.
		if cerr := f.Close(); cerr != nil {
			log.Printf("close error: %s", cerr)
		}
		if werr != nil {
			log.Fatalf("error writing file: %v", werr)
		}

		lastProcessedMu.Lock()
		lastProcessed[msg.Subject] = msg.Sequence
		lastProcessedMu.Unlock()
	}

	if err := msg.Ack(); err != nil {
		log.Printf("error acknowledging message: %v", err)
	}
}
// subjectSanitizer matches every run of characters that is not an ASCII letter
// or digit. It is compiled once at package scope instead of on every message.
var subjectSanitizer = regexp.MustCompile("[^a-zA-Z0-9]+")

// registerNewHandle handles a registration message whose payload names a
// subject to start logging.
//
// The payload is reduced to its ASCII letters and digits, and a durable
// subscription for the resulting subject is created with messageHandle as its
// callback. The durable name is NATS_DURABLE_NAME (default "natslog-server")
// with the subject appended. The registration message is acknowledged in every
// case, including when the subscription could not be created.
func registerNewHandle(msg *stan.Msg) {
	subject := subjectSanitizer.ReplaceAllString(string(msg.Data), "")

	if subject == "" {
		// Nothing usable is left after sanitising, so there is no subject to
		// subscribe to. Skip the subscribe call and just ack the message.
		log.Printf("ignoring registration without a usable subject: %q", msg.Data)
	} else {
		natsDurableName := getEnv("NATS_DURABLE_NAME", "natslog-server") + subject
		log.Printf("Subscribing to new service %s %s", subject, natsDurableName)
		_, err := conn.Subscribe(
			subject,
			messageHandle,
			stan.DurableName(natsDurableName),
			stan.MaxInflight(1),
			stan.SetManualAckMode(),
		)
		if err != nil {
			log.Printf("error subscribing file: %v", err)
		}
	}

	if err := msg.Ack(); err != nil {
		log.Printf("error acknowledging registration: %v", err)
	}
}
// run connects to NATS Streaming, subscribes to the registration subject and
// then blocks until the process receives SIGINT or SIGTERM.
//
// Environment variables (with defaults):
//   - CLUSTER_NAME ("test-cluster")
//   - NATS_SERVER ("nats://localhost:4222")
//   - NATS_CLIENT_NAME ("natslog-server")
//   - NATS_DURABLE_NAME ("natslog-server")
//   - NATSLOG_SUBSCRIBE_SUBJECT ("natslog.subscribe")
//
// It returns the error from connecting or subscribing, or nil after a
// shutdown signal once the subscription and connection have been closed.
func run() error {
	lastProcessed = make(map[string]uint64)

	clusterName := getEnv("CLUSTER_NAME", "test-cluster")
	natsServer := getEnv("NATS_SERVER", "nats://localhost:4222")
	natsClientName := getEnv("NATS_CLIENT_NAME", "natslog-server")
	natsDurableName := getEnv("NATS_DURABLE_NAME", "natslog-server")
	subscribeSubject := getEnv("NATSLOG_SUBSCRIBE_SUBJECT", "natslog.subscribe")

	log.Printf("Connecting to %s", natsServer)
	c, err := stan.Connect(
		clusterName,
		natsClientName,
		stan.NatsURL(natsServer),
	)
	if err != nil {
		return err
	}
	conn = c
	defer connectionCloser(conn)

	sub, err := conn.Subscribe(
		subscribeSubject,
		registerNewHandle,
		stan.DurableName(natsDurableName),
		stan.MaxInflight(1),
		stan.SetManualAckMode(),
	)
	if err != nil {
		return err
	}
	// Deferred calls run in reverse order, so the subscription is closed
	// before the connection it belongs to.
	defer connectionCloser(sub)

	// Wait for a termination signal instead of blocking forever, so that the
	// deferred closers above actually get a chance to run. The channel is
	// buffered because signal.Notify does not block when sending.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
	sig := <-stop
	log.Printf("Received %s, shutting down", sig)
	return nil
}