diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/BUILD.bazel b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/BUILD.bazel index 15d285bc86e..7ef0756110b 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/BUILD.bazel +++ b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/BUILD.bazel @@ -46,3 +46,9 @@ pl_cc_test( srcs = ["parse_test.cc"], deps = [":cc_library"], ) + +pl_cc_test( + name = "stitcher_test", + srcs = ["stitcher_test.cc"], + deps = [":cc_library"], +) diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.cc b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.cc new file mode 100644 index 00000000000..869d43047b7 --- /dev/null +++ b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.cc @@ -0,0 +1,209 @@ +/* + * Copyright 2018- The Pixie Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.h" + +#include +#include +#include + +#include +#include +#include "src/common/base/base.h" + +#include "src/stirling/source_connectors/socket_tracer/protocols/common/interface.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h" +#include "src/stirling/utils/binary_decoder.h" + +namespace px { +namespace stirling { +namespace protocols { +namespace mongodb { + +void FindMoreToComeResponses( + absl::flat_hash_map>* resps, int* error_count, + mongodb::Frame* resp_frame, uint64_t* latest_resp_ts) { + // In a more to come message, the response frame's responseTo will be the requestID of the prior + // response frame. + + // Find and insert all of the more to come frame(s) section data to the head response frame. + auto curr_resp = resp_frame; + + while (curr_resp->more_to_come) { + // Find the next response's deque. + auto next_resp_deque_it = resps->find(curr_resp->request_id); + if (next_resp_deque_it == resps->end()) { + VLOG(1) << absl::Substitute( + "Did not find a response deque extending the prior more to come response. " + "requestID: $0", + curr_resp->request_id); + (*error_count)++; + return; + } + + // Response deque containing the next more to come response frame. + auto& next_resp_deque = next_resp_deque_it->second; + + // Find the next response frame from the deque with a timestamp just greater than the + // current response frame's timestamp. + auto next_resp_it = std::upper_bound( + next_resp_deque.begin(), next_resp_deque.end(), *latest_resp_ts, + [](const uint64_t ts, const mongodb::Frame& frame) { return ts < frame.timestamp_ns; }); + if (next_resp_it->timestamp_ns < *latest_resp_ts) { + VLOG(1) << absl::Substitute( + "Did not find a response extending the prior more to come response. RequestID: $0", + curr_resp->request_id); + (*error_count)++; + return; + } + + // Insert the next response's section data to the head of the more to come response. + mongodb::Frame& next_resp = *next_resp_it; + resp_frame->sections.insert(std::end(resp_frame->sections), std::begin(next_resp.sections), + std::end(next_resp.sections)); + next_resp.consumed = true; + *latest_resp_ts = next_resp.timestamp_ns; + curr_resp = &next_resp; + } + + // TODO(kpattaswamy): In the case of "missing" more to come middle/tail frames, determine whether + // they are truly missing or have not been parsed yet. +} + +void FlattenSections(mongodb::Frame* frame) { + // Flatten the vector of sections containing vector of documents into a single string. + for (const auto& section : frame->sections) { + for (const auto& doc : section.documents) { + frame->frame_body.append(doc).append(" "); + } + } + frame->sections.clear(); +} + +RecordsWithErrorCount StitchFrames( + absl::flat_hash_map>* reqs, + absl::flat_hash_map>* resps, State* state) { + std::vector records; + int error_count = 0; + + for (auto& stream_id_pair : state->stream_order) { + auto stream_id = stream_id_pair.first; + + // Find the stream ID's response deque. + auto resp_it = resps->find(stream_id); + if (resp_it == resps->end()) { + VLOG(1) << absl::Substitute("Did not find a response deque with the stream ID: $0", + stream_id); + continue; + } + + // Response deque for the stream ID. + auto& resp_deque = resp_it->second; + + // Find the stream ID's request deque. + auto req_it = reqs->find(stream_id); + // The request deque should exist in the reqs map since the state contained the stream ID. + CTX_DCHECK(req_it != reqs->end()); + + // Request deque for the stream ID. + auto& req_deque = req_it->second; + + // Track the latest response timestamp to compare against request frame's timestamp later. + uint64_t latest_resp_ts = 0; + + // Stitch the first frame in the response deque with the corresponding request frame. + for (auto& resp_frame : resp_deque) { + if (resp_frame.consumed) { + continue; + } + + latest_resp_ts = resp_frame.timestamp_ns; + + // Find the corresponding request frame for the head response frame. + auto req_frame_it = std::upper_bound( + req_deque.begin(), req_deque.end(), latest_resp_ts, + [](const uint64_t ts, const mongodb::Frame& frame) { return ts < frame.timestamp_ns; }); + + if (req_frame_it != req_deque.begin()) { + --req_frame_it; + } + + if (req_frame_it->timestamp_ns > latest_resp_ts) { + VLOG(1) << absl::Substitute( + "Did not find a request frame that is earlier than the response. Response's " + "responseTo: $0", + resp_frame.response_to); + resp_frame.consumed = true; + error_count++; + break; + } + + mongodb::Frame& req_frame = *req_frame_it; + + FindMoreToComeResponses(resps, &error_count, &resp_frame, &latest_resp_ts); + + // Stitch the request/response and add it to the records. + req_frame.consumed = true; + resp_frame.consumed = true; + FlattenSections(&req_frame); + FlattenSections(&resp_frame); + records.push_back({std::move(req_frame), std::move(resp_frame)}); + break; + } + + auto erase_until_iter = req_deque.begin(); + while (erase_until_iter != req_deque.end() && + (erase_until_iter->consumed || erase_until_iter->timestamp_ns < latest_resp_ts)) { + if (!erase_until_iter->consumed) { + error_count++; + } + ++erase_until_iter; + } + + req_deque.erase(req_deque.begin(), erase_until_iter); + stream_id_pair.second = true; + } + + // Clear the response deques. + for (auto it = resps->begin(); it != resps->end(); it++) { + auto& resp_deque = it->second; + for (auto& resp : resp_deque) { + if (!resp.consumed) { + error_count++; + } + } + resp_deque.clear(); + } + + // Clear the state. + auto it = state->stream_order.begin(); + while (it != state->stream_order.end()) { + if (it->second) { + it = state->stream_order.erase(it); + } else { + it++; + } + } + + return {records, error_count}; +} + +} // namespace mongodb +} // namespace protocols +} // namespace stirling +} // namespace px diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.h b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.h new file mode 100644 index 00000000000..ec35b98a1df --- /dev/null +++ b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.h @@ -0,0 +1,56 @@ +/* + * Copyright 2018- The Pixie Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#pragma once + +#include +#include +#include +#include + +#include "src/common/base/base.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/common/interface.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h" + +namespace px { +namespace stirling { +namespace protocols { +namespace mongodb { + +void FindMoreToComeResponses( + absl::flat_hash_map>* resps, int* error_count, + mongodb::Frame* resp_frame, uint64_t* latest_resp_ts); + +void FlattenSections(mongodb::Frame* frame); + +RecordsWithErrorCount StitchFrames( + absl::flat_hash_map>* reqs, + absl::flat_hash_map>* resps, State* state); +} // namespace mongodb + +template <> +inline RecordsWithErrorCount StitchFrames( + absl::flat_hash_map>* reqs, + absl::flat_hash_map>* resps, + mongodb::StateWrapper* state) { + return mongodb::StitchFrames(reqs, resps, &state->global); +} + +} // namespace protocols +} // namespace stirling +} // namespace px diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher_test.cc b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher_test.cc new file mode 100644 index 00000000000..21749cd6c85 --- /dev/null +++ b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher_test.cc @@ -0,0 +1,349 @@ +/* + * Copyright 2018- The Pixie Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include "src/stirling/source_connectors/socket_tracer/protocols/mongodb/stitcher.h" + +#include + +#include +#include + +#include "src/common/testing/testing.h" +#include "src/stirling/source_connectors/socket_tracer/protocols/common/test_utils.h" + +namespace px { +namespace stirling { +namespace protocols { +namespace mongodb { + +using ::testing::IsEmpty; +using ::testing::SizeIs; + +class MongoDBStitchFramesTest : public ::testing::Test {}; + +Frame CreateMongoDBFrame(uint64_t ts_ns, int32_t request_id, int32_t response_to, bool more_to_come, + std::string doc = "") { + mongodb::Frame frame; + frame.timestamp_ns = ts_ns; + frame.request_id = request_id; + frame.response_to = response_to; + frame.more_to_come = more_to_come; + + mongodb::Section section; + section.documents.push_back(doc); + + frame.sections.push_back(section); + + return frame; +} + +TEST_F(MongoDBStitchFramesTest, VerifyStitchingWithReusedStreams) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[3].push_back(CreateMongoDBFrame(2, 3, 0, false)); + reqs[5].push_back(CreateMongoDBFrame(4, 5, 0, false)); + + reqs[1].push_back(CreateMongoDBFrame(6, 1, 0, false)); + reqs[3].push_back(CreateMongoDBFrame(8, 3, 0, false)); + reqs[5].push_back(CreateMongoDBFrame(10, 5, 0, false)); + reqs[5].push_back(CreateMongoDBFrame(12, 5, 0, false)); // Unmatched Request + + // Add responses to map. + resps[1].push_back(CreateMongoDBFrame(1, 2, 1, false)); + resps[3].push_back(CreateMongoDBFrame(3, 4, 3, false)); + resps[5].push_back(CreateMongoDBFrame(5, 6, 5, false)); + + resps[1].push_back(CreateMongoDBFrame(7, 8, 1, false)); + resps[3].push_back(CreateMongoDBFrame(9, 10, 3, false)); + resps[3].push_back(CreateMongoDBFrame(13, 13, 3, false)); // Response with no request + resps[5].push_back(CreateMongoDBFrame(11, 12, 5, false)); + + // Add the order in which the transactions's streamID's were found. + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({3, false}); + state.stream_order.push_back({5, false}); + state.stream_order.push_back({1, false}); + state.stream_order.push_back({3, false}); + state.stream_order.push_back({5, false}); + state.stream_order.push_back({5, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + EXPECT_EQ(result.error_count, 1); + EXPECT_THAT(result.records, SizeIs(6)); + EXPECT_EQ(result.records[0].req.timestamp_ns, 0); + EXPECT_EQ(result.records[0].resp.timestamp_ns, 1); + EXPECT_EQ(result.records[5].req.timestamp_ns, 10); + EXPECT_EQ(result.records[5].resp.timestamp_ns, 11); + + EXPECT_EQ(TotalDequeSize(reqs), 1); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +TEST_F(MongoDBStitchFramesTest, VerifyOnetoOneStitching) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[3].push_back(CreateMongoDBFrame(2, 3, 0, false)); + reqs[5].push_back(CreateMongoDBFrame(4, 5, 0, false)); + reqs[7].push_back(CreateMongoDBFrame(6, 7, 0, false)); + reqs[9].push_back(CreateMongoDBFrame(8, 9, 0, false)); + reqs[11].push_back(CreateMongoDBFrame(10, 11, 0, false)); + reqs[13].push_back(CreateMongoDBFrame(12, 13, 0, false)); + reqs[15].push_back(CreateMongoDBFrame(14, 15, 0, false)); + + // Add responses to map. + resps[1].push_back(CreateMongoDBFrame(1, 2, 1, false)); + resps[3].push_back(CreateMongoDBFrame(3, 4, 3, false)); + resps[5].push_back(CreateMongoDBFrame(5, 6, 5, false)); + resps[7].push_back(CreateMongoDBFrame(7, 8, 7, false)); + resps[9].push_back(CreateMongoDBFrame(9, 10, 9, false)); + resps[11].push_back(CreateMongoDBFrame(11, 12, 11, false)); + resps[13].push_back(CreateMongoDBFrame(13, 14, 13, false)); + resps[15].push_back(CreateMongoDBFrame(15, 16, 15, false)); + + // Add the order in which the transactions's streamID's were found. + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({3, false}); + state.stream_order.push_back({5, false}); + state.stream_order.push_back({7, false}); + state.stream_order.push_back({9, false}); + state.stream_order.push_back({11, false}); + state.stream_order.push_back({13, false}); + state.stream_order.push_back({15, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + EXPECT_EQ(result.error_count, 0); + EXPECT_THAT(result.records, SizeIs(8)); + EXPECT_TRUE(AreAllDequesEmpty(reqs)); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +TEST_F(MongoDBStitchFramesTest, VerifyOnetoNStitching) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[3].push_back(CreateMongoDBFrame(2, 3, 0, false)); + + // Request frame corresponding to multi frame response message. + reqs[5].push_back(CreateMongoDBFrame(4, 5, 0, false, "request frame body")); + + reqs[9].push_back(CreateMongoDBFrame(8, 9, 0, false)); + reqs[11].push_back(CreateMongoDBFrame(10, 11, 0, false)); + reqs[13].push_back(CreateMongoDBFrame(12, 13, 0, false)); + reqs[15].push_back(CreateMongoDBFrame(14, 15, 0, false)); + reqs[17].push_back(CreateMongoDBFrame(16, 17, 0, false)); + + // Add responses to map. + resps[1].push_back(CreateMongoDBFrame(1, 2, 1, false)); + resps[3].push_back(CreateMongoDBFrame(3, 4, 3, false)); + + // Multi frame response message. + resps[5].push_back(CreateMongoDBFrame(5, 6, 5, true, "response")); + resps[6].push_back(CreateMongoDBFrame(6, 7, 6, true, "frame")); + resps[7].push_back(CreateMongoDBFrame(7, 8, 7, false, "body")); + + resps[9].push_back(CreateMongoDBFrame(9, 10, 9, false)); + resps[11].push_back(CreateMongoDBFrame(11, 12, 11, false)); + resps[13].push_back(CreateMongoDBFrame(13, 14, 13, false)); + resps[15].push_back(CreateMongoDBFrame(15, 16, 15, false)); + resps[17].push_back(CreateMongoDBFrame(17, 18, 17, false)); + + // Add the order in which the transactions's streamID's were found. + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({3, false}); + state.stream_order.push_back({5, false}); + state.stream_order.push_back({9, false}); + state.stream_order.push_back({11, false}); + state.stream_order.push_back({13, false}); + state.stream_order.push_back({15, false}); + state.stream_order.push_back({17, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 0); + EXPECT_EQ(result.records[2].req.frame_body, "request frame body "); + EXPECT_EQ(result.records[2].resp.frame_body, "response frame body "); + EXPECT_THAT(result.records, SizeIs(8)); + + EXPECT_TRUE(AreAllDequesEmpty(reqs)); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +TEST_F(MongoDBStitchFramesTest, UnmatchedResponsesAreHandled) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + // Missing request frame + reqs[2].push_back(CreateMongoDBFrame(1, 2, 0, false)); + + // Add responses to map; + resps[10].push_back(CreateMongoDBFrame(0, 1, 10, false)); + resps[2].push_back(CreateMongoDBFrame(2, 3, 2, false)); + + // Add the order in which the streamID's were found. + State state = {}; + state.stream_order.push_back({2, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 1); + EXPECT_EQ(result.records.size(), 1); + EXPECT_EQ(result.records[0].req.request_id, 2); + EXPECT_EQ(result.records[0].resp.response_to, 2); + + EXPECT_TRUE(AreAllDequesEmpty(reqs)); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +TEST_F(MongoDBStitchFramesTest, UnmatchedRequestsAreNotCleanedUp) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[2].push_back(CreateMongoDBFrame(1, 2, 0, false)); + reqs[4].push_back(CreateMongoDBFrame(3, 4, 0, false)); + + // Add responses to map. + resps[2].push_back(CreateMongoDBFrame(2, 3, 2, false)); + resps[4].push_back(CreateMongoDBFrame(4, 5, 4, false)); + + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({2, false}); + state.stream_order.push_back({4, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 0); + EXPECT_THAT(result.records, SizeIs(2)); + EXPECT_EQ(result.records[0].req.request_id, 2); + EXPECT_EQ(result.records[1].req.request_id, 4); + + EXPECT_EQ(TotalDequeSize(reqs), 1); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(1)); +} + +TEST_F(MongoDBStitchFramesTest, MissingHeadFrameInNResponses) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[6].push_back(CreateMongoDBFrame(5, 6, 0, false)); + + // Add responses to map. + // Missing head frame in the N responses + resps[2].push_back(CreateMongoDBFrame(2, 3, 2, true)); + resps[3].push_back(CreateMongoDBFrame(3, 4, 3, false)); + resps[6].push_back(CreateMongoDBFrame(6, 7, 6, false)); + + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({6, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 2); + EXPECT_EQ(result.records.size(), 1); + + EXPECT_EQ(TotalDequeSize(reqs), 1); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(1)); +} + +TEST_F(MongoDBStitchFramesTest, MissingFrameInNResponses) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[6].push_back(CreateMongoDBFrame(5, 6, 0, false)); + + // Add responses to map. + resps[1].push_back(CreateMongoDBFrame(1, 2, 1, true, "frame 1")); + resps[2].push_back(CreateMongoDBFrame(2, 3, 2, true, "frame 2")); + // Missing middle frame in the N responses. + resps[4].push_back(CreateMongoDBFrame(4, 5, 4, false, "frame 4")); + resps[6].push_back(CreateMongoDBFrame(6, 7, 6, false)); + + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({6, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 2); + EXPECT_EQ(result.records.size(), 2); + EXPECT_EQ(result.records[0].resp.frame_body, "frame 1 frame 2 "); + + EXPECT_TRUE(AreAllDequesEmpty(reqs)); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +TEST_F(MongoDBStitchFramesTest, MissingTailFrameInNResponses) { + absl::flat_hash_map> reqs; + absl::flat_hash_map> resps; + + // Add requests to map. + reqs[1].push_back(CreateMongoDBFrame(0, 1, 0, false)); + reqs[6].push_back(CreateMongoDBFrame(5, 6, 0, false)); + + // Add responses to map. + resps[1].push_back(CreateMongoDBFrame(1, 2, 1, true, "frame 1")); + resps[2].push_back(CreateMongoDBFrame(2, 3, 2, true, "frame 2")); + resps[3].push_back(CreateMongoDBFrame(3, 4, 3, true, "frame 3")); + // Missing tail frame in the N responses + resps[6].push_back(CreateMongoDBFrame(6, 7, 6, false)); + + State state = {}; + state.stream_order.push_back({1, false}); + state.stream_order.push_back({6, false}); + + RecordsWithErrorCount result = mongodb::StitchFrames(&reqs, &resps, &state); + + EXPECT_EQ(result.error_count, 1); + EXPECT_EQ(result.records.size(), 2); + EXPECT_EQ(result.records[0].resp.frame_body, "frame 1 frame 2 frame 3 "); + + EXPECT_TRUE(AreAllDequesEmpty(reqs)); + EXPECT_TRUE(AreAllDequesEmpty(resps)); + EXPECT_THAT(state.stream_order, SizeIs(0)); +} + +} // namespace mongodb +} // namespace protocols +} // namespace stirling +} // namespace px diff --git a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h index 4ba02858466..b33457ce608 100644 --- a/src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h +++ b/src/stirling/source_connectors/socket_tracer/protocols/mongodb/types.h @@ -20,7 +20,6 @@ #include #include - #include #include "src/stirling/source_connectors/socket_tracer/protocols/common/event_parser.h" @@ -134,6 +133,7 @@ struct Frame : public FrameBase { bool exhaust_allowed = false; std::vector
sections; std::string op_msg_type; + std::string frame_body; uint32_t checksum = 0; bool consumed = false;