Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 10, 2024
1 parent 401738f commit 06e554f
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 60 deletions.
125 changes: 67 additions & 58 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,57 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
_sink_buffer->set_dependency(state->fragment_instance_id().lo, _queue_dependency, this);
}

if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
_partition_count = channels.size();
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
_partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>(
_partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema,
p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this);
RETURN_IF_ERROR(_partitioner->init({}));
RETURN_IF_ERROR(_partitioner->prepare(state, {}));
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
_partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
channels.size(), _partition_count, channels.size(), 1,
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num(),
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num());

RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

return Status::OK();
}

Expand Down Expand Up @@ -192,64 +243,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
}
}
}
if (_part_type == TPartitionType::HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED) {
_partition_count = channels.size();
_partitioner =
std::make_unique<vectorized::Crc32HashPartitioner<vectorized::ShuffleChannelIds>>(
channels.size());
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
_partition_count = channels.size();
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
_partitioner = std::make_unique<vectorized::TabletSinkHashPartitioner>(
_partition_count, p._tablet_sink_txn_id, p._tablet_sink_schema,
p._tablet_sink_partition, p._tablet_sink_location, p._tablet_sink_tuple_id, this);
RETURN_IF_ERROR(_partitioner->init({}));
RETURN_IF_ERROR(_partitioner->prepare(state, {}));
} else if (_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
_partition_count =
channels.size() * config::table_sink_partition_write_max_partition_nums_per_writer;
_partitioner = std::make_unique<vectorized::ScaleWriterPartitioner>(
channels.size(), _partition_count, channels.size(), 1,
config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_partition_data_processed_rebalance_threshold /
state->task_num(),
config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num() ==
0
? config::table_sink_partition_write_min_data_processed_rebalance_threshold
: config::table_sink_partition_write_min_data_processed_rebalance_threshold /
state->task_num());

RETURN_IF_ERROR(_partitioner->init(*p._t_tablet_sink_exprs));

if (p._output_tuple_id == -1) {
RETURN_IF_ERROR(_partitioner->prepare(state, p._child->row_desc()));
} else {
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(p._output_tuple_id);
auto* output_row_desc =
state->obj_pool()->add(new RowDescriptor(output_tuple_desc, false));
RETURN_IF_ERROR(_partitioner->prepare(state, *output_row_desc));
}
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
Expand Down Expand Up @@ -312,13 +305,29 @@ Status ExchangeSinkOperatorX::init(const TDataSink& tsink) {
if (_part_type == TPartitionType::RANGE_PARTITIONED) {
return Status::InternalError("TPartitionType::RANGE_PARTITIONED should not be used");
}
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(*_t_tablet_sink_exprs,
_tablet_sink_expr_ctxs));
}
return Status::OK();
}

Status ExchangeSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(DataSinkOperatorX<ExchangeSinkLocalState>::open(state));
_state = state;
_compression_type = state->fragement_transmission_compression_type();
if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
if (_output_tuple_id == -1) {
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, _child->row_desc()));
} else {
auto* output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
auto* output_row_desc = _pool->add(new RowDescriptor(output_tuple_desc, false));
RETURN_IF_ERROR(
vectorized::VExpr::prepare(_tablet_sink_expr_ctxs, state, *output_row_desc));
}
RETURN_IF_ERROR(vectorized::VExpr::open(_tablet_sink_expr_ctxs, state));
}
std::vector<InstanceLoId> ins_ids;
for (auto fragment_instance_id : _fragment_instance_ids) {
ins_ids.push_back(fragment_instance_id.lo);
Expand Down
1 change: 1 addition & 0 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
// In a merge sort scenario, there are only n RPCs, so a shared sink buffer is not needed.
/// TODO: Modify this to let FE handle the judgment instead of BE.
std::shared_ptr<ExchangeSinkBuffer> get_sink_buffer(InstanceLoId sender_ins_id);
vectorized::VExprContextSPtrs& tablet_sink_expr_ctxs() { return _tablet_sink_expr_ctxs; }

private:
friend class ExchangeSinkLocalState;
Expand Down
8 changes: 6 additions & 2 deletions be/src/vec/sink/tablet_sink_hash_partitioner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "vec/sink/tablet_sink_hash_partitioner.h"

#include "pipeline/exec/operator.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
TabletSinkHashPartitioner::TabletSinkHashPartitioner(
Expand Down Expand Up @@ -51,9 +53,11 @@ Status TabletSinkHashPartitioner::open(RuntimeState* state) {
_tablet_sink_tuple_desc = state->desc_tbl().get_tuple_descriptor(_tablet_sink_tuple_id);
_tablet_sink_row_desc =
state->obj_pool()->add(new RowDescriptor(_tablet_sink_tuple_desc, false));
_tablet_sink_expr_ctxs.resize(_tablet_sink_expr_ctxs.size());
auto& ctxs =
_local_state->parent()->cast<pipeline::ExchangeSinkOperatorX>().tablet_sink_expr_ctxs();
_tablet_sink_expr_ctxs.resize(ctxs.size());
for (size_t i = 0; i < _tablet_sink_expr_ctxs.size(); i++) {
RETURN_IF_ERROR(_tablet_sink_expr_ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
RETURN_IF_ERROR(ctxs[i]->clone(state, _tablet_sink_expr_ctxs[i]));
}
// if _part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED, we handle the processing of auto_increment column
// on exchange node rather than on TabletWriter
Expand Down

0 comments on commit 06e554f

Please sign in to comment.