-
Notifications
You must be signed in to change notification settings - Fork 49
/
Copy pathringbuf.go
119 lines (101 loc) · 2.68 KB
/
ringbuf.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
package manager
import (
"errors"
"fmt"
"github.com/cilium/ebpf/ringbuf"
"github.com/gojue/ebpfmanager/kernel"
)
// RingbufMapOptions - Ring buffer map specific options.
// NOTE(review): several field names and types (PerfErrChan, PerfMapStats,
// DumpHandler's *PerfMap parameter) refer to perf maps; they look carried over
// from the perf map options - confirm whether that is intentional.
type RingbufMapOptions struct {
// Watermark - Threshold on the size of pending samples above which the reader
// starts processing them. Init replaces a zero value with the manager's
// DefaultWatermark.
// NOTE(review): the original comment requires this to be smaller than
// PerfRingBufferSize, and Start does not pass the value to ringbuf.NewReader -
// confirm whether it has any effect for ring buffers.
Watermark int
// PerfErrChan - Channel intended to receive errors from the reader.
PerfErrChan chan error
// DataHandler - Callback function called when a new sample was retrieved from the
// ring buffer. Mandatory: Init fails when it is nil. Start always passes 0 as
// CPU; data is the record's RawSample and perfMap is the RingbufMap the sample
// was read from.
DataHandler func(CPU int, data []byte, perfMap *RingbufMap, manager *Manager)
// PerfMapStats - Map statistics such as the number of read errors, lost samples
// and raw sample bytes. Needs to be initialized via manager.NewPerfMapStats().
// NOTE(review): not referenced by the ring buffer code in this file - confirm
// whether it is used elsewhere.
PerfMapStats *PerfMapStats
// DumpHandler - Callback function called when manager.Dump() is called
// and dump the current state (human readable).
// NOTE(review): the callback receives a *PerfMap rather than a *RingbufMap -
// confirm this is the intended type.
DumpHandler func(perfMap *PerfMap, manager *Manager) string
}
// RingbufMap - ring buffer reader wrapper. It embeds Map for the generic map
// state and lifecycle, and RingbufMapOptions for the user supplied settings.
type RingbufMap struct {
// manager - Manager this map belongs to; set by Init.
manager *Manager
// ringBufReader - Reader created by Start and closed by Stop.
ringBufReader *ringbuf.Reader
// Map - A RingbufMap has the same features as a normal Map
Map
// RingbufMapOptions - Options of the ring buffer map (handlers, channels, ...)
RingbufMapOptions
}
// Init validates the ring buffer map configuration, attaches the map to the
// given manager and initializes the embedded Map.
//
// It returns:
//   - an error wrapping ErrRingbufNotSupported (so errors.Is still matches) when
//     the host kernel version cannot be determined;
//   - ErrRingbufNotSupported when the host kernel is older than 5.8.0;
//   - an error when no DataHandler is configured;
//   - any error returned by Map.Init.
//
// A zero Watermark is replaced with the manager's DefaultWatermark.
func (m *RingbufMap) Init(manager *Manager) error {
	kv, err := kernel.HostVersion()
	if err != nil {
		// Previously this error was dropped and the failure surfaced as a bare
		// ErrRingbufNotSupported; keep that sentinel in the chain but report
		// the real cause as well.
		return fmt.Errorf("%w: detecting host kernel version: %v", ErrRingbufNotSupported, err)
	}
	if kv < kernel.VersionCode(5, 8, 0) {
		return ErrRingbufNotSupported
	}

	m.manager = manager

	if m.DataHandler == nil {
		return fmt.Errorf("no DataHandler set for %s", m.Name)
	}
	if m.Watermark == 0 {
		m.Watermark = manager.options.DefaultWatermark
	}

	// Initialize the underlying map structure
	return m.Map.Init(manager)
}
// Start creates the ring buffer reader and launches the goroutine that hands
// every sample read from it to DataHandler.
//
// It is a no-op when the map is already running and returns
// ErrMapNotInitialized when Init has not completed. Errors from
// ringbuf.NewReader are returned as is. The reading goroutine is registered in
// the manager's WaitGroup and exits once the reader is closed (see Stop).
func (m *RingbufMap) Start() error {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if m.state == running {
		return nil
	}
	if m.state < initialized {
		return ErrMapNotInitialized
	}

	reader, err := ringbuf.NewReader(m.array)
	if err != nil {
		return err
	}
	m.ringBufReader = reader

	// Register the goroutine before starting it: calling Add from inside the
	// goroutine races with a concurrent Wait on the manager's WaitGroup.
	m.manager.wg.Add(1)
	go func() {
		defer m.manager.wg.Done()
		for {
			// Read from the captured reader rather than m.ringBufReader so this
			// goroutine never observes a field that Start/Stop mutate under
			// stateLock.
			record, err := reader.Read()
			if err != nil {
				if errors.Is(err, ringbuf.ErrClosed) {
					return
				}
				// Report the error without blocking the reader, then skip the
				// record: on failure it holds no sample to deliver.
				if m.PerfErrChan != nil {
					select {
					case m.PerfErrChan <- err:
					default:
					}
				}
				continue
			}
			// Ring buffer records carry no CPU information, hence the constant 0.
			m.DataHandler(0, record.RawSample, m, m.manager)
		}
	}()

	m.state = running
	return nil
}
// Stop closes the ring buffer reader, which makes the reading goroutine started
// by Start exit, moves the map back to the initialized state and closes the
// underlying Map with the requested cleanup type.
//
// It is a no-op when the map is not running. When only one of the two close
// operations fails, its error is returned unchanged. When both fail, the
// returned error describes both and wraps the reader's close error.
func (m *RingbufMap) Stop(cleanup MapCleanupType) error {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if m.state < running {
		return nil
	}

	readerErr := m.ringBufReader.Close()
	m.state = initialized

	// close underlying map
	mapErr := m.Map.close(cleanup)
	switch {
	case mapErr == nil:
		return readerErr
	case readerErr == nil:
		return mapErr
	default:
		return fmt.Errorf("closing map: %v; closing ring buffer reader: %w", mapErr, readerErr)
	}
}