Skip to content

Commit

Permalink
feat(tail/log): dedup same log string in scanner (#165)
Browse files Browse the repository at this point in the history
* test(query/log/tail): more unit tests for scan

Signed-off-by: Gyuho Lee <[email protected]>

* feat(query/log/tail): simplify scan line processing logic

Signed-off-by: Gyuho Lee <[email protected]>

* test(query/log/tail): add benchmarks

Signed-off-by: Gyuho Lee <[email protected]>

* feat(query/log/tail): support dedup in scanner

Signed-off-by: Gyuho Lee <[email protected]>

* test(log/tail): add more unit tests

Signed-off-by: Gyuho Lee <[email protected]>

* feat(diagnose): scan logs with dedup

Signed-off-by: Gyuho Lee <[email protected]>

* feat(dmesg, diagnose): dedup by default

Signed-off-by: Gyuho Lee <[email protected]>

* fix imports

Signed-off-by: Gyuho Lee <[email protected]>

---------

Signed-off-by: Gyuho Lee <[email protected]>
  • Loading branch information
gyuho authored Nov 13, 2024
1 parent fabe1f0 commit 66720c8
Show file tree
Hide file tree
Showing 8 changed files with 810 additions and 46 deletions.
1 change: 1 addition & 0 deletions cmd/gpud/command/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func cmdLogs(cliContext *cli.Context) error {
lines := make([]string, 0, tailLines)
_, err := tail.Scan(
rootCtx,
tail.WithDedup(true),
tail.WithFile(logFile),
tail.WithLinesToTail(tailLines),
tail.WithPerLineFunc(func(line []byte) {
Expand Down
1 change: 1 addition & 0 deletions components/diagnose/diagnose.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ func run(ctx context.Context, dir string, opts ...OpOption) error {
}
matched, err := query_log_tail.Scan(
ctx,
query_log_tail.WithDedup(true),
query_log_tail.WithCommands(defaultDmesgCfg.Log.Scan.Commands),
query_log_tail.WithLinesToTail(5000),
query_log_tail.WithSelectFilter(defaultDmesgCfg.Log.SelectFilters...),
Expand Down
1 change: 1 addition & 0 deletions components/diagnose/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ func Scan(ctx context.Context, opts ...OpOption) error {
}
matched, err := query_log_tail.Scan(
ctx,
query_log_tail.WithDedup(true),
query_log_tail.WithCommands(defaultDmesgCfg.Log.Scan.Commands),
query_log_tail.WithLinesToTail(op.lines),
query_log_tail.WithSelectFilter(defaultDmesgCfg.Log.SelectFilters...),
Expand Down
1 change: 1 addition & 0 deletions components/dmesg/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *Component) TailScan() (*State, error) {
if c.cfg != nil && c.cfg.Log.Scan != nil {
items, err := c.logPoller.TailScan(
c.rootCtx,
query_log_tail.WithDedup(true),
query_log_tail.WithFile(c.cfg.Log.Scan.File),
query_log_tail.WithCommands(c.cfg.Log.Scan.Commands),
query_log_tail.WithLinesToTail(c.cfg.Log.Scan.LinesToTail),
Expand Down
10 changes: 10 additions & 0 deletions components/query/log/tail/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type Op struct {
commands [][]string

linesToTail int
dedup bool

perLineFunc func([]byte)

Expand Down Expand Up @@ -83,6 +84,15 @@ func WithLinesToTail(n int) OpOption {
}
}

// WithDedup enables de-duplication of scanned lines keyed by the exact
// log line string. This helps with logs that repeat the same message
// many times with an identical timestamp.
func WithDedup(enable bool) OpOption {
	return func(op *Op) {
		op.dedup = enable
	}
}

// Called for each line.
func WithPerLineFunc(f func([]byte)) OpOption {
return func(op *Op) {
Expand Down
119 changes: 73 additions & 46 deletions components/query/log/tail/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,18 @@ import (
"errors"
"io"
"os"
"sync"

"github.com/leptonai/gpud/log"
"github.com/leptonai/gpud/pkg/process"
)

// dedupMapPool recycles the scratch sets used to de-duplicate log lines
// across Scan calls, avoiding a fresh map allocation on every scan.
// Users must clear the map before returning it to the pool.
var dedupMapPool = sync.Pool{
	New: func() any {
		// 200 entries is a starting capacity sized for a typical
		// tail window of distinct lines; the map grows as needed.
		return make(map[string]struct{}, 200)
	},
}

// Scan scans the file or commands output from the end of the file
// and return the number of matched lines.
// It returns the lines in the reverse order that evaluates true
Expand Down Expand Up @@ -70,14 +77,62 @@ func Scan(ctx context.Context, opts ...OpOption) (int, error) {
}
fileSize := stat.Size()

// pre-allocate buffers
// use regular buffers for chunk and line reading
chunkBuf := make([]byte, 4096)
lineBuf := make([]byte, 0, 256)

// read backwards from the end of the file
scannedLines := 0
matchedLines := 0

var dedupedLines map[string]struct{}
if op.dedup {
// only use sync.Pool for dedup map
dedupedLines = dedupMapPool.Get().(map[string]struct{})
defer func() {
// clear the map before returning it to pool
for k := range dedupedLines {
delete(dedupedLines, k)
}
dedupMapPool.Put(dedupedLines)
}()
}

processLine := func(buf []byte) error {
reverse(buf)
scannedLines++

if op.perLineFunc != nil {
op.perLineFunc(buf)
}

shouldInclude, matchedFilter, err := op.applyFilter(buf)
if err != nil {
return err
}
if !shouldInclude {
return nil
}

if op.dedup {
if _, ok := dedupedLines[string(buf)]; ok {
// skip duplicate
return nil
}

dedupedLines[string(buf)] = struct{}{}
}

matchedLines++
parsedTime, err := op.parseTime(buf)
if err != nil {
return err
}
op.processMatched(buf, parsedTime, matchedFilter)

return nil
}

defer func() {
log.Logger.Debugw("scanned lines", "lines", scannedLines, "matched", matchedLines)
}()
Expand All @@ -96,61 +151,33 @@ func Scan(ctx context.Context, opts ...OpOption) (int, error) {
}

for i := chunkSize - 1; i >= 0; i-- {
if chunkBuf[i] == '\n' {
if len(lineBuf) > 0 {
reverse(lineBuf)
scannedLines++

if op.perLineFunc != nil {
op.perLineFunc(lineBuf)
}

shouldInclude, matchedFilter, err := op.applyFilter(lineBuf)
if err != nil {
return 0, err
}
if shouldInclude {
matchedLines++

parsedTime, err := op.parseTime(lineBuf)
if err != nil {
return 0, err
}
op.processMatched(lineBuf, parsedTime, matchedFilter)
}

lineBuf = lineBuf[:0]
}
} else {
if scannedLines == op.linesToTail {
return matchedLines, nil
}

// still processing a line
if chunkBuf[i] != '\n' {
lineBuf = append(lineBuf, chunkBuf[i])
continue
}

if scannedLines == op.linesToTail {
return matchedLines, nil
// end of a line but no content
if len(lineBuf) == 0 {
continue
}
}
}

if len(lineBuf) > 0 && scannedLines < op.linesToTail {
reverse(lineBuf)
if err := processLine(lineBuf); err != nil {
return 0, err
}

if op.perLineFunc != nil {
op.perLineFunc(lineBuf)
lineBuf = lineBuf[:0]
}
}

shouldInclude, matchedFilter, err := op.applyFilter(lineBuf)
if err != nil {
if len(lineBuf) > 0 && scannedLines < op.linesToTail {
if err := processLine(lineBuf); err != nil {
return 0, err
}
if shouldInclude {
matchedLines++

parsedTime, err := op.parseTime(lineBuf)
if err != nil {
return 0, err
}
op.processMatched(lineBuf, parsedTime, matchedFilter)
}
}

return matchedLines, nil
Expand Down
113 changes: 113 additions & 0 deletions components/query/log/tail/scan_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package tail

import (
"context"
"testing"
"time"

query_log_common "github.com/leptonai/gpud/components/query/log/common"

"k8s.io/utils/ptr"
)

// go test -bench=BenchmarkScan -benchmem
// go test -bench=BenchmarkScan_DmesgLog -benchmem
//
// BenchmarkScan_DmesgLog measures Scan over a captured dmesg log,
// sweeping the tail size, an optional substring filter, and dedup.
func BenchmarkScan_DmesgLog(b *testing.B) {
	ctx := context.Background()

	benchmarks := []struct {
		name        string
		linesToTail int
		withFilter  bool
		dedup       bool
	}{
		{"Tail100NoFilter", 100, false, false},
		{"Tail1000NoFilter", 1000, false, false},
		{"Tail100WithFilter", 100, true, false},
		{"Tail1000WithFilter", 1000, true, false},

		{"Tail100NoFilterWithDedup", 100, false, true},
		{"Tail1000NoFilterWithDedup", 1000, false, true},
		{"Tail100WithFilterWithDedup", 100, true, true},
		{"Tail1000WithFilterWithDedup", 1000, true, true},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			var opts []OpOption
			opts = append(opts,
				WithFile("testdata/dmesg.0.log"),
				WithLinesToTail(bm.linesToTail),
				// Fix: bm.dedup was previously declared per case but
				// never applied, so the *WithDedup cases benchmarked
				// the exact same path as the non-dedup ones.
				WithDedup(bm.dedup),
				// No-op time parsing/match handling so the benchmark
				// isolates the scanning cost itself.
				WithParseTime(func(line []byte) (time.Time, error) {
					return time.Time{}, nil
				}),
				WithProcessMatched(func(line []byte, _ time.Time, _ *query_log_common.Filter) {}),
			)

			if bm.withFilter {
				opts = append(opts, WithSelectFilter(&query_log_common.Filter{
					Substring: ptr.To("error"),
				}))
			}

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err := Scan(ctx, opts...)
				if err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}

// go test -bench=BenchmarkScan -benchmem
// go test -bench=BenchmarkScan_KubeletLog -benchmem
//
// BenchmarkScan_KubeletLog measures Scan over a captured kubelet log,
// sweeping the tail size, an optional substring filter, and dedup.
func BenchmarkScan_KubeletLog(b *testing.B) {
	ctx := context.Background()

	benchmarks := []struct {
		name        string
		linesToTail int
		withFilter  bool
		dedup       bool
	}{
		{"Tail100NoFilter", 100, false, false},
		{"Tail1000NoFilter", 1000, false, false},
		{"Tail100WithFilter", 100, true, false},
		{"Tail1000WithFilter", 1000, true, false},

		{"Tail100NoFilterWithDedup", 100, false, true},
		{"Tail1000NoFilterWithDedup", 1000, false, true},
		{"Tail100WithFilterWithDedup", 100, true, true},
		{"Tail1000WithFilterWithDedup", 1000, true, true},
	}

	for _, bm := range benchmarks {
		b.Run(bm.name, func(b *testing.B) {
			var opts []OpOption
			opts = append(opts,
				WithFile("testdata/kubelet.0.log"),
				WithLinesToTail(bm.linesToTail),
				// Fix: bm.dedup was previously declared per case but
				// never applied, so the *WithDedup cases benchmarked
				// the exact same path as the non-dedup ones.
				WithDedup(bm.dedup),
				// No-op time parsing/match handling so the benchmark
				// isolates the scanning cost itself.
				WithParseTime(func(line []byte) (time.Time, error) {
					return time.Time{}, nil
				}),
				WithProcessMatched(func(line []byte, _ time.Time, _ *query_log_common.Filter) {}),
			)

			if bm.withFilter {
				opts = append(opts, WithSelectFilter(&query_log_common.Filter{
					Substring: ptr.To("error"),
				}))
			}

			b.ResetTimer()
			for i := 0; i < b.N; i++ {
				_, err := Scan(ctx, opts...)
				if err != nil {
					b.Fatal(err)
				}
			}
		})
	}
}
Loading

0 comments on commit 66720c8

Please sign in to comment.