diff --git a/be/src/util/brpc_client_cache.h b/be/src/util/brpc_client_cache.h index 290f2cc3e04747..ebef80f4a6bdfb 100644 --- a/be/src/util/brpc_client_cache.h +++ b/be/src/util/brpc_client_cache.h @@ -126,6 +126,7 @@ class BrpcClientCache { options.connection_group = connection_group; } options.connect_timeout_ms = 2000; + options.timeout_ms = 2000; options.max_retry = 10; std::unique_ptr channel(new brpc::Channel()); diff --git a/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch new file mode 100644 index 00000000000000..7287b4a536d18c --- /dev/null +++ b/thirdparty/patches/brpc-1.9.0-add-gc-on-abalist-and-adjust-gc-size.patch @@ -0,0 +1,280 @@ +From fb53ab9245e8570d44a2eeea2ab536841a7876ec Mon Sep 17 00:00:00 2001 +From: BiteTheDDDDt +Date: Tue, 16 Jul 2024 14:21:21 +0800 +Subject: [PATCH] add gc on abalist and adjust gc size + +--- + src/bthread/bthread.cpp | 3 +- + src/bthread/id.cpp | 1 + + src/bthread/list_of_abafree_id.h | 161 ++++++++++++++++++++++++++++--- + 3 files changed, 151 insertions(+), 14 deletions(-) + +diff --git a/src/bthread/bthread.cpp b/src/bthread/bthread.cpp +index 5ac0c3b1..86fd2c90 100644 +--- a/src/bthread/bthread.cpp ++++ b/src/bthread/bthread.cpp +@@ -147,7 +147,8 @@ start_from_non_worker(bthread_t* __restrict tid, + + struct TidTraits { + static const size_t BLOCK_SIZE = 63; +- static const size_t MAX_ENTRIES = 65536; ++ static const size_t MAX_ENTRIES = 655360; ++ static const size_t INIT_GC_SIZE = 65536; + static const bthread_t ID_INIT; + static bool exists(bthread_t id) { return bthread::TaskGroup::exists(id); } + }; +diff --git a/src/bthread/id.cpp b/src/bthread/id.cpp +index 41c49a3f..ba77580a 100644 +--- a/src/bthread/id.cpp ++++ b/src/bthread/id.cpp +@@ -291,6 +291,7 @@ void id_pool_status(std::ostream &os) { + struct IdTraits { + static const size_t BLOCK_SIZE = 63; + static const size_t MAX_ENTRIES = 100000; ++ static const size_t INIT_GC_SIZE = 4096; + static const bthread_id_t ID_INIT; + static bool exists(bthread_id_t id) + { return bthread::id_exists_with_true_negatives(id); } +diff --git a/src/bthread/list_of_abafree_id.h b/src/bthread/list_of_abafree_id.h +index ac2b2234..45043acc 100644 +--- a/src/bthread/list_of_abafree_id.h ++++ b/src/bthread/list_of_abafree_id.h +@@ -22,8 +22,10 @@ + #ifndef BTHREAD_LIST_OF_ABAFREE_ID_H + #define BTHREAD_LIST_OF_ABAFREE_ID_H + +-#include + #include ++#include ++ ++#include "butil/macros.h" + + namespace bthread { + +@@ -48,6 +50,9 @@ namespace bthread { + // // Max #entries. Often has close relationship with concurrency, 65536 + // // is "huge" for most apps. + // static const size_t MAX_ENTRIES = 65536; ++// // Initial GC length, when the number of blocks reaches this length, ++// // start to initiate list GC operation. It will release useless blocks ++// static const size_t INIT_GC_SIZE = 4096; + // + // // Initial value of id. Id with the value is treated as invalid. + // static const Id ID_INIT = ...; +@@ -64,13 +69,14 @@ class ListOfABAFreeId { + public: + ListOfABAFreeId(); + ~ListOfABAFreeId(); +- ++ + // Add an identifier into the list. + int add(Id id); +- ++ + // Apply fn(id) to all identifiers. +- template void apply(const Fn& fn); +- ++ template ++ void apply(const Fn& fn); ++ + // Put #entries of each level into `counts' + // Returns #levels. + size_t get_sizes(size_t* counts, size_t n); +@@ -82,19 +88,31 @@ private: + IdBlock* next; + }; + void forward_index(); ++ ++ struct TempIdBlock { ++ IdBlock* block; ++ uint32_t index; ++ uint32_t nblock; ++ }; ++ ++ int gc(); ++ int add_to_temp_list(TempIdBlock* temp_list, Id id); ++ template ++ int for_each(const Fn& fn); ++ void free_list(IdBlock* block); ++ + IdBlock* _cur_block; + uint32_t _cur_index; + uint32_t _nblock; + IdBlock _head_block; ++ uint32_t _next_gc_size; + }; + + // [impl.] + + template + ListOfABAFreeId::ListOfABAFreeId() +- : _cur_block(&_head_block) +- , _cur_index(0) +- , _nblock(1) { ++ : _cur_block(&_head_block), _cur_index(0), _nblock(1), _next_gc_size(IdTraits::INIT_GC_SIZE) { + for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) { + _head_block.ids[i] = IdTraits::ID_INIT; + } +@@ -140,6 +158,30 @@ int ListOfABAFreeId::add(Id id) { + } + saved_pos[i] = pos; + } ++ // If we don't expect a GC to occur in abalist, then an error is reported and EAGAIN is returned. ++ if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) { ++ return EAGAIN; ++ } ++ // If the number of blocks exceeds the minimum GC length, start the GC operation ++ if (_nblock * IdTraits::BLOCK_SIZE > _next_gc_size) { ++ uint32_t before_gc_blocks = _nblock; ++ int rc = gc(); ++ // To avoid frequent GC operations, we only let the GC be effective enough to continue the GC. ++ // otherwise we let the next GC occur length * 2. ++ // ++ // Condition for a GC to be sufficiently efficient: the number of blocks ++ // retained after the GC is 1/4 of the previous one. ++ if ((before_gc_blocks - _nblock) * IdTraits::BLOCK_SIZE < (_next_gc_size - (_next_gc_size >> 2))) { ++ _next_gc_size <<= 1; ++ // We want to make sure that GC must occur before MAX_ENTRIES. ++ static_assert(IdTraits::MAX_ENTRIES > IdTraits::BLOCK_SIZE * 2, "MAX_ENTRIES should be greater than 2 * IdTraits::BLOCK_SIZE"); ++ if (_next_gc_size >= IdTraits::MAX_ENTRIES) { ++ _next_gc_size = IdTraits::MAX_ENTRIES - IdTraits::BLOCK_SIZE * 2; ++ } ++ } ++ ++ return rc; ++ } + // The list is considered to be "crowded", add a new block and scatter + // the conflict identifiers by inserting an empty entry after each of + // them, so that even if the identifiers are still valid when we walk +@@ -152,9 +194,6 @@ int ListOfABAFreeId::add(Id id) { + // + // [..xxxx....] -> [......yyyy] -> [..........] + // block A new block block B +- if (_nblock * IdTraits::BLOCK_SIZE > IdTraits::MAX_ENTRIES) { +- return EAGAIN; +- } + IdBlock* new_block = new (std::nothrow) IdBlock; + if (NULL == new_block) { + return ENOMEM; +@@ -188,6 +227,93 @@ int ListOfABAFreeId::add(Id id) { + return 0; + } + ++template ++int ListOfABAFreeId::gc() { ++ IdBlock* new_block = new (std::nothrow) IdBlock; ++ if (NULL == new_block) { ++ return ENOMEM; ++ } ++ // reset head block ++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) { ++ new_block->ids[i] = IdTraits::ID_INIT; ++ } ++ new_block->next = NULL; ++ ++ TempIdBlock tmp_id_block; ++ tmp_id_block.block = new_block; ++ tmp_id_block.nblock = 1; ++ tmp_id_block.index = 0; ++ ++ // Add each element of the old list to the new list ++ int rc = for_each([&](Id id) { ++ int rc; ++ rc = add_to_temp_list(&tmp_id_block, id); ++ if (rc != 0) { ++ return rc; ++ } ++ rc = add_to_temp_list(&tmp_id_block, IdTraits::ID_INIT); ++ if (rc != 0) { ++ return rc; ++ } ++ return 0; ++ }); ++ ++ if (rc != 0) { ++ free_list(new_block); ++ return rc; ++ } ++ ++ // reset head block ++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) { ++ _head_block.ids[i] = IdTraits::ID_INIT; ++ } ++ ++ free_list(_head_block.next); ++ _cur_block = tmp_id_block.block; ++ _cur_index = tmp_id_block.index; ++ // nblock and head_block ++ _nblock = tmp_id_block.nblock + 1; ++ _head_block.next = new_block; ++ ++ return 0; ++} ++ ++template ++int ListOfABAFreeId::add_to_temp_list(TempIdBlock* block_list, Id id) { ++ block_list->block->ids[block_list->index++] = id; ++ // add new list ++ if (block_list->index == IdTraits::BLOCK_SIZE) { ++ block_list->index = 0; ++ block_list->nblock++; ++ block_list->block->next = new (std::nothrow) IdBlock; ++ if (NULL == block_list->block->next) { ++ return ENOMEM; ++ } ++ block_list->block = block_list->block->next; ++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) { ++ block_list->block->ids[i] = IdTraits::ID_INIT; ++ } ++ block_list->block->next = NULL; ++ } ++ return 0; ++} ++ ++template ++template ++int ListOfABAFreeId::for_each(const Fn& fn) { ++ for (IdBlock* p = &_head_block; p != NULL; p = p->next) { ++ for (size_t i = 0; i < IdTraits::BLOCK_SIZE; ++i) { ++ if (p->ids[i] != IdTraits::ID_INIT && IdTraits::exists(p->ids[i])) { ++ int rc = fn(p->ids[i]); ++ if (rc != 0) { ++ return rc; ++ } ++ } ++ } ++ } ++ return 0; ++} ++ + template + template + void ListOfABAFreeId::apply(const Fn& fn) { +@@ -200,6 +326,15 @@ void ListOfABAFreeId::apply(const Fn& fn) { + } + } + ++template ++void ListOfABAFreeId::free_list(IdBlock* p) { ++ for (; p != NULL;) { ++ IdBlock* saved_next = p->next; ++ delete p; ++ p = saved_next; ++ } ++} ++ + template + size_t ListOfABAFreeId::get_sizes(size_t* cnts, size_t n) { + if (n == 0) { +@@ -210,6 +345,6 @@ size_t ListOfABAFreeId::get_sizes(size_t* cnts, size_t n) { + return 1; + } + +-} // namespace bthread ++} // namespace bthread + +-#endif // BTHREAD_LIST_OF_ABAFREE_ID_H ++#endif // BTHREAD_LIST_OF_ABAFREE_ID_H +-- +2.31.1 +