Skip to content

Commit

Permalink
add response-led sorting for result vector, refactor cql stitcher for …
Browse files Browse the repository at this point in the history
…clarity

Signed-off-by: Benjamin Kilimnik <[email protected]>
  • Loading branch information
benkilimnik committed Oct 6, 2023
1 parent ec2b12e commit 3fd200a
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@

#pragma once

#include <deque>
#include <absl/container/flat_hash_map.h>
#include <deque>
#include <string>
#include <vector>

Expand Down
108 changes: 47 additions & 61 deletions src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include "src/stirling/source_connectors/socket_tracer/protocols/cql/stitcher.h"

#include <algorithm>
#include <deque>
#include <string>
#include <utility>
Expand Down Expand Up @@ -343,25 +344,21 @@ StatusOr<Record> ProcessReqRespPair(Frame* req_frame, Frame* resp_frame) {
return r;
}

StatusOr<Record> ProcessSolitaryResp(Frame* resp_frame) {
Record r;

Status ProcessSolitaryResp(Frame* resp_frame, Record* r) {
// For now, Event is the only supported solitary response.
// If this ever changes, the code below needs to be adapted.
CTX_DCHECK(resp_frame->hdr.opcode == Opcode::kEvent);

// Make a fake request to go along with the response.
// - Use REGISTER op, since that was what set up the events in the first place.
// - Use response timestamp, so any calculated latencies are reported as 0.
r.req.op = ReqOp::kRegister;
r.req.msg = "-";
r.req.timestamp_ns = resp_frame->timestamp_ns;
r->req.op = ReqOp::kRegister;
r->req.msg = "-";
r->req.timestamp_ns = resp_frame->timestamp_ns;

// A little inefficient because it will go through a switch statement again,
// when we actually know the op. But keep it this way for consistency.
PX_RETURN_IF_ERROR(ProcessResp(resp_frame, &r.resp));

return r;
return ProcessResp(resp_frame, &r->resp);
}

// Currently StitchFrames() uses a response-led matching algorithm.
Expand All @@ -378,34 +375,18 @@ RecordsWithErrorCount<Record> StitchFrames(
std::vector<Record> entries;
int error_count = 0;

// TODO: record time for loop with different maps
// 1. std::map
// 2. absl::

// use the python demo that loops through streams
// consider using prometheus to log this data

// One thing to note is that the order in which records for a particular stream
// are appended to the result vector is not guaranteed.
// Even if the map was sorted, for protocols like Mongo, the key could be any number,
// so we'd iterate over it in an arbitrary order. Within a particular transaction/stream,
// order by timestamp is preserved. But each transaction would show up in any order.

// iterate through all deques of responses associated with a specific streamID and find the
// matching request
// for (auto& [stream_id, resp_dequeue] : responses) {
for (auto it = responses->begin(); it != responses->end(); it++) {
cass::stream_id_t stream_id = it->first;
std::deque<cass::Frame>& resp_frames = it->second;

for (auto& [stream_id, resp_deque] : *responses) {
bool kEventHandled = false;
for (cass::Frame& resp_frame : resp_frames) {
for (cass::Frame& resp_frame : resp_deque) {
// Event responses are special: they have no request.
if (resp_frame.hdr.opcode == Opcode::kEvent) {
kEventHandled = true;
StatusOr<Record> record_status = ProcessSolitaryResp(&resp_frame);
Record record;
Status record_status = ProcessSolitaryResp(&resp_frame, &record);
if (record_status.ok()) {
entries.push_back(record_status.ValueOrDie());
entries.push_back(std::move(record));
} else {
VLOG(1) << record_status.ToString();
++error_count;
Expand All @@ -418,7 +399,7 @@ RecordsWithErrorCount<Record> StitchFrames(
VLOG(1) << absl::Substitute("Could not find any requests for stream = $0", stream_id);
// if we don't find a matching request, we can't do anything with this response
// so clean it up
resp_frames.clear();
resp_deque.clear();
if (!kEventHandled) {
++error_count;
}
Expand All @@ -427,31 +408,31 @@ RecordsWithErrorCount<Record> StitchFrames(

// we found a potential set of requests for this stream ID
std::deque<cass::Frame>& req_frames = pos->second;
std::deque<uint64_t> req_timestamps = std::deque<uint64_t>();
// get just the timestamps for matching with responses
for (cass::Frame& req_frame : req_frames) {
req_timestamps.push_back(req_frame.timestamp_ns);
}

uint64_t latest_resp_ts = 0;
// go through the responses for this stream ID and check for requests
for (cass::Frame& resp_frame : resp_frames) {
for (cass::Frame& resp_frame : resp_deque) {
latest_resp_ts = resp_frame.timestamp_ns;
// Find the request whose timestamp is JUST BEFORE (or equal to) the response timestamp.
// upper_bound returns the first request timestamp GREATER than the response timestamp, so we
// step back by one to get the request right before it.
auto stream_it =
std::upper_bound(req_timestamps.begin(), req_timestamps.end(), resp_frame.timestamp_ns) -
std::upper_bound(
req_frames.begin(), req_frames.end(), resp_frame.timestamp_ns,
[](const uint64_t ts, const cass::Frame& frame) { return ts < frame.timestamp_ns; }) -
1;
const size_t req_index = stream_it - req_timestamps.begin();

// Responses should always have a more recent timestamp than the first request. If this
// condition is triggered we should not attempt to match this frame. Since responses are
// cleared during StitchFrames this will get cleaned up during the current iteration.
if (stream_it + 1 == req_timestamps.begin()) {
VLOG(1) << "Warning: Unable to find request that is earlier than response: " << resp_frame.ToString();
if (stream_it == req_frames.begin() && stream_it->timestamp_ns > resp_frame.timestamp_ns) {
VLOG(1) << "Warning: Unable to find request that is earlier than response: "
<< resp_frame.ToString();
continue;
}
cass::Frame& req_frame = req_frames[req_index];

cass::Frame& req_frame = *stream_it; // dereference the iterator to get the frame

VLOG(2) << absl::Substitute("req_op=$0 msg=$1", magic_enum::enum_name(req_frame.hdr.opcode),
req_frame.msg);
StatusOr<Record> record_status = ProcessReqRespPair(&req_frame, &resp_frame);
Expand All @@ -468,29 +449,34 @@ RecordsWithErrorCount<Record> StitchFrames(
size_t delete_idx = req_frames.size();
bool found_unconsumed = false;
for (const auto& [idx, frame] : Enumerate(req_frames)) {
if (frame.consumed) {
continue;
}
// Mark requests as discarded that will never match. These frames will be deleted in future
// StitchFrames iterations once they bubble up to the front of the deque and form a contiguous
// range with the consumed frames. This is done to avoid the bookkeeping necessary to delete
// multiple ranges of indices in the deque (which occur when responses are lost).
if (frame.discarded || frame.timestamp_ns < latest_resp_ts) {
error_count++;
frame.discarded = true;
} else if (!found_unconsumed) {
delete_idx = idx;
found_unconsumed = true;
break;
}
if (frame.consumed) {
continue;
}
// Mark requests as discarded that will never match. These frames will be deleted in future
// StitchFrames iterations once they bubble up to the front of the deque and form a contiguous
// range with the consumed frames. This is done to avoid the bookkeeping necessary to delete
// multiple ranges of indices in the deque (which occur when responses are lost).
if (frame.discarded || frame.timestamp_ns < latest_resp_ts) {
error_count++;
frame.discarded = true;
} else if (!found_unconsumed) {
delete_idx = idx;
found_unconsumed = true;
break;
}
}
req_frames.erase(req_frames.begin(), req_frames.begin() + delete_idx);
resp_frames.clear();
resp_deque.clear();
}

// Sort the entries in the result vector to ensure correct ordering of transactions (i.e. streams).
// Sort in descending order of timestamp (most recent records come first)
// sort(entries.begin(), entries.end(), greater<int>());
// The order in which records for a particular stream are appended to the result vector is not
// guaranteed. Even if the map was ordered, the stream_ids may not be monotonically increasing.
// Within a particular stream, frames are ordered by timestamp, but transactions for different
// streams could be appended in any order. As a result, we sort the entries in descending order
// (most recent records come first). For CQL, we do response-led sorting.
std::sort(entries.begin(), entries.end(), [](const Record& a, const Record& b) {
return a.resp.timestamp_ns > b.resp.timestamp_ns;
});

return {entries, error_count};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,9 @@ namespace cass {
* @param resp_frames: deque of all response frames.
* @return A vector of entries to be appended to table store.
*/
RecordsWithErrorCount<Record> StitchFrames(absl::flat_hash_map<stream_id_t, std::deque<Frame>>* req_frames,
absl::flat_hash_map<stream_id_t, std::deque<Frame>>* resp_frames);
RecordsWithErrorCount<Record> StitchFrames(
absl::flat_hash_map<stream_id_t, std::deque<Frame>>* req_frames,
absl::flat_hash_map<stream_id_t, std::deque<Frame>>* resp_frames);

} // namespace cass

Expand Down

0 comments on commit 3fd200a

Please sign in to comment.