Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Aggregator support prefetch #9679

Open
wants to merge 41 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 32 commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
b869b2f
new hash
guo-shaoge Nov 25, 2024
e8a2df8
prefetch done
guo-shaoge Nov 26, 2024
b314166
executeImplBatchStringHashMap done
guo-shaoge Nov 27, 2024
ec6e892
handle resize exception done
guo-shaoge Nov 27, 2024
9dc702d
tmp save
guo-shaoge Nov 27, 2024
ce1f767
revert Serialized Key changes
guo-shaoge Nov 28, 2024
8ac8beb
refine
guo-shaoge Nov 28, 2024
0053ce8
refine
guo-shaoge Nov 28, 2024
fcf8ed2
fix unit test
guo-shaoge Nov 29, 2024
ae7b969
refine
guo-shaoge Dec 2, 2024
3a86617
unit test
guo-shaoge Dec 2, 2024
623fef5
prefetch
guo-shaoge Dec 2, 2024
19f320d
fix
guo-shaoge Dec 2, 2024
3a226df
refine
guo-shaoge Dec 3, 2024
c44ace7
refine
guo-shaoge Dec 3, 2024
3e30f95
revert new hasher
guo-shaoge Dec 3, 2024
ea85d19
debug low distinct value
guo-shaoge Dec 3, 2024
16937ff
Revert "revert new hasher"
guo-shaoge Dec 3, 2024
d2fba57
refine original code path
guo-shaoge Dec 3, 2024
71b6ecd
Reapply "revert new hasher"
guo-shaoge Dec 3, 2024
40ceb08
one level old hash; two level new hash
guo-shaoge Dec 3, 2024
4cb24c2
Revert "one level old hash; two level new hash"
guo-shaoge Dec 4, 2024
c02cf71
revert new hasher; refine original code path
guo-shaoge Dec 4, 2024
352b710
fix case
guo-shaoge Dec 5, 2024
e5ab87c
refine
guo-shaoge Dec 13, 2024
30d99be
is_serialized_key
guo-shaoge Dec 15, 2024
01cae80
fix
guo-shaoge Dec 15, 2024
95c597d
fix start_row
guo-shaoge Dec 15, 2024
83fb879
Merge branch 'master' into hashagg_prefetch
guo-shaoge Dec 16, 2024
e0dea79
revert prefetch for StringHashTable
guo-shaoge Dec 17, 2024
143fee3
comment
guo-shaoge Dec 17, 2024
6cea464
fmt
guo-shaoge Dec 18, 2024
6750737
comment
guo-shaoge Dec 19, 2024
2e94829
prefetch agg data
guo-shaoge Dec 20, 2024
dc0973a
try fix ci
guo-shaoge Dec 22, 2024
9623368
prefetch threshold(2MB); mini batch; prefetch exists key; prefetch in…
guo-shaoge Dec 24, 2024
0ef6ba6
clean code
guo-shaoge Dec 24, 2024
49d4188
fix prefetch agg func is 0
guo-shaoge Dec 24, 2024
b4a1bde
fix case
guo-shaoge Dec 24, 2024
738ab2a
refine agg func is zero
guo-shaoge Dec 25, 2024
361f988
refine
guo-shaoge Dec 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions dbms/src/AggregateFunctions/AggregateFunctionGroupUniqArray.h
Original file line number Diff line number Diff line change
Expand Up @@ -188,12 +188,12 @@ class AggregateFunctionGroupUniqArrayGeneric

void insertResultInto(ConstAggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
auto & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();

auto & set = this->data(place).value;
offsets_to.push_back((offsets_to.size() == 0 ? 0 : offsets_to.back()) + set.size());
offsets_to.push_back((offsets_to.empty() ? 0 : offsets_to.back()) + set.size());

for (auto & elem : set)
deserializeAndInsert<is_plain_column>(elem.getValue(), data_to);
Expand Down
85 changes: 46 additions & 39 deletions dbms/src/Common/ColumnsHashing.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,15 +48,20 @@ struct HashMethodOneNumber
using Self = HashMethodOneNumber<Value, Mapped, FieldType, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

const FieldType * vec;
const size_t total_rows;

/// If the keys of a fixed length then key_sizes contains their lengths, empty otherwise.
HashMethodOneNumber(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(key_columns[0])->getData()[0];
}

explicit HashMethodOneNumber(const IColumn * column)
: total_rows(column->size())
{
vec = &static_cast<const ColumnVector<FieldType> *>(column)->getData()[0];
}
Expand Down Expand Up @@ -86,54 +91,46 @@ struct HashMethodOneNumber


/// For the case when there is one string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodString
: public columns_hashing_impl::
HashMethodBase<HashMethodString<Value, Mapped, place_string_to_arena, use_cache>, Value, Mapped, use_cache>
: public columns_hashing_impl::HashMethodBase<HashMethodString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodString<Value, Mapped, place_string_to_arena, use_cache>;
guo-shaoge marked this conversation as resolved.
Show resolved Hide resolved
using Self = HashMethodString<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;

HashMethodString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
if (!collators.empty())
{
if constexpr (!place_string_to_arena)
throw Exception("String with collator must be placed on arena.", ErrorCodes::LOGICAL_ERROR);
collator = collators[0];
}
}

ALWAYS_INLINE inline auto getKeyHolder(
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolder(
ssize_t row,
[[maybe_unused]] Arena * pool,
std::vector<String> & sort_key_containers) const
{
auto last_offset = row == 0 ? 0 : offsets[row - 1];
// Remove last zero byte.
StringRef key(chars + last_offset, offsets[row] - last_offset - 1);
if (likely(collator))
key = collator->sortKey(key.data, key.size, sort_key_containers[0]);

if constexpr (place_string_to_arena)
{
if (likely(collator))
key = collator->sortKey(key.data, key.size, sort_key_containers[0]);
return ArenaKeyHolder{key, *pool};
}
else
{
return key;
}
return ArenaKeyHolder{key, *pool};
}

protected:
Expand All @@ -147,10 +144,14 @@ struct HashMethodStringBin
using Self = HashMethodStringBin<Value, Mapped, padding>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = false;

const IColumn::Offset * offsets;
const UInt8 * chars;
const size_t total_rows;

HashMethodStringBin(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const TiDB::TiDBCollators &)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnString &>(column);
Expand Down Expand Up @@ -344,12 +345,16 @@ struct HashMethodFastPathTwoKeysSerialized
using Self = HashMethodFastPathTwoKeysSerialized<Key1Desc, Key2Desc, Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = true;

Key1Desc key_1_desc;
Key2Desc key_2_desc;
const size_t total_rows;

HashMethodFastPathTwoKeysSerialized(const ColumnRawPtrs & key_columns, const Sizes &, const TiDB::TiDBCollators &)
: key_1_desc(key_columns[0])
, key_2_desc(key_columns[1])
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline auto getKeyHolder(ssize_t row, Arena * pool, std::vector<String> &) const
Expand All @@ -370,25 +375,26 @@ struct HashMethodFastPathTwoKeysSerialized


/// For the case when there is one fixed-length string key.
template <typename Value, typename Mapped, bool place_string_to_arena = true, bool use_cache = true>
template <typename Value, typename Mapped, bool use_cache = true>
struct HashMethodFixedString
: public columns_hashing_impl::HashMethodBase<
HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>,
Value,
Mapped,
use_cache>
: public columns_hashing_impl::
HashMethodBase<HashMethodFixedString<Value, Mapped, use_cache>, Value, Mapped, use_cache>
{
using Self = HashMethodFixedString<Value, Mapped, place_string_to_arena, use_cache>;
using Self = HashMethodFixedString<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

size_t n;
const ColumnFixedString::Chars_t * chars;
TiDB::TiDBCollatorPtr collator = nullptr;
const size_t total_rows;

HashMethodFixedString(
const ColumnRawPtrs & key_columns,
const Sizes & /*key_sizes*/,
const TiDB::TiDBCollators & collators)
: total_rows(key_columns[0]->size())
{
const IColumn & column = *key_columns[0];
const auto & column_string = assert_cast<const ColumnFixedString &>(column);
Expand All @@ -398,26 +404,16 @@ struct HashMethodFixedString
collator = collators[0];
}

ALWAYS_INLINE inline auto getKeyHolder(
ALWAYS_INLINE inline ArenaKeyHolder getKeyHolder(
size_t row,
[[maybe_unused]] Arena * pool,
Arena * pool,
std::vector<String> & sort_key_containers) const
{
StringRef key(&(*chars)[row * n], n);

if (collator)
{
key = collator->sortKeyFastPath(key.data, key.size, sort_key_containers[0]);
}

if constexpr (place_string_to_arena)
{
return ArenaKeyHolder{key, *pool};
}
else
{
return key;
}
return ArenaKeyHolder{key, *pool};
}

protected:
Expand All @@ -438,10 +434,12 @@ struct HashMethodKeysFixed
using BaseHashed = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
using Base = columns_hashing_impl::BaseStateKeysFixed<Key, has_nullable_keys_>;

static constexpr bool is_serialized_key = false;
static constexpr bool has_nullable_keys = has_nullable_keys_;

Sizes key_sizes;
size_t keys_size;
const size_t total_rows;

/// SSSE3 shuffle method can be used. Shuffle masks will be calculated and stored here.
#if defined(__SSSE3__) && !defined(MEMORY_SANITIZER)
Expand All @@ -467,6 +465,7 @@ struct HashMethodKeysFixed
: Base(key_columns)
, key_sizes(std::move(key_sizes_))
, keys_size(key_columns.size())
, total_rows(key_columns[0]->size())
{
if (usePreparedKeys(key_sizes))
{
Expand Down Expand Up @@ -593,9 +592,12 @@ struct HashMethodSerialized
using Self = HashMethodSerialized<Value, Mapped>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, false>;

static constexpr bool is_serialized_key = true;

ColumnRawPtrs key_columns;
size_t keys_size;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodSerialized(
const ColumnRawPtrs & key_columns_,
Expand All @@ -604,6 +606,7 @@ struct HashMethodSerialized
: key_columns(key_columns_)
, keys_size(key_columns_.size())
, collators(collators_)
, total_rows(key_columns_[0]->size())
{}

ALWAYS_INLINE inline SerializedKeyHolder getKeyHolder(
Expand All @@ -629,12 +632,16 @@ struct HashMethodHashed
using Self = HashMethodHashed<Value, Mapped, use_cache>;
using Base = columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

static constexpr bool is_serialized_key = false;

ColumnRawPtrs key_columns;
TiDB::TiDBCollators collators;
const size_t total_rows;

HashMethodHashed(ColumnRawPtrs key_columns_, const Sizes &, const TiDB::TiDBCollators & collators_)
: key_columns(std::move(key_columns_))
, collators(collators_)
, total_rows(key_columns[0]->size())
{}

ALWAYS_INLINE inline Key getKeyHolder(size_t row, Arena *, std::vector<String> & sort_key_containers) const
Expand Down
Loading