Skip to content

Commit

Permalink
Use PrefixSort in Window operator (facebookincubator#10417)
Browse files Browse the repository at this point in the history
Summary:
Benchmark run results https://gist.github.com/aditi-pandit/25d2f6224c97d9d9a32cdd97e05528fa

Pull Request resolved: facebookincubator#10417

Reviewed By: kagamiori

Differential Revision: D64278437

Pulled By: xiaoxmeng

fbshipit-source-id: 491e0cb4298a4749e42c6057f64aac55e761a7c4
  • Loading branch information
aditi-pandit authored and facebook-github-bot committed Oct 16, 2024
1 parent de1b2ae commit 1dd1e3e
Show file tree
Hide file tree
Showing 10 changed files with 463 additions and 25 deletions.
19 changes: 8 additions & 11 deletions velox/exec/SortWindowBuild.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
namespace facebook::velox::exec {

namespace {
std::vector<CompareFlags> makeSpillCompareFlags(
std::vector<CompareFlags> makeCompareFlags(
int32_t numPartitionKeys,
const std::vector<core::SortOrder>& sortingOrders) {
std::vector<CompareFlags> compareFlags;
Expand All @@ -42,14 +42,15 @@ std::vector<CompareFlags> makeSpillCompareFlags(
SortWindowBuild::SortWindowBuild(
const std::shared_ptr<const core::WindowNode>& node,
velox::memory::MemoryPool* pool,
common::PrefixSortConfig&& prefixSortConfig,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
folly::Synchronized<common::SpillStats>* spillStats)
: WindowBuild(node, pool, spillConfig, nonReclaimableSection),
numPartitionKeys_{node->partitionKeys().size()},
spillCompareFlags_{
makeSpillCompareFlags(numPartitionKeys_, node->sortingOrders())},
compareFlags_{makeCompareFlags(numPartitionKeys_, node->sortingOrders())},
pool_(pool),
prefixSortConfig_(prefixSortConfig),
spillStats_(spillStats) {
VELOX_CHECK_NOT_NULL(pool_);
allKeyInfo_.reserve(partitionKeyInfo_.size() + sortKeyInfo_.size());
Expand Down Expand Up @@ -145,8 +146,8 @@ void SortWindowBuild::setupSpiller() {
Spiller::Type::kOrderByInput,
data_.get(),
inputType_,
spillCompareFlags_.size(),
spillCompareFlags_,
compareFlags_.size(),
compareFlags_,
spillConfig_,
spillStats_);
}
Expand Down Expand Up @@ -217,12 +218,8 @@ void SortWindowBuild::sortPartitions() {
RowContainerIterator iter;
data_->listRows(&iter, numRows_, sortedRows_.data());

std::sort(
sortedRows_.begin(),
sortedRows_.end(),
[this](const char* leftRow, const char* rightRow) {
return compareRowsWithKeys(leftRow, rightRow, allKeyInfo_);
});
PrefixSort::sort(
sortedRows_, pool_, data_.get(), compareFlags_, prefixSortConfig_);

computePartitionStartRows();
}
Expand Down
10 changes: 8 additions & 2 deletions velox/exec/SortWindowBuild.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#pragma once

#include "velox/exec/PrefixSort.h"
#include "velox/exec/Spiller.h"
#include "velox/exec/WindowBuild.h"

Expand All @@ -29,6 +30,7 @@ class SortWindowBuild : public WindowBuild {
SortWindowBuild(
const std::shared_ptr<const core::WindowNode>& node,
velox::memory::MemoryPool* pool,
common::PrefixSortConfig&& prefixSortConfig,
const common::SpillConfig* spillConfig,
tsan_atomic<bool>* nonReclaimableSection,
folly::Synchronized<common::SpillStats>* spillStats);
Expand Down Expand Up @@ -82,10 +84,14 @@ class SortWindowBuild : public WindowBuild {
// keys are set to default values. Compare flags for sorting keys match
// sorting order specified in the plan node.
//
// Used to sort 'data_' while spilling.
const std::vector<CompareFlags> spillCompareFlags_;
// Used to sort 'data_' while spilling and in Prefix sort.
const std::vector<CompareFlags> compareFlags_;

memory::MemoryPool* const pool_;

// Config for Prefix-sort.
const common::PrefixSortConfig prefixSortConfig_;

folly::Synchronized<common::SpillStats>* const spillStats_;

// allKeyInfo_ is a combination of (partitionKeyInfo_ and sortKeyInfo_).
Expand Down
9 changes: 8 additions & 1 deletion velox/exec/Window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,14 @@ Window::Window(
}
} else {
windowBuild_ = std::make_unique<SortWindowBuild>(
windowNode, pool(), spillConfig, &nonReclaimableSection_, &spillStats_);
windowNode,
pool(),
common::PrefixSortConfig{
driverCtx->queryConfig().prefixSortNormalizedKeyMaxBytes(),
driverCtx->queryConfig().prefixSortMinRows()},
spillConfig,
&nonReclaimableSection_,
&spillStats_);
}
}

Expand Down
13 changes: 13 additions & 0 deletions velox/exec/benchmarks/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,16 @@ target_link_libraries(
velox_vector_fuzzer
velox_vector_test_lib
${FOLLY_BENCHMARK})

add_executable(velox_window_prefixsort_benchmark WindowPrefixSortBenchmark.cpp)

target_link_libraries(
velox_window_prefixsort_benchmark
velox_aggregates
velox_exec
velox_exec_test_lib
velox_hive_connector
velox_vector_fuzzer
velox_vector_test_lib
velox_window
${FOLLY_BENCHMARK})
Loading

0 comments on commit 1dd1e3e

Please sign in to comment.