Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support rocketMQ #231

Draft
wants to merge 17 commits into
base: main
Choose a base branch
from

Conversation

XmchxUp
Copy link
Contributor

@XmchxUp XmchxUp commented Dec 30, 2024

Closes #189

Copy link

vercel bot commented Dec 30, 2024

The latest updates on your projects. Learn more about Vercel for Git ↗︎

Name Status Preview Comments Updated (UTC)
kyanos ✅ Ready (Inspect) Visit Preview 💬 Add feedback Jan 3, 2025 6:32am

@XmchxUp
Copy link
Contributor Author

XmchxUp commented Dec 31, 2024

@hengyoush I am currently encountering problems while executing, and there may be issues with the changes I made.

Can you help me take a look?

log.txt

sudo ./kyanos watch rocketmq --debug-output rocketmq --remote-ports 9876,8080 --protocol-log-level 5 
ERRO[0000] load gotls uprobe failed: load program: invalid argument:
        0: R1=ctx() R10=fp0
        ; int probe_entry_tls_conn_read(struct pt_regs* ctx) {
        0: (bf) r8 = r1                       ; R1=ctx() R8_w=ctx()
        ; uint64_t id = bpf_get_current_pid_tgid();
        1: (85) call bpf_get_current_pid_tgid#14      ; R0_w=scalar()
        ; uint32_t tgid = id >> 32;
        2: (77) r0 >>= 32                     ; R0_w=scalar(smin=0,smax=umax=0xffffffff,var_off=(0x0; 0xffffffff))
        ; uint32_t tgid = id >> 32;

@XmchxUp
Copy link
Contributor Author

XmchxUp commented Dec 31, 2024

I forgot to regenerate the BPF directory.

@hengyoush
Copy link
Owner

hengyoush commented Dec 31, 2024

Have you modified the eBPF code?
Is your code up to date?

image

this bug has been fixed in main branch, you should update your branch.

@XmchxUp
Copy link
Contributor Author

XmchxUp commented Dec 31, 2024

Have you modified eBPF code? Is your code up to date?

yes,I need try to regenerate it.

spencercjh and others added 4 commits December 31, 2024 13:26
…oush#221)

* docs: introduce prettier and md-padding to format docs

Signed-off-by: spencercjh <[email protected]>

* fix: make github alerts work

Signed-off-by: spencercjh <[email protected]>

* fix: make all markdown extensions work

Signed-off-by: spencercjh <[email protected]>

* fix: reformat new codes from main

Signed-off-by: spencercjh <[email protected]>

---------

Signed-off-by: spencercjh <[email protected]>
…d in server side (hengyoush#232)

fix: add fallback logic to calculate totaltime when nicin event missed in server side (hengyoush#232)
@XmchxUp
Copy link
Contributor Author

XmchxUp commented Dec 31, 2024

@hengyoush After updated and still have a problem, did my changes affect it?

sudo ./kyanos watch

log.txt

@hengyoush
Copy link
Owner

hengyoush commented Dec 31, 2024

No worries. I think your changes should not cause this error, this error is only the gotls probe loading failure, will not affect the function you are developing now, I will check this problem later. @XmchxUp

What your kernel version?

@XmchxUp
Copy link
Contributor Author

XmchxUp commented Dec 31, 2024

What your kernel version?

Linux ultraman-desktop 6.8.0-49-generic #49~22.04.1-Ubuntu SMP PREEMPT_DYNAMIC Wed Nov 6 17:42:15 UTC 2 x86_64 x86_64 x86_64 GNU/Linux

@hengyoush
Copy link
Owner

I tested your branch on my ubuntu24.04(kernel version: Linux VM-28-8-ubuntu 6.8.0-49-generic #49-Ubuntu SMP PREEMPT_DYNAMIC Mon Nov 4 02:06:24 UTC 2024 x86_64 x86_64 x86_64 GNU/Linux) , but it works fine and gotls probe load successfully. Take a look at assign_arg function in bpf/gotls.bpf.c , is it looks like below ?

static __inline void assign_arg(void* arg, size_t arg_size, struct location_t loc, const void* sp,
                                uint64_t* regs) {
  if (loc.type == kLocationTypeStack) {
    bpf_probe_read(arg, arg_size, sp + loc.offset);
  } else if (loc.type == kLocationTypeRegisters) {
    if (loc.offset >= 0) {
      if (loc.offset <= (1<<10)) {
        bpf_probe_read(arg, arg_size, (char*)regs + loc.offset);
      }
    }
  }
}

}

func (r *RocketMQMessage) FormatToString() string {
return fmt.Sprintf("base=[%s] command=[%s] payload=[%s]", r.FrameBase.String(), "todo", r.Body)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the TODO here expected?

I mean because it's a draft PR

Comment on lines 58 to 70
if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
} else {
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}
}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It can be simplified

Suggested change
if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
} else {
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}
}
if !ok {
return protocol.ParseResult{
ParseState: protocol.Ignore,
ReadBytes: frameSize,
}
}
message.FrameBase = fb
return protocol.ParseResult{
ParseState: protocol.Success,
ReadBytes: frameSize,
ParsedMessages: []protocol.ParsedMessage{message},
}

}

func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) {
fmt.Println(serializedType)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This one looks alike a left behind debug

func (parser *RocketMQStreamParser) parseHeader(headerBody []byte, serializedType byte) (*RocketMQMessage, error) {
fmt.Println(serializedType)
message := &RocketMQMessage{}
if serializedType == 0 {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Return the error currently in the else here if serializedType != 0

And simplify the dozen line if statement

Comment on lines 28 to 43
if len(buffer) < 8 {
return protocol.ParseResult{
ParseState: protocol.NeedsMoreData,
}
}

frameSize := int(binary.BigEndian.Uint32(buffer[:4]))
if frameSize > len(buffer) {
return protocol.ParseResult{ParseState: protocol.NeedsMoreData}
}

headerLength := binary.BigEndian.Uint32(buffer[4:8])
headerDataLen := headerLength & 0xFFFFFF
serializedType := byte((headerLength >> 24) & 0xFF)

if len(buffer) < 8+int(headerDataLen) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thr 4 and 8 are magical numbers here. I would like to suggest you to add constants

Comment on lines 77 to 92
if len(headerBody) < 18 {
return nil, errors.New("invalid header size")
}

message.RequestCode = int16(binary.BigEndian.Uint16(headerBody[:2]))
message.LanguageFlag = headerBody[2]
message.VersionFlag = int16(binary.BigEndian.Uint16(headerBody[3:5]))
message.Opaque = int32(binary.BigEndian.Uint32(headerBody[5:9]))
message.RequestFlag = int32(binary.BigEndian.Uint32(headerBody[9:13]))
message.RemarkLength = int32(binary.BigEndian.Uint32(headerBody[13:17]))

if int(message.RemarkLength) > len(headerBody[17:]) {
return nil, errors.New("invalid remark length")
}
message.Remark = headerBody[17 : 17+message.RemarkLength]
propertiesStart := 17 + message.RemarkLength

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here about magical number 3,5,13,17

"kyanos/agent/protocol"
)

var _ protocol.ParsedMessage = &RocketMQMessage{}

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here you are validating the struct respect the interface.

This one and the previous one you do in the other files could be in tests file.

I'm unsure the compiler clean them in the final binary

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hey @Zxilly

Do you know if we could check that?

Thanks

@ccoVeille
Copy link

I know Screenshot_20241214_105816_Firefox.jpg

but I did it anyway 😅

@XmchxUp
Copy link
Contributor Author

XmchxUp commented Jan 1, 2025

@ccoVeille Thanks for the REVIEW, but currently in the stage of learning as I go along with the implementation 😅

bpf/protocol_inference.h Outdated Show resolved Hide resolved
bpf/protocol_inference.h Outdated Show resolved Hide resolved
common.ProtocolParserLog.Warnf("Unmatched requests remain: %d", len(reqMap))
}

return records
Copy link
Owner

@hengyoush hengyoush Jan 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At the end of Match, you need to clean up reqStream and respStream, otherwise, reqStream/respStream will contain requests or responses that have already been matched.:

  • It is recommended to clear all respStream entries (and also the map) at the end of the Match function because the request is unlikely to appear again.
  • For reqStream, clean up those that have already been successfully matched.
  • Additionally, consider removing a request from reqStream if it has not received a response for a long time, such as if it exceeds a certain time without a matched response.

return message, nil
}

func (r *RocketMQStreamParser) FindBoundary(streamBuffer *buffer.StreamBuffer, messageType protocol.MessageType, startPos int) int {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The judgment here is too simple. There are two suggestions:

  1. Can you add logic similar to protocol inference?
  2. Generally, the response's Opaque corresponds to a request with the same Opaque. So, when finding the boundary, you can check if the response's Opaque has appeared in previous request parsing (you can add a Map in RocketMQStreamParser to record the Opaque of requests that have already appeared, and then clean it in Match). If it has not appeared, it is considered illegal and should continue searching.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add support for RocketMQ protocol
4 participants