-
Notifications
You must be signed in to change notification settings - Fork 7
/
README
123 lines (89 loc) · 5.27 KB
/
README
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
Kafka::KafkaWriter
=================================
Kafka Writer plugin for Bro. This plugin will connect to a Kafka server, and will send JSON-format
logs to the topic specified.
Aside from sending bro logs to Kafka, it will also add two fields to each JSON message, and rename
5 of the fields from the bro native names.
The added fields are:
a "sensor" name as specified in the config, used to identify which bro server originated this log.
and a "type" field, populated with the log type (dns, ssl, conn, etc).
The renamed fields are:
"ts" will become "timestamp",
"id.orig_h"becomes "source_ip",
"id.orig_p" becomes "source_port",
"id.resp_h" becomes "dest_ip", and
"id.resp_p" becomes "dest_port".
Users can override the sensor name as a standard redef in their config. Changing the type field names can
be done by editing the init.bro script for the KafkaLogger (though I don't recommend it). I would like
to make the field renaming configurable in the future, but had problems with the bro table structure
in C++, so those are hard-coded in the C++ for the moment.
(note: timestamps are sent as millisecond timestamps I.e., a UNIX-format timestamp that includees
the milliseconds aka epoch * 1000)
Configuration
================================
In your bro policy, include the logs-to-kafka bro script, and define the particular logs you want
to send to bro, like this:
@load Kafka/KafkaWriter/logs-to-kafka
redef KafkaLogger::logs_to_send = set(HTTP::LOG, Conn::LOG, DNS::LOG, SMTP::LOG);
see the Kafka init.bro file for the full list of logs you can flag to be sent to Kafka. (that file
also contains the clean names that the log files will be called in the "type" field.)
You can also redefine how the bro client will connect to kafka with the following variables:
redef KafkaLogger::topic_name = "logs";
which Kafka topic to send logs to. All logs from bro will be sent to this topic.
redef KafkaLogger::broker_name = "hostname:port,hostname:port";
the name of the kafka broker to send messages to.
This is a comma-separated list of kafka brokers.
format: hostname1:port,hostname2:port
no spaces.
redef KafkaLogger::sensor_name = "brosensor";
The name to add to each message to identify
this sensor in the logs. It will be added to a "sensor"
field in the submitted json.
redef KafkaLogger::client_id = "bro";
This is used internally by Kafka to trace requests and
errors. Kafka advises setting it to identify the
application making the requests/produces. the log name
will be appended to this in the Kafka logs.
redef KafkaLogger::compression_codec = "0";
Kafka can compress messages on the wire. This specifies
which compression codec to use. 0=none, 1=gzip, 2=snappy.
redef KafkaLogger::max_batch_size = "1000";
redef KafkaLogger::max_batch_interval = "10000";
how often to send logs (number of messages, bytes, and seconds).
If you use the Kerberised version of kafka, you'll need to switch this from PLAINTEXT to SASL_PLAINTEXT or SASL_SSL,
make sure you've setup the keytab properly see
https://github.com/edenhill/librdkafka/wiki/Using-SASL-with-librdkafka for more info on how to perform
that, you will also need a version of librdkafka built with ssl and kerberos support.
redef KafkaLogger::security_protocol = "SASL_PLAINTEXT"
redef KafkaLogger::kerberos_service_name = "kafka";
Needs to be redifined to match the service name for Kafka.
redef KafkaLogger::kerberos_keytab = "/etc/security/keytabs/bro.keytab";
Needs to be redefined if your keytab file is stored elsewhere.
redef KafkaLogger::kerberos_principal = "kafka/[email protected]"
Note: the <kafkabroker> must exactly match the hostname part of the broker principal.
If after setting up you get an error about "no common mechs", you proably need to install additionnal modules, on Debian and derivatives
'''
apt-get install libsasl2-modules-gssapi-heimda
'''
Should do the trick.
Lastly, if you are consuming the events with logstash, you will need to set
redef KafkaLogger::logstash_style_timestamp = T;
This will set quotes around the "timestamp" field. Logstash is apparently
looking for the timestamp to be a string, even if you configure it to be
looking for unix timestamp values. (BTW, you will want to configure
logstash with a filter that says:
filter { date { match => ["timestamp", "UNIX_MS"] } } }
in order to get it to parse the timestamps properly.
Requirements
================================
You will need to install the librdkafka library independently of this plugin. The system when building will
attempt to find the librdkafka libraries, but if you put them somewhere unusual, you will need to tell
configure where to find it with the --with-librdkafka option.
Notes & thank yous
===============================
Much of this plugin is based on earlier code from Kurt Grutzmacher who wrote a bro 2.2 logger for kafka.
Also, many thanks for Robin Sommer for answering my beginner-level questions as I started this process.
Lastly, this plugin will create a new connection from Bro to kafka for each log type to be sent to
kafka. This may cause a lot of connections. At the moment, the plugin does not try to re-connect if
it loses all connectivity to kafka, but the librdkafka library should handle failover between brokers
transparently.