Skip to content

Commit

Permalink
Merge pull request #81 from hengyoush/feature/overview-2
Browse files Browse the repository at this point in the history
feat. add overview subcommand
  • Loading branch information
hengyoush authored Oct 21, 2024
2 parents ed305f0 + f4cdad7 commit 71b5905
Show file tree
Hide file tree
Showing 8 changed files with 305 additions and 76 deletions.
51 changes: 43 additions & 8 deletions agent/analysis/analysis.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,20 @@ import (
type aggregator struct {
*analysis_common.AnalysisOptions
*ConnStat
isSub bool
}

func createAggregatorWithHumanReadableClassId(humanReadableClassId string,
classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
aggregator := createAggregator(classId, aggregateOption)
classId analysis_common.ClassId,
aggregateOption *analysis_common.AnalysisOptions, isSub bool) *aggregator {
aggregator := createAggregator(classId, aggregateOption, isSub)
aggregator.HumanReadbleClassId = humanReadableClassId
return aggregator
}

func createAggregator(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions) *aggregator {
func createAggregator(classId analysis_common.ClassId, aggregateOption *analysis_common.AnalysisOptions, isSub bool) *aggregator {
aggregator := aggregator{}
aggregator.isSub = isSub
aggregator.reset(classId, aggregateOption)
return &aggregator
}
Expand All @@ -34,6 +37,10 @@ func (a *aggregator) reset(classId analysis_common.ClassId, aggregateOption *ana
a.ConnStat = &ConnStat{
ClassId: classId,
ClassfierType: aggregateOption.ClassfierType,
IsSub: a.isSub,
}
if a.isSub {
a.ConnStat.ClassfierType = aggregateOption.SubClassfierType
}
a.SamplesMap = make(map[analysis_common.MetricType][]*analysis_common.AnnotatedRecord)
a.PercentileCalculators = make(map[analysis_common.MetricType]*PercentileCalculator)
Expand Down Expand Up @@ -65,9 +72,14 @@ func (a *aggregator) receive(record *analysis_common.AnnotatedRecord) error {
metricType := analysis_common.MetricType(rawMetricType)

if enabled {
samples := a.SamplesMap[metricType]
MetricExtract := analysis_common.GetMetricExtractFunc[float64](metricType)
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.AnalysisOptions.SampleLimit)
samples := a.SamplesMap[metricType]
// only sample if aggregator is sub or no sub classfier
if a.isSub || a.SubClassfierType == analysis_common.None {
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, a.AnalysisOptions.SampleLimit)
} else {
a.SamplesMap[metricType] = AddToSamples(samples, record, MetricExtract, 1)
}

metricValue := MetricExtract(record)

Expand Down Expand Up @@ -102,6 +114,7 @@ func AddToSamples[T analysis_common.MetricValueType](samples []*analysis_common.

type Analyzer struct {
Classfier
subClassfier Classfier
*analysis_common.AnalysisOptions
common.SideEnum // 那一边的统计指标TODO 根据参数自动推断
Aggregators map[analysis_common.ClassId]*aggregator
Expand All @@ -120,7 +133,7 @@ func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, opts
// ac.AddToFastStopper(stopper)
opts.Init()
analyzer := &Analyzer{
Classfier: getClassfier(opts.ClassfierType),
Classfier: getClassfier(opts.ClassfierType, *opts),
recordsChannel: recordsChannel,
Aggregators: make(map[analysis_common.ClassId]*aggregator),
AnalysisOptions: opts,
Expand All @@ -129,6 +142,9 @@ func CreateAnalyzer(recordsChannel <-chan *analysis_common.AnnotatedRecord, opts
renderStopper: renderStopper,
ctx: ctx,
}
if opts.SubClassfierType != analysis_common.None {
analyzer.subClassfier = getClassfier(opts.SubClassfierType, *opts)
}
opts.CurrentReceivedSamples = func() int {
return analyzer.recordReceived
}
Expand Down Expand Up @@ -189,14 +205,33 @@ func (a *Analyzer) analyze(record *analysis_common.AnnotatedRecord) {
if ok {
humanReadableClassId := humanReadableFunc(record)
a.Aggregators[class] = createAggregatorWithHumanReadableClassId(humanReadableClassId,
class, a.AnalysisOptions)
class, a.AnalysisOptions, false)
} else {
a.Aggregators[class] = createAggregator(class, a.AnalysisOptions)
a.Aggregators[class] = createAggregator(class, a.AnalysisOptions, false)
}

aggregator = a.Aggregators[class]
}
aggregator.receive(record)

if a.subClassfier != nil {
subClassId, err := a.subClassfier(record)
if err == nil {
fullClassId := class + "||" + subClassId
subAggregator, exists := a.Aggregators[fullClassId]
if !exists {
subHumanReadableFunc, ok := getClassIdHumanReadableFunc(a.AnalysisOptions.SubClassfierType, *a.AnalysisOptions)
if ok {
subHumanReadableClassId := subHumanReadableFunc(record)
a.Aggregators[fullClassId] = createAggregatorWithHumanReadableClassId(subHumanReadableClassId, fullClassId, a.AnalysisOptions, true)
} else {
a.Aggregators[fullClassId] = createAggregator(fullClassId, a.AnalysisOptions, true)
}
subAggregator = a.Aggregators[fullClassId]
}
subAggregator.receive(record)
}
}
} else {
common.DefaultLog.Warnf("classify error: %v\n", err)
}
Expand Down
49 changes: 47 additions & 2 deletions agent/analysis/classfier.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,25 @@ func init() {
}
}

classfierMap[anc.ProtocolAdaptive] = func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
redisReq, ok := ar.Record.Request().(*protocol.RedisMessage)
if !ok {
return "_not_a_redis_req_", nil
} else {
return anc.ClassId(redisReq.Command()), nil
}
}

classIdHumanReadableMap = make(map[anc.ClassfierType]ClassIdAsHumanReadable)
classIdHumanReadableMap[anc.RemoteIp] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.RemoteAddr.String()
}
classIdHumanReadableMap[anc.RemotePort] = func(ar *anc.AnnotatedRecord) string {
return fmt.Sprintf("%d", ar.ConnDesc.RemotePort)
}
classIdHumanReadableMap[anc.LocalPort] = func(ar *anc.AnnotatedRecord) string {
return fmt.Sprintf("%d", ar.ConnDesc.LocalPort)
}
classIdHumanReadableMap[anc.Conn] = func(ar *anc.AnnotatedRecord) string {
return ar.ConnDesc.SimpleString()
}
Expand All @@ -73,6 +91,33 @@ func init() {
}
}

func getClassfier(classfierType anc.ClassfierType) Classfier {
return classfierMap[classfierType]
func getClassfier(classfierType anc.ClassfierType, options anc.AnalysisOptions) Classfier {
if classfierType == anc.ProtocolAdaptive {
return func(ar *anc.AnnotatedRecord) (anc.ClassId, error) {
c, ok := options.ProtocolSpecificClassfiers[bpf.AgentTrafficProtocolT(ar.Protocol)]
if !ok {
return classfierMap[anc.RemoteIp](ar)
} else {
return classfierMap[c](ar)
}
}
} else {
return classfierMap[classfierType]
}
}

func getClassIdHumanReadableFunc(classfierType anc.ClassfierType, options anc.AnalysisOptions) (ClassIdAsHumanReadable, bool) {
if classfierType == anc.ProtocolAdaptive {
return func(ar *anc.AnnotatedRecord) string {
c, ok := options.ProtocolSpecificClassfiers[bpf.AgentTrafficProtocolT(ar.Protocol)]
if !ok {
return classIdHumanReadableMap[anc.RemoteIp](ar)
} else {
return classIdHumanReadableMap[c](ar)
}
}, true
} else {
f, ok := classIdHumanReadableMap[classfierType]
return f, ok
}
}
21 changes: 12 additions & 9 deletions agent/analysis/common/classfier.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
package common

var ClassfierTypeNames = map[ClassfierType]string{
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
Default: "default",
None: "none",
Conn: "conn",
RemotePort: "remote-port",
LocalPort: "local-port",
RemoteIp: "remote-ip",
Protocol: "protocol",
HttpPath: "http-path",
RedisCommand: "redis-command",
ProtocolAdaptive: "protocol-adaptive",
Default: "default",
}

const (
Expand All @@ -26,6 +27,8 @@ const (

// Redis
RedisCommand

ProtocolAdaptive
)

type ClassId string
8 changes: 7 additions & 1 deletion agent/analysis/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"fmt"
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
ac "kyanos/common"

Expand All @@ -14,7 +15,9 @@ type AnalysisOptions struct {
SampleLimit int
Side ac.SideEnum
ClassfierType
CleanWhenHarvest bool
SubClassfierType ClassfierType
ProtocolSpecificClassfiers map[bpf.AgentTrafficProtocolT]ClassfierType
CleanWhenHarvest bool

// Fast Inspect Options
SlowMode bool
Expand All @@ -23,6 +26,9 @@ type AnalysisOptions struct {
TargetSamples int
CurrentReceivedSamples func() int
HavestSignal chan struct{}

// overview mode
Overview bool
}

func (a *AnalysisOptions) Init() {
Expand Down
6 changes: 5 additions & 1 deletion agent/analysis/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ConnStat struct {
ClassId anc.ClassId
HumanReadbleClassId string
ClassfierType anc.ClassfierType
IsSub bool
}

func (c *ConnStat) ClassIdAsHumanReadable(classId anc.ClassId) string {
Expand All @@ -31,10 +32,13 @@ func (c *ConnStat) ClassIdAsHumanReadable(classId anc.ClassId) string {
case anc.LocalPort:
fallthrough
case anc.RemoteIp:
return string(classId)
return c.HumanReadbleClassId
case anc.Protocol:
return c.HumanReadbleClassId
default:
if c.HumanReadbleClassId != "" {
return c.HumanReadbleClassId
}
return string(classId)
}
}
Expand Down
Loading

0 comments on commit 71b5905

Please sign in to comment.