-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support rocketMQ #231
base: main
Are you sure you want to change the base?
Changes from 9 commits
1e0025c
fc38964
28798f3
f677d60
537a08a
fc80653
5b99af1
d8f722c
810f58e
6f32563
541008b
63345cf
2e22617
47d46ca
94a107e
1b7d62b
1b874e7
c4d4adc
f0bf0d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package rocketmq | ||
|
||
import ( | ||
"kyanos/agent/protocol" | ||
"kyanos/bpf" | ||
) | ||
|
||
type Filter struct { | ||
} | ||
|
||
func (m Filter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool { | ||
return true | ||
} | ||
|
||
func (m Filter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool { | ||
return p == bpf.AgentTrafficProtocolTKProtocolRocketMQ | ||
} | ||
|
||
func (m Filter) FilterByRequest() bool { | ||
return false | ||
} | ||
|
||
func (m Filter) FilterByResponse() bool { | ||
return false | ||
} | ||
|
||
var _ protocol.ProtocolFilter = Filter{} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,171 @@ | ||
package rocketmq | ||
|
||
import ( | ||
"encoding/binary" | ||
"errors" | ||
"fmt" | ||
"kyanos/agent/buffer" | ||
"kyanos/agent/protocol" | ||
"kyanos/bpf" | ||
"kyanos/common" | ||
) | ||
|
||
func init() { | ||
protocol.ParsersMap[bpf.AgentTrafficProtocolTKProtocolRocketMQ] = func() protocol.ProtocolStreamParser { | ||
return &RocketMQStreamParser{} | ||
} | ||
} | ||
|
||
func (r *RocketMQMessage) FormatToString() string { | ||
return fmt.Sprintf("base=[%s] command=[%s] payload=[%s]", r.FrameBase.String(), "todo", r.Body) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the TODO here expected? I mean because it's a draft PR |
||
} | ||
|
||
func (r *RocketMQMessage) IsReq() bool { | ||
return r.isReq | ||
} | ||
|
||
func (r *RocketMQStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType) protocol.ParseResult { | ||
buffer := streamBuffer.Head().Buffer() | ||
common.ProtocolParserLog.Debugf("ParseStream received buffer length: %d", len(buffer)) | ||
common.ProtocolParserLog.Debugln("==============", buffer) | ||
|
||
if len(buffer) < 8 { | ||
common.ProtocolParserLog.Warn("Buffer too small for header, needs more data.") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. For cases where there's an issue parsing protocol data or more data is required, set the log level to info or lower. |
||
return protocol.ParseResult{ | ||
ParseState: protocol.NeedsMoreData, | ||
} | ||
} | ||
|
||
frameSize := int(binary.BigEndian.Uint32(buffer[:4])) | ||
if frameSize <= 0 { | ||
common.ProtocolParserLog.Warnf("Invalid frame size: %d", frameSize) | ||
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: 4} | ||
} | ||
|
||
if frameSize > len(buffer) { | ||
common.ProtocolParserLog.Debugf("Frame size %d exceeds buffer length %d, needs more data.", frameSize, len(buffer)) | ||
return protocol.ParseResult{ParseState: protocol.NeedsMoreData} | ||
} | ||
|
||
headerLength := binary.BigEndian.Uint32(buffer[4:8]) | ||
headerDataLen := headerLength & 0xFFFFFF | ||
serializedType := byte((headerLength >> 24) & 0xFF) | ||
|
||
if 8+int(headerDataLen) > frameSize || len(buffer) < 8+int(headerDataLen) { | ||
common.ProtocolParserLog.Warnf("Incomplete header detected: headerDataLen=%d, frameSize=%d.", headerDataLen, frameSize) | ||
return protocol.ParseResult{ParseState: protocol.NeedsMoreData} | ||
} | ||
|
||
headerBody := buffer[8 : 8+headerDataLen] | ||
body := buffer[8+headerDataLen : frameSize] | ||
|
||
message, err := r.parseHeader(headerBody, serializedType) | ||
if err != nil { | ||
common.ProtocolParserLog.Errorf("Failed to parse header: %v", err) | ||
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: int(frameSize)} | ||
} | ||
|
||
message.Body = body | ||
message.isReq = messageType == protocol.Request | ||
fb, ok := protocol.CreateFrameBase(streamBuffer, frameSize) | ||
|
||
if !ok { | ||
common.ProtocolParserLog.Warnf("Failed to create FrameBase for frameSize=%d", frameSize) | ||
return protocol.ParseResult{ | ||
ParseState: protocol.Ignore, | ||
ReadBytes: frameSize, | ||
} | ||
} | ||
|
||
common.ProtocolParserLog.Debugf("Successfully parsed message: %+v", message) | ||
message.FrameBase = fb | ||
return protocol.ParseResult{ | ||
ParseState: protocol.Success, | ||
ReadBytes: frameSize, | ||
ParsedMessages: []protocol.ParsedMessage{message}, | ||
} | ||
|
||
} | ||
|
||
func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) { | ||
fmt.Println(serializedType) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one looks alike a left behind debug |
||
message := &RocketMQMessage{} | ||
if serializedType == 0 { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return the error currently in the else here if serializedType != 0 And simplify the dozen line if statement |
||
if len(headerBody) < 18 { | ||
return nil, errors.New("invalid header size") | ||
} | ||
|
||
message.RequestCode = int16(binary.BigEndian.Uint16(headerBody[:2])) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a constant instead of a hard-coded value. |
||
message.LanguageFlag = headerBody[2] | ||
message.VersionFlag = int16(binary.BigEndian.Uint16(headerBody[3:5])) | ||
message.Opaque = int32(binary.BigEndian.Uint32(headerBody[5:9])) | ||
message.RequestFlag = int32(binary.BigEndian.Uint32(headerBody[9:13])) | ||
message.RemarkLength = int32(binary.BigEndian.Uint32(headerBody[13:17])) | ||
|
||
if int(message.RemarkLength) > len(headerBody[17:]) { | ||
return nil, errors.New("invalid remark length") | ||
} | ||
message.Remark = headerBody[17 : 17+message.RemarkLength] | ||
propertiesStart := 17 + message.RemarkLength | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here about magical number 3,5,13,17 |
||
if len(headerBody[propertiesStart:]) < 4 { | ||
return nil, errors.New("invalid properties length") | ||
} | ||
message.PropertiesLen = int32(binary.BigEndian.Uint32(headerBody[propertiesStart:])) | ||
message.Properties = headerBody[propertiesStart+4 : propertiesStart+4+message.PropertiesLen] | ||
} else { | ||
return nil, errors.New("unsupported serialization type") | ||
} | ||
return message, nil | ||
} | ||
|
||
func (r *RocketMQStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int { | ||
XmchxUp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
buffer := streamBuffer.Head().Buffer() | ||
common.ProtocolParserLog.Debugf("FindBoundary starting at position: %d, buffer length: %d", startPos, len(buffer)) | ||
|
||
for i := startPos; i <= len(buffer)-8; i++ { | ||
frameSize := int(binary.BigEndian.Uint32(buffer[i : i+4])) | ||
|
||
if frameSize <= 0 || frameSize > len(buffer)-i { | ||
common.ProtocolParserLog.Warnf("Skipping invalid frameSize=%d at position=%d", frameSize, i) | ||
continue | ||
} | ||
|
||
if i+frameSize <= len(buffer) { | ||
common.ProtocolParserLog.Debugf("Found boundary at position=%d with frameSize=%d", i, frameSize) | ||
return i | ||
} | ||
} | ||
|
||
common.ProtocolParserLog.Warn("No valid boundary found, returning -1.") | ||
return -1 | ||
} | ||
|
||
func (r *RocketMQStreamParser) Match(reqStream *[]protocol.ParsedMessage, respStream *[]protocol.ParsedMessage) []protocol.Record { | ||
common.ProtocolParserLog.Debugf("Matching %d requests with %d responses.", len(*reqStream), len(*respStream)) | ||
records := []protocol.Record{} | ||
|
||
reqMap := make(map[int32]*RocketMQMessage) | ||
for _, msg := range *reqStream { | ||
req := msg.(*RocketMQMessage) | ||
reqMap[req.Opaque] = req | ||
} | ||
|
||
for _, msg := range *respStream { | ||
resp := msg.(*RocketMQMessage) | ||
if req, ok := reqMap[resp.Opaque]; ok { | ||
records = append(records, protocol.Record{ | ||
Req: req, | ||
Resp: resp, | ||
}) | ||
delete(reqMap, resp.Opaque) | ||
} else { | ||
common.ProtocolParserLog.Warnf("No matching request found for response Opaque=%d", resp.Opaque) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change log level to debug. |
||
} | ||
} | ||
|
||
if len(reqMap) > 0 { | ||
common.ProtocolParserLog.Warnf("Unmatched requests remain: %d", len(reqMap)) | ||
} | ||
|
||
return records | ||
XmchxUp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package rocketmq | ||
|
||
import ( | ||
"kyanos/agent/protocol" | ||
) | ||
|
||
var _ protocol.ParsedMessage = &RocketMQMessage{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you are validating the struct respect the interface. This one and the previous one you do in the other files could be in tests file. I'm unsure the compiler clean them in the final binary There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
type RocketMQMessage struct { | ||
protocol.FrameBase | ||
RequestCode int16 | ||
LanguageFlag byte | ||
VersionFlag int16 | ||
Opaque int32 | ||
RequestFlag int32 | ||
RemarkLength int32 | ||
Remark []byte | ||
PropertiesLen int32 | ||
Properties []byte | ||
Body []byte | ||
isReq bool | ||
} | ||
|
||
var _ protocol.ProtocolStreamParser = &RocketMQStreamParser{} | ||
|
||
type RocketMQStreamParser struct { | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,26 @@ | ||
package cmd | ||
|
||
import ( | ||
"kyanos/agent/protocol/rocketmq" | ||
|
||
"github.com/spf13/cobra" | ||
) | ||
|
||
var rocketmqCmd *cobra.Command = &cobra.Command{ | ||
Use: "rocketmq", | ||
Short: "watch RocketMQ message", | ||
Run: func(cmd *cobra.Command, args []string) { | ||
options.MessageFilter = rocketmq.Filter{} | ||
options.LatencyFilter = initLatencyFilter(cmd) | ||
options.SizeFilter = initSizeFilter(cmd) | ||
startAgent() | ||
}, | ||
} | ||
|
||
func init() { | ||
rocketmqCmd.PersistentFlags().SortFlags = false | ||
copy := *rocketmqCmd | ||
watchCmd.AddCommand(©) | ||
copy2 := *rocketmqCmd | ||
statCmd.AddCommand(©2) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add some filter options and implement the filter logic here.