Skip to content

Commit

Permalink
Merge branch 'master' into niutaofan
Browse files Browse the repository at this point in the history
  • Loading branch information
niutaofan authored Dec 10, 2024
2 parents d905858 + 8a14454 commit 3fe644a
Show file tree
Hide file tree
Showing 52 changed files with 601 additions and 399 deletions.
39 changes: 0 additions & 39 deletions be/src/olap/hll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -367,43 +367,4 @@ int64_t HyperLogLog::estimate_cardinality() const {
return (int64_t)(estimate + 0.5);
}

void HllSetResolver::parse() {
// skip LengthValueType
char* pdata = _buf_ref;
_set_type = (HllDataType)pdata[0];
char* sparse_data = nullptr;
switch (_set_type) {
case HLL_DATA_EXPLICIT:
// first byte : type
// second~five byte : hash values's number
// five byte later : hash value
_explicit_num = (ExplicitLengthValueType)(pdata[sizeof(SetTypeValueType)]);
_explicit_value =
(uint64_t*)(pdata + sizeof(SetTypeValueType) + sizeof(ExplicitLengthValueType));
break;
case HLL_DATA_SPARSE:
// first byte : type
// second ~(2^HLL_COLUMN_PRECISION)/8 byte : bitmap mark which is not zero
// 2^HLL_COLUMN_PRECISION)/8 + 1以后value
_sparse_count = (SparseLengthValueType*)(pdata + sizeof(SetTypeValueType));
sparse_data = pdata + sizeof(SetTypeValueType) + sizeof(SparseLengthValueType);
for (int i = 0; i < *_sparse_count; i++) {
auto* index = (SparseIndexType*)sparse_data;
sparse_data += sizeof(SparseIndexType);
auto* value = (SparseValueType*)sparse_data;
_sparse_map[*index] = *value;
sparse_data += sizeof(SetTypeValueType);
}
break;
case HLL_DATA_FULL:
// first byte : type
// second byte later : hll register value
_full_value_position = pdata + sizeof(SetTypeValueType);
break;
default:
// HLL_DATA_EMPTY
break;
}
}

} // namespace doris
53 changes: 0 additions & 53 deletions be/src/olap/hll.h
Original file line number Diff line number Diff line change
Expand Up @@ -303,57 +303,4 @@ class HyperLogLog {
uint8_t* _registers = nullptr;
};

// todo(kks): remove this when dpp_sink class was removed
//
// Non-owning reader for a serialized HLL set. init() only stores the buffer
// pointer and length; parse() then decodes the buffer and fills in the values
// returned by the getters below. The buffer must stay alive while the getters
// that return pointers into it are in use.
class HllSetResolver {
public:
    HllSetResolver() = default;

    ~HllSetResolver() = default;

    using SetTypeValueType = uint8_t;
    using ExplicitLengthValueType = uint8_t;
    using SparseLengthValueType = int32_t;
    using SparseIndexType = uint16_t;
    using SparseValueType = uint8_t;

    // Stores the buffer pointer and its length; the data is not copied.
    void init(char* buf, int len) {
        this->_buf_ref = buf;
        this->_buf_len = len;
    }

    // Returns the set type read by parse().
    HllDataType get_hll_data_type() const { return _set_type; }

    // Returns the number of explicit hash values read by parse().
    int get_explicit_count() const { return (int)_explicit_num; }

    // Returns the 64-bit explicit hash value at `index`.
    // For an out-of-range index (negative, or >= the explicit count) returns
    // the all-ones sentinel, i.e. -1 converted to uint64_t.
    uint64_t get_explicit_value(int index) {
        if (index < 0 || index >= _explicit_num) {
            return static_cast<uint64_t>(-1);
        }
        return _explicit_value[index];
    }

    // Returns the pointer to the register values set by parse() for a full set
    // (nullptr until then).
    char* get_full_value() { return _full_value_position; }

    // Returns the (index, value) map filled by parse() for a sparse set.
    std::map<SparseIndexType, SparseValueType>& get_sparse_map() { return _sparse_map; }

    // Decodes the buffer; call after init().
    void parse();

private:
    char* _buf_ref = nullptr; // serialized set (not owned)
    int _buf_len {};          // length of the serialized set
    HllDataType _set_type {}; // set type
    char* _full_value_position = nullptr;
    uint64_t* _explicit_value = nullptr;
    ExplicitLengthValueType _explicit_num {};
    std::map<SparseIndexType, SparseValueType> _sparse_map;
    // Initialized like the other pointer members so it never holds an
    // indeterminate value before parse() runs.
    SparseLengthValueType* _sparse_count = nullptr;
};

} // namespace doris
6 changes: 2 additions & 4 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,10 @@
#include <utility>
#include <vector>

#include "arrow/record_batch.h"
#include "arrow/type_fwd.h"
#include "pipeline/dependency.h"
#include "runtime/thread_context.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "util/thrift_util.h"
#include "vec/core/block.h"

Expand Down Expand Up @@ -149,8 +147,8 @@ void GetArrowResultBatchCtx::on_data(
delete this;
}

BufferControlBlock::BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state)
: _fragment_id(id),
BufferControlBlock::BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state)
: _fragment_id(std::move(id)),
_is_close(false),
_is_cancelled(false),
_buffer_limit(buffer_size),
Expand Down
10 changes: 6 additions & 4 deletions be/src/runtime/buffer_control_block.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
#include <cctz/time_zone.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/Types_types.h>
#include <stdint.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <deque>
#include <list>
#include <memory>
Expand All @@ -34,7 +34,6 @@
#include "common/status.h"
#include "runtime/query_statistics.h"
#include "runtime/runtime_state.h"
#include "util/hash_util.hpp"

namespace google::protobuf {
class Closure;
Expand Down Expand Up @@ -98,13 +97,15 @@ struct GetArrowResultBatchCtx {
// buffer used for result customer and producer
class BufferControlBlock {
public:
BufferControlBlock(const TUniqueId& id, int buffer_size, RuntimeState* state);
BufferControlBlock(TUniqueId id, int buffer_size, RuntimeState* state);
~BufferControlBlock();

Status init();
// Try to consume _waiting_rpc, or else leave the data waiting in _fe_result_batch_queue. Tries to combine blocks first to reduce RPCs.
Status add_batch(RuntimeState* state, std::unique_ptr<TFetchDataResult>& result);
Status add_arrow_batch(RuntimeState* state, std::shared_ptr<vectorized::Block>& result);

// If there is a Block waiting in _fe_result_batch_queue, send it (via on_data); otherwise make the RPC wait in _waiting_rpc.
void get_batch(GetResultBatchCtx* ctx);
// for ArrowFlightBatchLocalReader
Status get_arrow_batch(std::shared_ptr<vectorized::Block>* result,
Expand Down Expand Up @@ -150,7 +151,7 @@ class BufferControlBlock {
const int _buffer_limit;
int64_t _packet_num;

// blocking queue for batch
// Producer. Blocking queue of result batches waiting to be sent to FE by _waiting_rpc.
FeResultQueue _fe_result_batch_queue;
ArrowFlightResultQueue _arrow_flight_result_batch_queue;
// for arrow flight
Expand All @@ -163,6 +164,7 @@ class BufferControlBlock {
// TODO, waiting for data will block pipeline, so use a request pool to save requests waiting for data.
std::condition_variable _arrow_data_arrival;

// Consumer. RPCs from FE that are waiting for results. Once _fe_result_batch_queue is filled, the RPC can be sent.
std::deque<GetResultBatchCtx*> _waiting_rpc;
std::deque<GetArrowResultBatchCtx*> _waiting_arrow_result_batch_rpc;

Expand Down
21 changes: 8 additions & 13 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,16 @@
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <pthread.h>
#include <stddef.h>
#include <sys/time.h>
#include <thrift/TApplicationException.h>
#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>
#include <time.h>
#include <unistd.h>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <ctime>

#include "common/status.h"
// IWYU pragma: no_include <bits/chrono.h>
Expand All @@ -58,19 +57,16 @@
#include <unordered_set>
#include <utility>

#include "cloud/config.h"
#include "common/config.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/utils.h"
#include "gutil/strings/substitute.h"
#include "io/fs/stream_load_pipe.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/frontend_info.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/primitive_type.h"
#include "runtime/query_context.h"
#include "runtime/runtime_filter_mgr.h"
Expand All @@ -89,24 +85,20 @@
#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/hash_util.hpp"
#include "util/mem_info.h"
#include "util/network_util.h"
#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
#include "util/url_coding.h"
#include "vec/runtime/shared_hash_table_controller.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {

DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_num_active_threads, MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");

bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
Expand Down Expand Up @@ -184,7 +176,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
}

// Avoid logic error in frontend.
if (rpc_result.__isset.status == false || rpc_result.status.status_code != TStatusCode::OK) {
if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
doris::to_string(rpc_result.status.status_code));
Expand All @@ -193,7 +185,7 @@ static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
doris::to_string(rpc_result.status.status_code));
}

if (rpc_result.__isset.running_queries == false) {
if (!rpc_result.__isset.running_queries) {
return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
"running_queries is not set");
Expand Down Expand Up @@ -254,6 +246,8 @@ FragmentMgr::FragmentMgr(ExecEnv* exec_env)

REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size,
[this]() { return _thread_pool->get_queue_size(); });
REGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads,
[this]() { return _thread_pool->num_active_threads(); });
CHECK(s.ok()) << s.to_string();
}

Expand All @@ -262,6 +256,7 @@ FragmentMgr::~FragmentMgr() = default;
void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(fragment_instance_count);
DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
DEREGISTER_HOOK_METRIC(fragment_thread_pool_num_active_threads);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
_cancel_thread->join();
Expand Down
3 changes: 1 addition & 2 deletions be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
#include <gen_cpp/QueryPlanExtra_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/types.pb.h>
#include <stdint.h>

#include <condition_variable>
#include <cstdint>
#include <functional>
#include <iosfwd>
#include <memory>
Expand Down
14 changes: 5 additions & 9 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -665,15 +665,11 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c
// Handles a fetch_data RPC: wraps (controller, result, done) in a GetResultBatchCtx
// and passes it to result_mgr()->fetch_data() for request->finst_id().
void PInternalService::fetch_data(google::protobuf::RpcController* controller,
const PFetchDataRequest* request, PFetchDataResult* result,
google::protobuf::Closure* done) {
// NOTE(review): this view appears to contain both the older path that offers the
// work to _heavy_work_pool and the newer direct call further down — confirm which
// of the two is present in the actual file.
bool ret = _heavy_work_pool.try_offer([this, controller, request, result, done]() {
brpc::Controller* cntl = static_cast<brpc::Controller*>(controller);
GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
});
if (!ret) {
offer_failed(result, done, _heavy_work_pool);
return;
}
// fetch_data is a light operation: when no data is ready it enqueues the request
// instead of waiting in place. When data is ready it is sent through brpc, whose
// service has its own queue, so this should not take long.
auto* cntl = static_cast<brpc::Controller*>(controller);
auto* ctx = new GetResultBatchCtx(cntl, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
}

void PInternalService::fetch_arrow_data(google::protobuf::RpcController* controller,
Expand Down
1 change: 1 addition & 0 deletions be/src/util/doris_metrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ class DorisMetrics {
UIntGauge* send_batch_thread_pool_thread_num = nullptr;
UIntGauge* send_batch_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_queue_size = nullptr;
UIntGauge* fragment_thread_pool_num_active_threads = nullptr;

// Upload metrics
UIntGauge* upload_total_byte = nullptr;
Expand Down
21 changes: 0 additions & 21 deletions be/src/util/slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -302,27 +302,6 @@ inline int Slice::compare(const Slice& b) const {
return r;
}

/// @brief Names an STL map keyed by Slice.
///
/// Usage sketch:
/// @code
///   using MySliceMap = SliceMap<int>::type;
///
///   MySliceMap my_map;
///   my_map.insert(MySliceMap::value_type(a, 1));
///   my_map.insert(MySliceMap::value_type(b, 2));
///   my_map.insert(MySliceMap::value_type(c, 3));
///
///   for (const MySliceMap::value_type& pair : my_map) {
///     ...
///   }
/// @endcode
template <typename Value>
struct SliceMap {
    /// Map from Slice to Value, ordered by Slice::Comparator.
    using type = std::map<Slice, Value, Slice::Comparator>;
};

// A move-only type which manage the lifecycle of externally allocated data.
// Unlike std::unique_ptr<uint8_t[]>, OwnedSlice remembers the size of data so that clients can access
// the underlying buffer as a Slice.
Expand Down
Loading

0 comments on commit 3fe644a

Please sign in to comment.