Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Vectorized hash grouping by a single text column #7586

Draft
wants to merge 21 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ jobs:
DEBIAN_FRONTEND: noninteractive
# vectorized_aggregation has different output on i386 because int8 is by
# reference and currently it cannot be used for vectorized hash grouping.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum vectorized_aggregation"
# vector_agg_text uses the UMASH hashing library that we can't compile
# on i386.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum vectorized_aggregation vector_agg_text"
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/windows-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ jobs:
build_type: ${{ fromJson(needs.config.outputs.build_type) }}
ignores: ["chunk_adaptive metadata telemetry"]
tsl_ignores: ["compression_algos"]
tsl_skips: ["bgw_db_scheduler bgw_db_scheduler_fixed"]
tsl_skips: ["vector_agg_text bgw_db_scheduler bgw_db_scheduler_fixed"]
pg_config: ["-cfsync=off -cstatement_timeout=60s"]
include:
- pg: 14
Expand Down
1 change: 1 addition & 0 deletions .unreleased/vectorized-text-grouping
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7586 Vectorized aggregation with grouping by a single text column.
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/compressed_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ typedef struct DecompressBatchState
* row. Indexed same as arrow arrays, w/o accounting for the reverse scan
* direction. Initialized to all ones, i.e. all rows pass.
*/
uint64 *restrict vector_qual_result;
const uint64 *restrict vector_qual_result;

/*
* This follows DecompressContext.compressed_chunk_columns, but does not
Expand Down
2 changes: 1 addition & 1 deletion tsl/src/nodes/decompress_chunk/vector_predicates.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ typedef enum VectorQualSummary
} VectorQualSummary;

static pg_attribute_always_inline VectorQualSummary
get_vector_qual_summary(uint64 *restrict qual_result, size_t n_rows)
get_vector_qual_summary(const uint64 *qual_result, size_t n_rows)
{
bool any_rows_pass = false;
bool all_rows_pass = true;
Expand Down
3 changes: 2 additions & 1 deletion tsl/src/nodes/vector_agg/grouping_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ typedef enum
VAGT_Batch,
VAGT_HashSingleFixed2,
VAGT_HashSingleFixed4,
VAGT_HashSingleFixed8
VAGT_HashSingleFixed8,
VAGT_HashSingleText
} VectorAggGroupingType;

extern GroupingPolicy *create_grouping_policy_batch(int num_agg_defs, VectorAggDef *agg_defs,
Expand Down
10 changes: 10 additions & 0 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@
extern HashingStrategy single_fixed_2_strategy;
extern HashingStrategy single_fixed_4_strategy;
extern HashingStrategy single_fixed_8_strategy;
#ifdef TS_USE_UMASH
extern HashingStrategy single_text_strategy;
#endif

static const GroupingPolicy grouping_policy_hash_functions;

Expand Down Expand Up @@ -68,6 +71,11 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr

switch (grouping_type)
{
#ifdef TS_USE_UMASH
case VAGT_HashSingleText:
policy->hashing = single_text_strategy;
break;
#endif
case VAGT_HashSingleFixed8:
policy->hashing = single_fixed_8_strategy;
break;
Expand All @@ -82,6 +90,8 @@ create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs, int num_gr
break;
}

policy->hashing.key_body_mctx = policy->agg_extra_mctx;

policy->hashing.init(&policy->hashing, policy);

return &policy->funcs;
Expand Down
5 changes: 5 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_fixed_4.c
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_fixed_8.c
${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_common.c)

if(USE_UMASH)
list(APPEND SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/hash_strategy_single_text.c)
endif()

target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
128 changes: 128 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_single_text.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

/*
* Implementation of column hashing for a single text column.
*/

#include <postgres.h>

#include <common/hashfn.h>

#include "compression/arrow_c_data_interface.h"
#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/vector_agg/exec.h"
#include "nodes/vector_agg/grouping_policy_hash.h"
#include "template_helper.h"

#include "batch_hashing_params.h"

#include "umash_fingerprint_key.h"

#define EXPLAIN_NAME "single text"
#define KEY_VARIANT single_text
#define OUTPUT_KEY_TYPE BytesView

/*
 * One-time initialization of the text hashing strategy: allocate and derive
 * the UMASH fingerprinting parameters used to hash the key bytes.
 */
static void
single_text_key_hashing_init(HashingStrategy *hashing)
{
	hashing->umash_params = umash_key_hashing_init();
}

/*
 * A non-owning view of a byte string: a pointer into an arrow buffer (or into
 * a detoasted scalar value) plus its length in bytes.
 */
typedef struct BytesView
{
	const uint8 *data;
	uint32 len;
} BytesView;

/*
 * Build a BytesView for the given arrow row of a text column, using the
 * arrow offsets buffer (buffers[1]) and the bodies buffer (buffers[2]).
 */
static BytesView
get_bytes_view(CompressedColumnValues *column_values, int arrow_row)
{
	const uint32 *offsets = (uint32 *) column_values->buffers[1];
	const uint8 *bodies = (uint8 *) column_values->buffers[2];

	const uint32 start = offsets[arrow_row];
	const uint32 end = offsets[arrow_row + 1];
	Assert(end >= start);

	BytesView view = { .data = &bodies[start], .len = end - start };
	return view;
}

/*
 * Compute the output key (a view of the text bytes) and the hash table key
 * (a UMASH fingerprint of those bytes) for the given row, and report whether
 * the row's value is valid (non-null).
 */
static pg_attribute_always_inline void
single_text_key_hashing_get_key(BatchHashingParams params, int row, void *restrict output_key_ptr,
								void *restrict hash_table_key_ptr, bool *restrict valid)
{
	Assert(params.policy->num_grouping_columns == 1);

	BytesView *restrict output_key = (BytesView *) output_key_ptr;
	HASH_TABLE_KEY_TYPE *restrict hash_table_key = (HASH_TABLE_KEY_TYPE *) hash_table_key_ptr;

	CompressedColumnValues *column = &params.single_grouping_column;
	switch (column->decompression_type)
	{
		case DT_Scalar:
		{
			/* A single scalar value repeated for the whole batch. */
			output_key->len = VARSIZE_ANY_EXHDR(*column->output_value);
			output_key->data = (const uint8 *) VARDATA_ANY(*column->output_value);
			*valid = !*column->output_isnull;
			break;
		}
		case DT_ArrowText:
		{
			*output_key = get_bytes_view(column, row);
			*valid = arrow_row_is_valid(column->buffers[0], row);
			break;
		}
		case DT_ArrowTextDict:
		{
			/* Dictionary-encoded text: buffers[3] maps rows to dict indexes. */
			const int16 index = ((int16 *) column->buffers[3])[row];
			*output_key = get_bytes_view(column, index);
			*valid = arrow_row_is_valid(column->buffers[0], row);
			break;
		}
		default:
			pg_unreachable();
	}

	DEBUG_PRINT("%p consider key row %d key index %d is %d bytes: ",
				params.policy,
				row,
				params.policy->last_used_key_index + 1,
				output_key->len);
	for (size_t i = 0; i < output_key->len; i++)
	{
		DEBUG_PRINT("%.2x.", output_key->data[i]);
	}
	DEBUG_PRINT("\n");

	const struct umash_fp fingerprint = umash_fprint(params.policy->hashing.umash_params,
													 /* seed = */ ~0ULL,
													 output_key->data,
													 output_key->len);
	*hash_table_key = umash_fingerprint_get_key(fingerprint);
}

/*
 * Store a newly encountered grouping key at the given index. The key bytes
 * are copied into a freshly palloc'd text datum in the strategy's key body
 * memory context, so they stay valid after the source batch is released.
 */
static pg_attribute_always_inline void
single_text_key_hashing_store_new(GroupingPolicyHash *restrict policy, uint32 new_key_index,
BytesView output_key)
{
	const int total_bytes = output_key.len + VARHDRSZ;
	text *restrict stored = (text *) MemoryContextAlloc(policy->hashing.key_body_mctx, total_bytes);
	SET_VARSIZE(stored, total_bytes);
	memcpy(VARDATA(stored), output_key.data, output_key.len);
	/*
	 * NOTE(review): output_key is passed by value, so repointing its data here
	 * cannot reach the caller -- this looks like a dead store. Confirm whether
	 * the caller is supposed to observe the repointed bytes (if so, the
	 * parameter should be a pointer).
	 */
	output_key.data = (uint8 *) VARDATA(stored);
	policy->hashing.output_keys[new_key_index] = PointerGetDatum(stored);
}

/*
 * Emit the grouping key at the given index into the aggregated output slot.
 * We use the standard single-key output function.
 */
static void
single_text_emit_key(GroupingPolicyHash *policy, uint32 current_key,
					 TupleTableSlot *aggregated_slot)
{
	/*
	 * Plain call instead of 'return <call>': a return statement with an
	 * expression is not allowed in a function returning void (C99 6.8.6.4),
	 * even when the callee is itself void.
	 */
	hash_strategy_output_key_single_emit(policy, current_key, aggregated_slot);
}

/*
 * Per-batch preparation hook required by the hashing strategy template
 * (hash_strategy_impl.c). The text strategy needs no per-batch setup, so
 * this is intentionally a no-op.
 */
static void
single_text_key_hashing_prepare_for_batch(GroupingPolicyHash *policy,
										  DecompressBatchState *batch_state)
{
}

#include "hash_strategy_impl.c"
11 changes: 10 additions & 1 deletion tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ typedef struct HashingStrategy
* This is stored separately from hash table keys, because they might not
* have the full column values, and also storing them contiguously here
* leads to better memory access patterns when emitting the results.
* The details of the key storage are managed by the hashing strategy.
* The details of the key storage are managed by the hashing strategy. The
* by-reference keys can use a separate memory context for dense storage.
*/
Datum *restrict output_keys;
uint64 num_allocated_output_keys;
MemoryContext key_body_mctx;

/*
* In single-column grouping, we store the null key outside of the hash
Expand All @@ -54,6 +56,13 @@ typedef struct HashingStrategy
* to reduce the hash table size.
*/
uint32 null_key_index;

#ifdef TS_USE_UMASH
/*
* UMASH fingerprinting parameters.
*/
struct umash_params *umash_params;
#endif
} HashingStrategy;

void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state);
Expand Down
45 changes: 45 additions & 0 deletions tsl/src/nodes/vector_agg/hashing/umash_fingerprint_key.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/
#pragma once

/*
* Helpers to use the umash fingerprint as a hash table key in our hashing
* strategies for vectorized grouping.
*/

#include "import/umash.h"

/*
 * Hash table key derived from a UMASH fingerprint. The struct is packed so
 * that the hash table entry fits into 16 bytes together with the uint32 key
 * index that goes before it. 'hash' holds the low 32 bits of the first
 * fingerprint word and serves as the bucket hash; 'rest' holds the second
 * word to disambiguate bucket collisions.
 */
struct umash_fingerprint_key
{
	uint32 hash;
	uint64 rest;
} pg_attribute_packed();

#define HASH_TABLE_KEY_TYPE struct umash_fingerprint_key
#define KEY_HASH(X) (X.hash)
#define KEY_EQUAL(a, b) (a.hash == b.hash && a.rest == b.rest)

static inline struct umash_fingerprint_key
umash_fingerprint_get_key(struct umash_fp fp)
{
const struct umash_fingerprint_key key = {
.hash = fp.hash[0] & (~(uint32) 0),
.rest = fp.hash[1],
};
return key;
}

/*
 * Allocate and derive the UMASH fingerprinting parameters from a fixed seed,
 * so the derived parameters are deterministic.
 */
static inline struct umash_params *
umash_key_hashing_init(void)
{
	/*
	 * '(void)' instead of '()': in pre-C23 C, empty parentheses declare a
	 * function with unspecified parameters rather than a proper prototype.
	 */
	struct umash_params *params = palloc0(sizeof(struct umash_params));
	umash_params_derive(params, 0xabcdef1234567890ull, NULL);
	return params;
}
13 changes: 13 additions & 0 deletions tsl/src/nodes/vector_agg/plan.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,10 @@ get_vectorized_grouping_type(Agg *agg, CustomScan *custom, List *resolved_target
/*
* We support hashed vectorized grouping by one fixed-size by-value
* compressed column.
* We can use our hash table for GroupAggregate as well, because it preserves
* the input order of the keys.
* FIXME write a test for that.
* FIXME rewrite to use num_grouping_columns
*/
if (num_grouping_columns == 1)
{
Expand All @@ -526,6 +530,15 @@ get_vectorized_grouping_type(Agg *agg, CustomScan *custom, List *resolved_target
break;
}
}
#ifdef TS_USE_UMASH
else
{
Ensure(single_grouping_var->vartype == TEXTOID,
"invalid vector type %d for grouping",
single_grouping_var->vartype);
return VAGT_HashSingleText;
}
#endif
}

return VAGT_Invalid;
Expand Down
Loading
Loading