Ekanite
The syslog server with built-in search
Philip O'Toole
GoSF October 19th 2016
http://www.philipotoole.com/
@general_order24
* About me
- Director of Data Platform Engineering at Percolate.
- Previously Director of Engineering and core developer with InfluxDB.
- Led the backend team that built Loggly's 2nd generation indexing and search platform.
- Big fan of Go, databases, and distributed systems.
* About Ekanite
The goal of Ekanite is to do a couple of things, and do them well -- accept log messages over the network, and make it easy to search the messages. What it lacks in features, it makes up for in focus.
You can find the source code at [[https://github.com/ekanite/ekanite]].
The current release is v1.1.1.
* Why?
* Why another log search system?
- Built very large scale log search systems in the past.
- It can be quite involved - networking, indexing, search, retention, sharding, and performance.
- Could it all be done in a single program?
- A single-binary Go program would make deployment really easy.
- A detailed demonstration of building a search system.
- New use case for bleve.
* What is bleve?
* bleve
- bleve is an indexing and full-text search library for Go.
- Supports text analysis and faceting.
- Straightforward to use.
	package main

	import (
		"fmt"
		"github.com/blevesearch/bleve"
	)

	func main() {
		// Create a new index, ignoring all errors for brevity.
		mapping := bleve.NewIndexMapping()
		index, _ := bleve.New("example.bleve", mapping)
		// Index some data under a document ID (the ID here is a placeholder).
		identifier := "id1"
		index.Index(identifier, "hello, world!")
		// Search for some text.
		query := bleve.NewMatchQuery("world")
		search := bleve.NewSearchRequest(query)
		searchResults, _ := index.Search(search)
		fmt.Println(searchResults)
	}
- bleve is to Go what Lucene is to Java.
* The design of a log search system
* Ekanite architecture
.image images/ekanite-arch.png _ 600
- The Ekanite engine receives log messages and routes them to indexes.
- The engine also receives queries, performs searches, and returns results.
* Indexing by time
.image images/ekanite-indexing.png _ 600
- An index is a _logical_ concept, grouping physical bleve-based _shards_.
- Indexing by time makes search quicker, and retention easier to enforce.
- This diagram shows 3 particular hours, but time extends in both directions forever.
* Why do we batch? Why do we shard?
.image images/bleve-sharding.jpeg _ 900
* Demo
* Go patterns in action
* Decoupling input and indexing
- Input subsystem accepts a buffered channel, to which it sends parsed log lines.
	// input/collector.go
	func (s *TCPCollector) handleConnection(conn net.Conn, c chan<- *Event) {
		.
		.
		// Log line available?
		if match {
			stats.Add("tcpEventsRx", 1)
			if s.parser.Parse(bytes.NewBufferString(log).Bytes()) {
				c <- &Event{
					Text:          string(s.parser.Raw),
					Parsed:        s.parser.Result,
					ReceptionTime: time.Now().UTC(),
					Sequence:      atomic.AddInt64(&sequenceNumber, 1),
					SourceIP:      conn.RemoteAddr().String(),
				}
			}
		}
		.
		.
	}
* Parallel indexing
	// engine.go
	func (e *Engine) Index(events []*Event) error {
		.
		.
		// De-multiplex the batch into sub-batches, one sub-batch for each Index.
		subBatches := make(map[*Index][]Document, 0)
		.
		.
		// Index each batch in parallel.
		for index, subBatch := range subBatches {
			wg.Add(1)
			go func(i *Index, b []Document) {
				defer wg.Done()
				i.Index(b)
			}(index, subBatch)
		}
		wg.Wait()
		.
		.
	}
* Parallel sharding
	// index.go
	// Index indexes the slice of documents into the index. It takes care of all shard routing.
	func (i *Index) Index(documents []Document) error {
		var wg sync.WaitGroup
		shardBatches := make(map[*Shard][]Document, 0)
		for _, d := range documents {
			shard := i.Shard(d.ID())
			shardBatches[shard] = append(shardBatches[shard], d)
		}
		// Index each batch in parallel.
		for shard, batch := range shardBatches {
			wg.Add(1)
			go func(s *Shard, b []Document) {
				defer wg.Done()
				s.Index(b)
			}(shard, batch)
		}
		wg.Wait()
		return nil
	}
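The slide does not show how `i.Shard(d.ID())` picks a shard. One common approach, sketched here as an assumption rather than as Ekanite's actual routing, is to hash the document ID and take it modulo the shard count. `shardFor` is a hypothetical name.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// shardFor maps a document ID to one of n shards by hashing the ID.
// FNV-1a is an assumption for this sketch; any stable hash would do.
func shardFor(id string, n int) int {
	h := fnv.New32a()
	h.Write([]byte(id)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % uint32(n))
}

func main() {
	for _, id := range []string{"doc-1", "doc-2", "doc-3"} {
		fmt.Printf("%s -> shard %d\n", id, shardFor(id, 4))
	}
}
```

Because the mapping depends only on the ID, the same document always lands on the same shard, which is what lets each shard batch be indexed independently.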
* Results returned over a channel
	// engine.go
	// Search performs a search.
	func (e *Engine) Search(query string) (<-chan string, error) {
		e.mu.RLock()
		defer e.mu.RUnlock()
		c := make(chan string, 1)
		go func() {
			// Sequentially search each index, starting with the earliest in time.
			// This could be done in parallel but more sorting would be required.
			for i := len(e.indexes) - 1; i >= 0; i-- {
				e.Logger.Printf("searching index %s", e.indexes[i].Path())
				ids, _ := e.indexes[i].Search(query)
				for _, id := range ids {
					b, _ := e.indexes[i].Document(id)
					c <- string(b)
				}
			}
			close(c)
		}()
		return c, nil
	}
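From the caller's side, a channel of results is consumed with a plain `range` loop that ends when the producer closes the channel. A self-contained sketch of the pattern, where `search` is a hypothetical stand-in for `Engine.Search` that simply filters a slice:

```go
package main

import (
	"fmt"
	"strings"
)

// search is a stand-in for Engine.Search: it streams matching lines over a
// channel and closes the channel when there are no more results.
func search(lines []string, term string) <-chan string {
	c := make(chan string, 1)
	go func() {
		defer close(c)
		for _, l := range lines {
			if strings.Contains(l, term) {
				c <- l
			}
		}
	}()
	return c
}

func main() {
	logs := []string{"sshd: login ok", "cron: job started", "sshd: login failed"}
	// Results arrive one at a time; the loop exits when the channel is closed.
	for r := range search(logs, "sshd") {
		fmt.Println(r)
	}
}
```

The caller can start writing results to the client before the search has finished, and never needs the full result set in memory.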
* Sorting hits by time
- Ekanite performs time-based sorting in the application.
- Earlier versions of bleve did not support sorting on custom fields.
- Newer versions now do.
- Complex sort method on _DocIDs_.
	// Search performs a search of the index using the given query. Returns IDs of documents
	// which satisfy all queries. Returns Doc IDs in sorted order, ascending.
	func (i *Index) Search(q string) (DocIDs, error) {
		query := bleve.NewQueryStringQuery(q)
		searchRequest := bleve.NewSearchRequest(query)
		searchRequest.Size = maxSearchHitSize
		searchResults, _ := i.Alias.Search(searchRequest)
		docIDs := make(DocIDs, 0, len(searchResults.Hits))
		for _, d := range searchResults.Hits {
			docIDs = append(docIDs, DocID(d.ID))
		}
		sort.Sort(docIDs)
		return docIDs, nil
	}
* Retention enforcement is straightforward
	// engine.go
	// runRetentionEnforcement periodically runs retention enforcement.
	func (e *Engine) runRetentionEnforcement() {
		defer e.wg.Done()
		for {
			select {
			case <-e.done:
				return
			case <-time.After(RetentionCheckInterval):
				stats.Add("retentionEnforcementRun", 1)
				e.enforceRetention()
			}
		}
	}
- Shards are deleted from disk and references removed from the engine.
* Next steps
Ekanite is software, and software is never finished.
- Use a storage engine other than BoltDB.
- Performance improvements, both CPU and RAM.
- Better query language support.
- Better dependency management.
- A fully-featured CLI.
* What Ekanite can do
With it you've got a log search system that is easy to deploy and easy to maintain.
* References
- [[http://www.philipotoole.com/designing-a-search-system-for-log-data-part-i]]
- [[https://github.com/ekanite/ekanite]]
- [[http://www.blevesearch.com]]
- [[https://github.com/blevesearch/bleve]]
- [[http://www.philipotoole.com/increasing-bleve-performance-sharding]]
- [[https://github.com/otoolep/bleve-bench]]
- [[https://github.com/ekanite/gosf-ekanite]]