Skip to content

Commit

Permalink
[Refactor] add a config to ignore rpc overcrowded problem (#47332)
Browse files Browse the repository at this point in the history
Signed-off-by: stdpain <[email protected]>

Signed-off-by: stdpain <[email protected]>
  • Loading branch information
stdpain committed Jul 11, 2024
1 parent e3363f6 commit dca5344
Show file tree
Hide file tree
Showing 4 changed files with 15 additions and 1 deletion.
6 changes: 6 additions & 0 deletions be/src/common/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -602,6 +602,12 @@ CONF_Int64(brpc_max_body_size, "2147483648");
CONF_Int64(brpc_socket_max_unwritten_bytes, "1073741824");
// brpc connection types, "single", "pooled", "short".
CONF_String_enum(brpc_connection_type, "single", "single,pooled,short");
// If the amount of data to be sent by a single channel of brpc exceeds brpc_socket_max_unwritten_bytes
// it will cause rpc to report an error. We add configuration to ignore rpc overload.
// This may cause process memory usage to rise.
// In the future we need to count the memory on each channel of the rpc. To do application layer rpc flow limiting.
CONF_mBool(brpc_query_ignore_overcrowded, "false");
CONF_mBool(brpc_load_ignore_overcrowded, "true");

// Max number of txns for every txn_partition_map in txn manager.
// this is a self-protection to avoid too many txns saving in manager.
Expand Down
1 change: 1 addition & 0 deletions be/src/exec/pipeline/exchange/sink_buffer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,7 @@ Status SinkBuffer::_try_to_send_rpc(const TUniqueId& instance_id, const std::fun

closure->cntl.Reset();
closure->cntl.set_timeout_ms(_brpc_timeout_ms);
SET_IGNORE_OVERCROWDED(closure->cntl, query);

Status st;
if (bthread_self()) {
Expand Down
7 changes: 7 additions & 0 deletions be/src/service/brpc.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,10 @@
#include <butil/strings/string_piece.h>

#include "common/compiler_util.h"
#include "common/config.h"

// ignore brpc overcrowded error
#define SET_IGNORE_OVERCROWDED(ctnl, module) \
if (config::brpc_##module##_ignore_overcrowded) { \
ctnl.ignore_eovercrowded(); \
}
2 changes: 1 addition & 1 deletion be/src/storage/segment_replicate_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ void ReplicateChannel::_send_request(SegmentPB* segment, butil::IOBuf& data, boo
_closure->ref();
_closure->reset();
_closure->cntl.set_timeout_ms(_opt->timeout_ms);
_closure->cntl.ignore_eovercrowded();
SET_IGNORE_OVERCROWDED(_closure->cntl, load);

if (segment != nullptr) {
request.set_allocated_segment(segment);
Expand Down

0 comments on commit dca5344

Please sign in to comment.