-
Notifications
You must be signed in to change notification settings - Fork 23
/
daemon.go
85 lines (75 loc) · 1.96 KB
/
daemon.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
package xray
import (
"sync"
"time"
)
// Daemon is background daemon for sending segments.
// This struct stores segments and sends segment to AWS X-Ray in each checkpoint timing.
type Daemon struct {
flushSegments func([]*Segment) error
spoolMu sync.Mutex
spool []*Segment
checkpointSize int
checkpointInterval time.Duration
stopSignal chan struct{}
}
// NewDaemon creates new Daemon.
// size is number of segments to send AWS API in single checkpoint.
// interval is the time of checkpoint interval.
// fn is function called in each checkpoint, to sends segments to AWS API.
func NewDaemon(size int, interval time.Duration, fn func([]*Segment) error) *Daemon {
if size < 1 {
size = 10
}
if interval == 0 {
interval = 1 * time.Second
}
return &Daemon{
spool: make([]*Segment, 0, 4096),
checkpointSize: size,
checkpointInterval: interval,
stopSignal: make(chan struct{}),
flushSegments: fn,
}
}
// Add adds segment data into daemon.
func (d *Daemon) Add(segments ...*Segment) {
d.spoolMu.Lock()
d.spool = append(d.spool, segments...)
d.spoolMu.Unlock()
}
// Flush gets segments from the internal spool and execute flushSegments function.
func (d *Daemon) Flush() {
d.spoolMu.Lock()
var segments []*Segment
segments, d.spool = shiftSegment(d.spool, d.checkpointSize)
d.spoolMu.Unlock()
d.flushSegments(segments)
}
// shiftSegment retrieves segments.
func shiftSegment(slice []*Segment, size int) (part []*Segment, all []*Segment) {
l := len(slice)
if l <= size {
return slice, slice[:0]
}
return slice[:size], slice[size:]
}
// Run sets timer to flush data in each checkpoint as a background daemon.
func (d *Daemon) Run() {
ticker := time.NewTicker(d.checkpointInterval)
go func() {
for {
select {
case <-ticker.C:
d.Flush()
case <-d.stopSignal:
ticker.Stop()
return
}
}
}()
}
// Stop stops daemon.
func (d *Daemon) Stop() {
d.stopSignal <- struct{}{}
}