diff --git a/src/server/dfly_bench.cc b/src/server/dfly_bench.cc index 79207c2b0edf..8d0d974a7222 100644 --- a/src/server/dfly_bench.cc +++ b/src/server/dfly_bench.cc @@ -205,7 +205,8 @@ class Driver { private: void PopRequest(); void ReceiveFb(); - void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf); + void ParseRESP(); + void ParseMC(); struct Req { uint64_t start; @@ -219,6 +220,9 @@ class Driver { fb2::Fiber receive_fb_; queue reqs_; fb2::CondVarAny cnd_; + + facade::RedisParser parser_{1 << 16, false}; + io::IoBuf io_buf_{512}; }; // Per thread client. @@ -333,6 +337,19 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) { int yes = 1; CHECK_EQ(0, setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes))); } + + // TCP Connect does not ensure that the connection was indeed accepted by the server. + // if server backlog is too short the connection will get stuck in the accept queue. + // Therefore, we send a ping command to ensure that every connection got connected. + ec = socket_->Write(io::Buffer("ping\r\n")); + CHECK(!ec); + + uint8_t buf[128]; + auto res_sz = socket_->Recv(io::MutableBytes(buf)); + CHECK(res_sz) << res_sz.error().message(); + string_view resp = io::View(io::Bytes(buf, *res_sz)); + CHECK(absl::EndsWith(resp, "\r\n")) << resp; + receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); }); } @@ -385,7 +402,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) { CHECK(!ec) << ec.message(); } - const int finish = absl::GetCurrentTimeNanos(); + int64_t finish = absl::GetCurrentTimeNanos(); VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took " << StrFormat("%.1fs", double(finish - start) / 1000000000) << ". Waiting for server processing"; @@ -426,71 +443,40 @@ void Driver::PopRequest() { } void Driver::ReceiveFb() { - facade::RedisParser parser{1 << 16, false}; - io::IoBuf io_buf{512}; - - unsigned blob_len = 0; - while (true) { - io_buf.EnsureCapacity(256); - auto buf = io_buf.AppendBuffer(); + io_buf_.EnsureCapacity(256); + auto buf = io_buf_.AppendBuffer(); VLOG(2) << "Socket read: " << reqs_.size(); ::io::Result recv_sz = socket_->Recv(buf); if (!recv_sz && FiberSocketBase::IsConnClosed(recv_sz.error())) { + LOG_IF(DFATAL, !reqs_.empty()) + << "Broke with " << reqs_.size() << " requests, received: " << received_; + // clear reqs - to prevent Driver::Run block on them indefinitely. + decltype(reqs_)().swap(reqs_); break; } + CHECK(recv_sz) << recv_sz.error().message(); - io_buf.CommitWrite(*recv_sz); + io_buf_.CommitWrite(*recv_sz); if (protocol == RESP) { - ParseRESP(&parser, &io_buf); + ParseRESP(); } else { // MC_TEXT - while (true) { - string_view line = FindLine(io_buf.InputBuffer()); - if (line.empty()) - break; - CHECK_EQ(line.back(), '\n'); - if (line == "STORED\r\n" || line == "END\r\n") { - PopRequest(); - blob_len = 0; - } else if (absl::StartsWith(line, "VALUE")) { - // last token is a blob length. - auto it = line.rbegin(); - while (it != line.rend() && *it != ' ') - ++it; - size_t len = it - line.rbegin() - 2; - const char* start = &(*it) + 1; - if (!absl::SimpleAtoi(string(start, len), &blob_len)) { - LOG(ERROR) << "Invalid blob len " << line; - return; - } - ++stats_.hit_count; - } else if (absl::StartsWith(line, "SERVER_ERROR")) { - ++stats_.num_errors; - PopRequest(); - blob_len = 0; - } else { - auto handle = socket_->native_handle(); - CHECK_EQ(blob_len + 2, line.size()) << line; - blob_len = 0; - VLOG(2) << "Got line " << handle << ": " << line; - } - io_buf.ConsumeInput(line.size()); - } + ParseMC(); } } VLOG(1) << "ReceiveFb done"; } -void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) { +void Driver::ParseRESP() { uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; RespVec parse_args; do { - result = parser->Parse(io_buf->InputBuffer(), &consumed, &parse_args); + result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args); if (result == RedisParser::OK && !parse_args.empty()) { if (parse_args[0].type == facade::RespExpr::ERROR) { ++stats_.num_errors; @@ -500,10 +486,48 @@ void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) { parse_args.clear(); PopRequest(); } - io_buf->ConsumeInput(consumed); + io_buf_.ConsumeInput(consumed); } while (result == RedisParser::OK); } +void Driver::ParseMC() { + unsigned blob_len = 0; + + while (true) { + string_view line = FindLine(io_buf_.InputBuffer()); + if (line.empty()) + break; + + CHECK_EQ(line.back(), '\n'); + if (line == "STORED\r\n" || line == "END\r\n") { + PopRequest(); + blob_len = 0; + } else if (absl::StartsWith(line, "VALUE")) { + // last token is a blob length. + auto it = line.rbegin(); + while (it != line.rend() && *it != ' ') + ++it; + size_t len = it - line.rbegin() - 2; + const char* start = &(*it) + 1; + if (!absl::SimpleAtoi(string(start, len), &blob_len)) { + LOG(ERROR) << "Invalid blob len " << line; + return; + } + ++stats_.hit_count; + } else if (absl::StartsWith(line, "SERVER_ERROR")) { + ++stats_.num_errors; + PopRequest(); + blob_len = 0; + } else { + auto handle = socket_->native_handle(); + CHECK_EQ(blob_len + 2, line.size()) << line; + blob_len = 0; + VLOG(2) << "Got line " << handle << ": " << line; + } + io_buf_.ConsumeInput(line.size()); + } +} + void TLocalClient::Connect(tcp::endpoint ep) { VLOG(2) << "Connecting client..."; vector fbs(drivers_.size()); diff --git a/src/server/dfly_main.cc b/src/server/dfly_main.cc index 015b9814be50..9ec89c94881a 100644 --- a/src/server/dfly_main.cc +++ b/src/server/dfly_main.cc @@ -77,7 +77,7 @@ ABSL_FLAG(bool, force_epoll, false, ABSL_FLAG(bool, version_check, true, "If true, Will monitor for new releases on Dragonfly servers once a day."); -ABSL_FLAG(uint16_t, tcp_backlog, 128, "TCP listen(2) backlog parameter."); +ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter."); using namespace util; using namespace facade;