Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support rocketMQ #231

Open
wants to merge 19 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions agent/protocol/rocketmq/filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package rocketmq

import (
"kyanos/agent/protocol"
"kyanos/bpf"
)

type Filter struct {
}

func (m Filter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should add some filter options and implement the filter logic here.

return true
}

func (m Filter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool {
return p == bpf.AgentTrafficProtocolTKProtocolRocketMQ
}

func (m Filter) FilterByRequest() bool {
return false
}

func (m Filter) FilterByResponse() bool {
return false
}

var _ protocol.ProtocolFilter = Filter{}
105 changes: 105 additions & 0 deletions agent/protocol/rocketmq/language_code.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package rocketmq

import (
"errors"
"fmt"
)

type LanguageCode byte

const (
JAVA LanguageCode = iota // 0
CPP // 1
DOTNET // 2
PYTHON // 3
DELPHI // 4
ERLANG // 5
RUBY // 6
OTHER // 7
HTTP // 8
GO // 9
PHP // 10
OMS // 11
RUST // 12
NODE_JS // 13
UNKNOWN
)

// convertToLanguageCode converts a string to a LanguageCode.
func convertToLanguageCode(language string) (LanguageCode, error) {
switch language {
case "JAVA":
return JAVA, nil
case "CPP":
return CPP, nil
case "DOTNET":
return DOTNET, nil
case "PYTHON":
return PYTHON, nil
case "DELPHI":
return DELPHI, nil
case "ERLANG":
return ERLANG, nil
case "RUBY":
return RUBY, nil
case "OTHER":
return OTHER, nil
case "HTTP":
return HTTP, nil
case "GO":
return GO, nil
case "PHP":
return PHP, nil
case "OMS":
return OMS, nil
case "RUST":
return RUST, nil
case "NODE_JS":
return NODE_JS, nil
default:
return 13, errors.New("unknown language: " + language)
}
}

// convertToLanguageCodeFromByte converts a byte to a LanguageCode.
func convertToLanguageCodeFromByte(flag byte) (LanguageCode, error) {
if flag > 13 {
return 0, errors.New("unknown language flag: " + fmt.Sprint(flag))
}
return LanguageCode(flag), nil
}

func (lc LanguageCode) String() string {
switch lc {
case JAVA:
return "JAVA"
case CPP:
return "CPP"
case DOTNET:
return "DOTNET"
case PYTHON:
return "PYTHON"
case DELPHI:
return "DELPHI"
case ERLANG:
return "ERLANG"
case RUBY:
return "RUBY"
case OTHER:
return "OTHER"
case HTTP:
return "HTTP"
case GO:
return "GO"
case PHP:
return "PHP"
case OMS:
return "OMS"
case RUST:
return "RUST"
case NODE_JS:
return "NODE_JS"
default:
return "UNKNOWN"
}
}
240 changes: 240 additions & 0 deletions agent/protocol/rocketmq/rocketmq.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
package rocketmq

import (
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"kyanos/agent/buffer"
"kyanos/agent/protocol"
"kyanos/bpf"
"kyanos/common"
"strings"
)

func init() {
protocol.ParsersMap[bpf.AgentTrafficProtocolTKProtocolRocketMQ] = func() protocol.ProtocolStreamParser {
return &RocketMQStreamParser{}
}
}

func NewRocketMQMessage() *RocketMQMessage {
return &RocketMQMessage{
LanguageCode: UNKNOWN,
RemarkBuf: make([]byte, 0),
PropertiesBuf: make([]byte, 0),
BodyBuf: make([]byte, 0),
Properties: map[string]string{},
}
}

func (r *RocketMQMessage) FormatToString() string {
remark := string(r.RemarkBuf)
body := string(r.BodyBuf)

propertiesMap := string(r.PropertiesBuf)
if len(r.Properties) > 0 {
props := make([]string, 0, len(r.Properties))
for key, value := range r.Properties {
props = append(props, fmt.Sprintf("%s=%s", key, value))
}
propertiesMap = fmt.Sprintf("{%s}", strings.Join(props, ", "))
}

return fmt.Sprintf("base=[%s] detail=[code=%d, language=%s, version=%d, opaque=%d, flag=%d, remark=%s, extFields=%s, body=%s]",
r.FrameBase.String(),
r.RequestCode,
r.LanguageCode,
r.VersionFlag,
r.Opaque,
r.RequestFlag,
remark,
propertiesMap,
body,
)

}

func (r *RocketMQMessage) IsReq() bool {
return r.isReq
}

func (r *RocketMQStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType) protocol.ParseResult {
buffer := streamBuffer.Head().Buffer()
common.ProtocolParserLog.Debugf("ParseStream received buffer length: %d", len(buffer))

if len(buffer) < 8 {
common.ProtocolParserLog.Warn("Buffer too small for header, needs more data.")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For cases where there's an issue parsing protocol data or more data is required, set the log level to info or lower.

return protocol.ParseResult{
ParseState: protocol.NeedsMoreData,
}
}

frameSize := int(binary.BigEndian.Uint32(buffer[:4]))
if frameSize <= 0 {
common.ProtocolParserLog.Warnf("Invalid frame size: %d", frameSize)
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: 4}
}

if frameSize+4 > len(buffer) {
common.ProtocolParserLog.Debugf("Frame size %d exceeds buffer length %d, needs more data.", frameSize, len(buffer))
return protocol.ParseResult{ParseState: protocol.NeedsMoreData}
}

headerLength := binary.BigEndian.Uint32(buffer[4:8])
headerDataLen := headerLength & 0xFFFFFF
serializedType := byte((headerLength >> 24) & 0xFF)

if 4+int(headerDataLen) > frameSize || len(buffer) < 8+int(headerDataLen) {
common.ProtocolParserLog.Warnf("Incomplete header detected: headerDataLen=%d, frameSize=%d.", headerDataLen, frameSize)
return protocol.ParseResult{ParseState: protocol.NeedsMoreData}
}

headerBody := buffer[8 : 8+headerDataLen]

message, err := r.parseHeader(headerBody, serializedType)
if err != nil {
common.ProtocolParserLog.Errorf("Failed to parse header: %v", err)
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: int(frameSize)}
}

if frameSize > 4+int(headerDataLen) {
body := buffer[8+headerDataLen : frameSize]
message.BodyBuf = body
}

message.isReq = messageType == protocol.Request
fb, ok := protocol.CreateFrameBase(streamBuffer, frameSize)

if !ok {
common.ProtocolParserLog.Warnf("Failed to create FrameBase for frameSize=%d", frameSize)
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
}

common.ProtocolParserLog.Debugf("Successfully parsed message: %+v", message)
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}

}

func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) {
message := NewRocketMQMessage()
switch serializedType {
case 0: // json
var temp struct {
RequestCode int16 `json:"code"`
Language string `json:"language"`
VersionFlag int16 `json:"version"`
Opaque int32 `json:"opaque"`
RequestFlag int32 `json:"flag"`
Remark string `json:"remark,omitempty"`
Properties map[string]string `json:"extFields,omitempty"`
}

if err := json.Unmarshal(headerBody, &temp); err != nil {
return nil, fmt.Errorf("failed to parse JSON header: %w", err)
}

message.RequestCode = temp.RequestCode
lFlag, _ := convertToLanguageCode(temp.Language)
message.LanguageCode = lFlag
message.VersionFlag = temp.VersionFlag
message.Opaque = temp.Opaque
message.RequestFlag = temp.RequestFlag
message.RemarkLength = int32(len(temp.Remark))
message.RemarkBuf = []byte(temp.Remark)
message.PropertiesLen = int32(len(temp.Properties))
message.Properties = temp.Properties

case 1: // ROCKETMQ
if len(headerBody) < 18 {
return nil, errors.New("invalid header size for private serialization")
}

message.RequestCode = int16(binary.BigEndian.Uint16(headerBody[:2]))
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use a constant instead of a hard-coded value.

lCode, _ := convertToLanguageCodeFromByte(headerBody[2])
message.LanguageCode = lCode
message.VersionFlag = int16(binary.BigEndian.Uint16(headerBody[3:5]))
message.Opaque = int32(binary.BigEndian.Uint32(headerBody[5:9]))
message.RequestFlag = int32(binary.BigEndian.Uint32(headerBody[9:13]))
message.RemarkLength = int32(binary.BigEndian.Uint32(headerBody[13:17]))

if int(message.RemarkLength) > len(headerBody[17:]) {
return nil, errors.New("invalid remark length")
}

message.RemarkBuf = headerBody[17 : 17+message.RemarkLength]

propertiesStart := 17 + message.RemarkLength
if len(headerBody[propertiesStart:]) < 4 {
return nil, errors.New("invalid properties length")
}

message.PropertiesLen = int32(binary.BigEndian.Uint32(headerBody[propertiesStart:]))
message.PropertiesBuf = headerBody[propertiesStart+4 : propertiesStart+4+message.PropertiesLen]

default:
return nil, fmt.Errorf("unsupported serialization type: %d", serializedType)
}

return message, nil
}

func (r *RocketMQStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int {
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
buffer := streamBuffer.Head().Buffer()
common.ProtocolParserLog.Debugf("FindBoundary starting at position: %d, buffer length: %d", startPos, len(buffer))

for i := startPos; i <= len(buffer)-4; i++ {
frameSize := int(binary.BigEndian.Uint32(buffer[i : i+4]))

if frameSize <= 0 || (frameSize+4) > len(buffer)-i {
common.ProtocolParserLog.Warnf("Skipping invalid frameSize=%d at position=%d", frameSize, i)
continue
}

if (i + frameSize + 4) <= len(buffer) {
common.ProtocolParserLog.Debugf("Found boundary at position=%d with frameSize=%d", i, frameSize)
return i
}
}

common.ProtocolParserLog.Warn("No valid boundary found, returning -1.")
return -1
}

func (r *RocketMQStreamParser) Match(reqStream *[]protocol.ParsedMessage, respStream *[]protocol.ParsedMessage) []protocol.Record {
common.ProtocolParserLog.Debugf("Matching %d requests with %d responses.", len(*reqStream), len(*respStream))
records := []protocol.Record{}

reqMap := make(map[int32]*RocketMQMessage)
for _, msg := range *reqStream {
req := msg.(*RocketMQMessage)
reqMap[req.Opaque] = req
}

for _, msg := range *respStream {
resp := msg.(*RocketMQMessage)
if req, ok := reqMap[resp.Opaque]; ok {
records = append(records, protocol.Record{
Req: req,
Resp: resp,
})
delete(reqMap, resp.Opaque)
} else {
common.ProtocolParserLog.Warnf("No matching request found for response Opaque=%d", resp.Opaque)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Change log level to debug.

}
}

if len(reqMap) > 0 {
common.ProtocolParserLog.Warnf("Unmatched requests remain: %d", len(reqMap))
}

return records
XmchxUp marked this conversation as resolved.
Show resolved Hide resolved
}
Loading
Loading