-
Notifications
You must be signed in to change notification settings - Fork 121
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: support rocketMQ #231
base: main
Are you sure you want to change the base?
Changes from 7 commits
1e0025c
fc38964
28798f3
f677d60
537a08a
fc80653
5b99af1
d8f722c
810f58e
6f32563
541008b
63345cf
2e22617
47d46ca
94a107e
1b7d62b
1b874e7
c4d4adc
f0bf0d4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package rocketmq | ||
|
||
import ( | ||
"kyanos/agent/protocol" | ||
"kyanos/bpf" | ||
) | ||
|
||
type Filter struct { | ||
} | ||
|
||
func (m Filter) Filter(req protocol.ParsedMessage, resp protocol.ParsedMessage) bool { | ||
return true | ||
} | ||
|
||
func (m Filter) FilterByProtocol(p bpf.AgentTrafficProtocolT) bool { | ||
return p == bpf.AgentTrafficProtocolTKProtocolRocketMQ | ||
} | ||
|
||
func (m Filter) FilterByRequest() bool { | ||
return false | ||
} | ||
|
||
func (m Filter) FilterByResponse() bool { | ||
return false | ||
} | ||
|
||
var _ protocol.ProtocolFilter = Filter{} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,140 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
package rocketmq | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
import ( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"encoding/binary" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"errors" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"fmt" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"kyanos/agent/buffer" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"kyanos/agent/protocol" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
"kyanos/bpf" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func init() { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
protocol.ParsersMap[bpf.AgentTrafficProtocolTKProtocolRocketMQ] = func() protocol.ProtocolStreamParser { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return &RocketMQStreamParser{} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *RocketMQMessage) FormatToString() string { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return fmt.Sprintf("base=[%s] command=[%s] payload=[%s]", r.FrameBase.String(), "todo", r.Body) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is the TODO here expected? I mean because it's a draft PR |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *RocketMQMessage) IsReq() bool { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return r.isReq | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *RocketMQStreamParser) ParseStream(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType) protocol.ParseResult { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
buffer := streamBuffer.Head().Buffer() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(buffer) < 8 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParseState: protocol.NeedsMoreData, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
frameSize := int(binary.BigEndian.Uint32(buffer[:4])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if frameSize > len(buffer) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ParseState: protocol.NeedsMoreData} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
headerLength := binary.BigEndian.Uint32(buffer[4:8]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
headerDataLen := headerLength & 0xFFFFFF | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
serializedType := byte((headerLength >> 24) & 0xFF) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(buffer) < 8+int(headerDataLen) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thr 4 and 8 are magical numbers here. I would like to suggest you to add constants |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ParseState: protocol.NeedsMoreData} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
headerBody := buffer[8 : 8+headerDataLen] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
body := buffer[8+headerDataLen : frameSize] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message, err := r.parseHeader(headerBody, serializedType) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if err != nil { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ParseState: protocol.Invalid, ReadBytes: int(frameSize)} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.Body = body | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.isReq = messageType == protocol.Request | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
fb, ok := protocol.CreateFrameBase(streamBuffer, frameSize) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if !ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParseState: protocol.Ignore, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ReadBytes: frameSize, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.FrameBase = fb | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return protocol.ParseResult{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParseState: protocol.Success, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ReadBytes: frameSize, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParsedMessages: []protocol.ParsedMessage{message}, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It can be simplified
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
fmt.Println(serializedType) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This one looks alike a left behind debug |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
message := &RocketMQMessage{} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if serializedType == 0 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Return the error currently in the else here if serializedType != 0 And simplify the dozen line if statement |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(headerBody) < 18 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("invalid header size") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.RequestCode = int16(binary.BigEndian.Uint16(headerBody[:2])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use a constant instead of a hard-coded value. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.LanguageFlag = headerBody[2] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.VersionFlag = int16(binary.BigEndian.Uint16(headerBody[3:5])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.Opaque = int32(binary.BigEndian.Uint32(headerBody[5:9])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.RequestFlag = int32(binary.BigEndian.Uint32(headerBody[9:13])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.RemarkLength = int32(binary.BigEndian.Uint32(headerBody[13:17])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if int(message.RemarkLength) > len(headerBody[17:]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("invalid remark length") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.Remark = headerBody[17 : 17+message.RemarkLength] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
propertiesStart := 17 + message.RemarkLength | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same here about magical number 3,5,13,17 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(headerBody[propertiesStart:]) < 4 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("invalid properties length") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.PropertiesLen = int32(binary.BigEndian.Uint32(headerBody[propertiesStart:])) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
message.Properties = headerBody[propertiesStart+4 : propertiesStart+4+message.PropertiesLen] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} else { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return nil, errors.New("unsupported serialization type") | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return message, nil | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *RocketMQStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
XmchxUp marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
buffer := streamBuffer.Head().Buffer()[startPos:] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
for i := range buffer { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if len(buffer[i:]) < 8 { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
frameSize := binary.BigEndian.Uint32(buffer[i : i+4]) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if int(frameSize) <= len(buffer[i:]) { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return startPos + i | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
return -1 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
func (r *RocketMQStreamParser) Match(reqStream *[]protocol.ParsedMessage, respStream *[]protocol.ParsedMessage) []protocol.Record { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
records := []protocol.Record{} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
reqMap := make(map[int32]*RocketMQMessage) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, msg := range *reqStream { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
req := msg.(*RocketMQMessage) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
reqMap[req.Opaque] = req | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
for _, msg := range *respStream { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
resp := msg.(*RocketMQMessage) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
if req, ok := reqMap[resp.Opaque]; ok { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
records = append(records, protocol.Record{ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Req: req, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
Resp: resp, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
}) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
delete(reqMap, resp.Opaque) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
return records | ||||||||||||||||||||||||||||||||||||||||||||||||||||||
XmchxUp marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package rocketmq | ||
|
||
import ( | ||
"kyanos/agent/protocol" | ||
) | ||
|
||
var _ protocol.ParsedMessage = &RocketMQMessage{} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here you are validating the struct respect the interface. This one and the previous one you do in the other files could be in tests file. I'm unsure the compiler clean them in the final binary There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
|
||
type RocketMQMessage struct { | ||
protocol.FrameBase | ||
RequestCode int16 | ||
LanguageFlag byte | ||
VersionFlag int16 | ||
Opaque int32 | ||
RequestFlag int32 | ||
RemarkLength int32 | ||
Remark []byte | ||
PropertiesLen int32 | ||
Properties []byte | ||
Body []byte | ||
isReq bool | ||
} | ||
|
||
var _ protocol.ProtocolStreamParser = &RocketMQStreamParser{} | ||
|
||
type RocketMQStreamParser struct { | ||
} |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
package cmd | ||
|
||
import ( | ||
"kyanos/agent/protocol/rocketmq" | ||
|
||
"github.com/spf13/cobra" | ||
) | ||
|
||
var rocketmqCmd *cobra.Command = &cobra.Command{ | ||
Use: "rocketmq", | ||
Short: "watch RocketMQ message", | ||
Run: func(cmd *cobra.Command, args []string) { | ||
options.MessageFilter = rocketmq.Filter{} | ||
options.LatencyFilter = initLatencyFilter(cmd) | ||
options.SizeFilter = initSizeFilter(cmd) | ||
startAgent() | ||
|
||
}, | ||
} | ||
|
||
func init() { | ||
rocketmqCmd.PersistentFlags().SortFlags = false | ||
copy := *rocketmqCmd | ||
watchCmd.AddCommand(©) | ||
copy2 := *rocketmqCmd | ||
statCmd.AddCommand(©2) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should add some filter options and implement the filter logic here.