Skip to content

Commit

Permalink
[refactor](shuffle) Unify all hash-partition based shuffling
Browse files Browse the repository at this point in the history
  • Loading branch information
Gabriel39 committed Dec 10, 2024
1 parent ed11252 commit 56801f4
Show file tree
Hide file tree
Showing 10 changed files with 520 additions and 310 deletions.
249 changes: 28 additions & 221 deletions be/src/pipeline/exec/exchange_sink_operator.cpp

Large diffs are not rendered by default.

55 changes: 8 additions & 47 deletions be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include "common/status.h"
#include "exchange_sink_buffer.h"
#include "operator.h"
#include "pipeline/shuffle/writer.h"
#include "vec/sink/scale_writer_partitioning_exchanger.hpp"
#include "vec/sink/vdata_stream_sender.h"

Expand All @@ -39,20 +40,6 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
ENABLE_FACTORY_CREATOR(ExchangeSinkLocalState);
using Base = PipelineXSinkLocalState<>;

private:
class HashPartitionFunction {
public:
HashPartitionFunction(vectorized::PartitionerBase* partitioner)
: _partitioner(partitioner) {}

int get_partition(vectorized::Block* block, int position) {
return _partitioner->get_channel_ids().get<uint32_t>()[position];
}

private:
vectorized::PartitionerBase* _partitioner;
};

public:
ExchangeSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: Base(parent, state), _serializer(this) {
Expand Down Expand Up @@ -106,19 +93,16 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
std::string name_suffix() override;
segment_v2::CompressionTypePB compression_type() const;
std::string debug_string(int indentation_level) const override;
static Status empty_callback_function(void* sender, TCreatePartitionResult* result) {
return Status::OK();
}
Status _send_new_partition_batch(vectorized::Block* input_block);
RuntimeProfile::Counter* send_new_partition_timer() { return _send_new_partition_timer; }
RuntimeProfile::Counter* add_partition_request_timer() { return _add_partition_request_timer; }
RuntimeProfile::Counter* split_block_hash_compute_timer() { return _split_block_hash_compute_timer; }
RuntimeProfile::Counter* distribute_rows_into_channels_timer() { return _distribute_rows_into_channels_timer; }
std::vector<std::shared_ptr<vectorized::Channel>> channels;
int current_channel_idx {0}; // index of current channel to send to if _random == true
bool only_local_exchange {false};

void on_channel_finished(InstanceLoId channel_id);

// for external table sink hash partition
std::unique_ptr<vectorized::ScaleWriterPartitioningExchanger<HashPartitionFunction>>
scale_writer_partitioning_exchanger;
vectorized::PartitionerBase* partitioner() const { return _partitioner.get(); }

private:
friend class ExchangeSinkOperatorX;
Expand Down Expand Up @@ -176,28 +160,16 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {
*/
std::vector<std::shared_ptr<Dependency>> _local_channels_dependency;
std::unique_ptr<vectorized::PartitionerBase> _partitioner;
std::unique_ptr<Writer> _writer;
size_t _partition_count;

std::shared_ptr<Dependency> _finish_dependency;

// for shuffle data by partition and tablet
int64_t _txn_id = -1;
vectorized::VExprContextSPtrs _tablet_sink_expr_ctxs;
std::unique_ptr<VOlapTablePartitionParam> _vpartition = nullptr;
std::unique_ptr<vectorized::OlapTabletFinder> _tablet_finder = nullptr;
std::shared_ptr<OlapTableSchemaParam> _schema = nullptr;
std::unique_ptr<vectorized::OlapTableBlockConvertor> _block_convertor = nullptr;
TupleDescriptor* _tablet_sink_tuple_desc = nullptr;
RowDescriptor* _tablet_sink_row_desc = nullptr;
OlapTableLocationParam* _location = nullptr;
vectorized::VRowDistribution _row_distribution;

RuntimeProfile::Counter* _add_partition_request_timer = nullptr;
std::vector<vectorized::RowPartTabletIds> _row_part_tablet_ids;
int64_t _number_input_rows = 0;
TPartitionType::type _part_type;

// for external table sink hash partition
std::unique_ptr<HashPartitionFunction> _partition_function = nullptr;
std::atomic<bool> _reach_limit = false;
int _last_local_channel_idx = -1;

Expand Down Expand Up @@ -237,17 +209,6 @@ class ExchangeSinkOperatorX final : public DataSinkOperatorX<ExchangeSinkLocalSt
template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st);

Status channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t num_channels, const uint32_t* __restrict channel_ids,
size_t rows, vectorized::Block* block, bool eos);

Status channel_add_rows_with_idx(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t num_channels,
std::vector<std::vector<uint32_t>>& channel2rows,
vectorized::Block* block, bool eos);

// Use ExchangeSinkOperatorX to create a sink buffer.
// The sink buffer can be shared among multiple ExchangeSinkLocalState instances,
// or each ExchangeSinkLocalState can have its own sink buffer.
Expand Down
114 changes: 114 additions & 0 deletions be/src/pipeline/shuffle/writer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "writer.h"

#include "pipeline/exec/exchange_sink_operator.h"
#include "vec/core/block.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"
template <typename ChannelPtrType>
void Writer::_handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const {
channel->set_receiver_eof(st);
// Chanel will not send RPC to the downstream when eof, so close chanel by OK status.
static_cast<void>(channel->close(state));
}

Status Writer::write(ExchangeSinkLocalState* local_state, RuntimeState* state,
vectorized::Block* block, bool eos) const {
auto rows = block->rows();
{
SCOPED_TIMER(local_state->split_block_hash_compute_timer());
RETURN_IF_ERROR(local_state->partitioner()->do_partitioning(state, block));
}
int64_t old_channel_mem_usage = 0;
for (const auto& channel : local_state->channels) {
old_channel_mem_usage += channel->mem_usage();
}
{
SCOPED_TIMER(local_state->distribute_rows_into_channels_timer());
const auto& channel_filed = local_state->partitioner()->get_channel_ids();
if (channel_filed.len == sizeof(uint32_t)) {
RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels,
local_state->partitioner()->partition_count(),
channel_filed.get<uint32_t>(), rows, block, eos));
} else {
RETURN_IF_ERROR(_channel_add_rows(state, local_state->channels,
local_state->partitioner()->partition_count(),
channel_filed.get<int64_t>(), rows, block, eos));
}
}
int64_t new_channel_mem_usage = 0;
for (const auto& channel : local_state->channels) {
new_channel_mem_usage += channel->mem_usage();
}
COUNTER_UPDATE(local_state->memory_used_counter(),
new_channel_mem_usage - old_channel_mem_usage);
return Status::OK();
}

template <typename ChannelIdType>
Status Writer::_channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t partition_count,
const ChannelIdType* __restrict channel_ids, size_t rows,
vectorized::Block* block, bool eos) const {
std::vector<uint32_t> partition_rows_histogram;
auto row_idx = vectorized::PODArray<uint32_t>(rows);
{
partition_rows_histogram.assign(partition_count + 2, 0);
for (size_t i = 0; i < rows; ++i) {
partition_rows_histogram[channel_ids[i] + 1]++;
}
for (size_t i = 1; i <= partition_count + 1; ++i) {
partition_rows_histogram[i] += partition_rows_histogram[i - 1];
}
for (int32_t i = cast_set<int32_t>(rows) - 1; i >= 0; --i) {
row_idx[partition_rows_histogram[channel_ids[i] + 1] - 1] = i;
partition_rows_histogram[channel_ids[i] + 1]--;
}
}
#define HANDLE_CHANNEL_STATUS(state, channel, status) \
do { \
if (status.is<ErrorCode::END_OF_FILE>()) { \
_handle_eof_channel(state, channel, status); \
} else { \
RETURN_IF_ERROR(status); \
} \
} while (0)
Status status = Status::OK();
for (size_t i = 0; i < partition_count; ++i) {
uint32_t start = partition_rows_histogram[i + 1];
uint32_t size = partition_rows_histogram[i + 2] - start;
if (!channels[i]->is_receiver_eof() && size > 0) {
status = channels[i]->add_rows(block, row_idx.data(), start, size, false);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
}
if (eos) {
for (int i = 0; i < partition_count; ++i) {
if (!channels[i]->is_receiver_eof()) {
status = channels[i]->add_rows(block, row_idx.data(), 0, 0, true);
HANDLE_CHANNEL_STATUS(state, channels[i], status);
}
}
}
return Status::OK();
}

} // namespace doris::pipeline
53 changes: 53 additions & 0 deletions be/src/pipeline/shuffle/writer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "vec/sink/vdata_stream_sender.h"

namespace doris {
class RuntimeState;
class Status;
namespace vectorized {
class Block;
class Channel;
} // namespace vectorized
namespace pipeline {

#include "common/compile_check_begin.h"
class ExchangeSinkLocalState;

class Writer {
public:
Writer() = default;

Status write(ExchangeSinkLocalState* local_state, RuntimeState* state, vectorized::Block* block,
bool eos) const;

private:
template <typename ChannelIdType>
Status _channel_add_rows(RuntimeState* state,
std::vector<std::shared_ptr<vectorized::Channel>>& channels,
size_t partition_count, const ChannelIdType* __restrict channel_ids,
size_t rows, vectorized::Block* block, bool eos) const;

template <typename ChannelPtrType>
void _handle_eof_channel(RuntimeState* state, ChannelPtrType channel, Status st) const;
};
#include "common/compile_check_end.h"
} // namespace pipeline
} // namespace doris
16 changes: 8 additions & 8 deletions be/src/vec/runtime/partitioner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,8 @@
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"

namespace doris {
namespace doris::vectorized {
#include "common/compile_check_begin.h"

namespace vectorized {

struct ChannelField {
const void* channel_id;
const uint32_t len;
Expand All @@ -48,12 +45,16 @@ class PartitionerBase {

virtual Status open(RuntimeState* state) = 0;

virtual Status close(RuntimeState* state) = 0;

virtual Status do_partitioning(RuntimeState* state, Block* block) const = 0;

virtual ChannelField get_channel_ids() const = 0;

virtual Status clone(RuntimeState* state, std::unique_ptr<PartitionerBase>& partitioner) = 0;

size_t partition_count() const { return _partition_count; }

protected:
const size_t _partition_count;
};
Expand All @@ -74,6 +75,8 @@ class Crc32HashPartitioner : public PartitionerBase {

Status open(RuntimeState* state) override { return VExpr::open(_partition_expr_ctxs, state); }

Status close(RuntimeState* state) override { return Status::OK(); }

Status do_partitioning(RuntimeState* state, Block* block) const override;

ChannelField get_channel_ids() const override { return {_hash_vals.data(), sizeof(uint32_t)}; }
Expand Down Expand Up @@ -108,8 +111,5 @@ struct SpillPartitionChannelIds {
return ((l >> 16) | (l << 16)) % r;
}
};

} // namespace vectorized
} // namespace doris

#include "common/compile_check_end.h"
} // namespace doris::vectorized
Loading

0 comments on commit 56801f4

Please sign in to comment.