Skip to content

Commit

Permalink
Vectorized aggregation with grouping by one fixed-size column (timesc…
Browse files Browse the repository at this point in the history
…ale#7341)

The implementation uses the Postgres simplehash hash table for by-value
fixed-size compressed columns.

The biggest improvement on a "sensible" query is about 90%. A couple of
queries show bigger improvements, but these are very synthetic cases
that don't make much sense:

https://grafana.ops.savannah-dev.timescale.com/d/fasYic_4z/compare-akuzm?orgId=1&var-branch=All&var-run1=3815&var-run2=3816&var-threshold=0.02&var-use_historical_thresholds=true&var-threshold_expression=2%20%2A%20percentile_cont%280.90%29&var-exact_suite_version=false&from=now-2d&to=now

---------

Signed-off-by: Alexander Kuzmenkov <[email protected]>
Co-authored-by: Erik Nordström <[email protected]>
  • Loading branch information
akuzm and erimatnor authored Jan 2, 2025
1 parent 52da577 commit 11e866e
Show file tree
Hide file tree
Showing 36 changed files with 4,688 additions and 62 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/linux-32bit-build-and-test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ jobs:
CC: clang-14
CXX: clang++-14
DEBIAN_FRONTEND: noninteractive
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum"
# vectorized_aggregation has different output on i386 because int8 is by
# reference and currently it cannot be used for vectorized hash grouping.
IGNORES: "append-* transparent_decompression-* transparent_decompress_chunk-* pg_dump telemetry bgw_db_scheduler* hypercore_vacuum vectorized_aggregation"
SKIPS: chunk_adaptive histogram_test-*
EXTENSIONS: "postgres_fdw test_decoding pageinspect pgstattuple"
strategy:
Expand Down
1 change: 1 addition & 0 deletions .unreleased/vectorized-grouping-one-fixed
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #7341 Vectorized aggregation with grouping by one fixed-size by-value compressed column (such as arithmetic types).
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
# Descend into the function/ and hashing/ subdirectories, each of which is
# processed through its own CMakeLists.txt.
add_subdirectory(function)
add_subdirectory(hashing)
# Source files of the vector aggregation node located directly in this
# directory.
set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_batch.c
${CMAKE_CURRENT_SOURCE_DIR}/grouping_policy_hash.c
${CMAKE_CURRENT_SOURCE_DIR}/plan.c)
# Attach the listed files as private sources of the TSL library target.
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
37 changes: 31 additions & 6 deletions tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "nodes/decompress_chunk/exec.h"
#include "nodes/decompress_chunk/vector_quals.h"
#include "nodes/vector_agg.h"
#include "nodes/vector_agg/plan.h"

static int
get_input_offset(DecompressChunkState *decompress_state, Var *var)
Expand Down Expand Up @@ -179,17 +180,41 @@ vector_agg_begin(CustomScanState *node, EState *estate, int eflags)

Var *var = castNode(Var, tlentry->expr);
col->input_offset = get_input_offset(decompress_state, var);
DecompressContext *dcontext = &decompress_state->decompress_context;
CompressionColumnDescription *desc =
&dcontext->compressed_chunk_columns[col->input_offset];
col->value_bytes = desc->value_bytes;
}
}

/*
* Currently the only grouping policy we use is per-batch grouping.
* Create the grouping policy chosen at plan time.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns);
const VectorAggGroupingType grouping_type =
intVal(list_nth(cscan->custom_private, VASI_GroupingType));
if (grouping_type == VAGT_Batch)
{
/*
* Per-batch grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_batch(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns);
}
else
{
/*
* Hash grouping.
*/
vector_agg_state->grouping =
create_grouping_policy_hash(vector_agg_state->num_agg_defs,
vector_agg_state->agg_defs,
vector_agg_state->num_grouping_columns,
vector_agg_state->grouping_columns,
grouping_type);
}
}

static void
Expand Down
1 change: 1 addition & 0 deletions tsl/src/nodes/vector_agg/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ typedef struct GroupingColumn
{
int input_offset;
int output_offset;
int value_bytes;
} GroupingColumn;

typedef struct
Expand Down
58 changes: 58 additions & 0 deletions tsl/src/nodes/vector_agg/function/agg_many_vector_helper.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* This file and its contents are licensed under the Timescale License.
* Please see the included NOTICE for copyright information and
* LICENSE-TIMESCALE for a copy of the license.
*/

/*
* A generic implementation of adding the given batch to many aggregate function
* states with given offsets. Used for hash aggregation, and builds on the
* FUNCTION_NAME(one) function, which adds one passing non-null row to the given
* aggregate function state.
*/
/*
 * Shared loop for the many_vector entry points. For every row in
 * [start_row, end_row) that passes arrow_row_is_valid(filter, row), the value
 * read from vector->buffers[1] is added, through FUNCTION_NAME(one), to the
 * aggregate state found at index offsets[row] of agg_states. The loop runs
 * with agg_extra_mctx as the current memory context; the previous context is
 * restored before returning.
 */
static pg_attribute_always_inline void
FUNCTION_NAME(many_vector_impl)(void *restrict agg_states, const uint32 *offsets,
								const uint64 *filter, int start_row, int end_row,
								const ArrowArray *vector, MemoryContext agg_extra_mctx)
{
	FUNCTION_NAME(state) *restrict all_states = (FUNCTION_NAME(state) *) agg_states;
	const CTYPE *input = vector->buffers[1];
	MemoryContext saved_mctx = MemoryContextSwitchTo(agg_extra_mctx);

	for (int i = start_row; i < end_row; i++)
	{
		/* The value and the target state are fetched before the filter check. */
		const CTYPE current = input[i];
		FUNCTION_NAME(state) *restrict target = &all_states[offsets[i]];

		if (!arrow_row_is_valid(filter, i))
		{
			continue;
		}

		/* A row that passes the filter is expected to have a nonzero offset. */
		Assert(offsets[i] != 0);
		FUNCTION_NAME(one)(target, current);
	}

	MemoryContextSwitchTo(saved_mctx);
}

/*
 * Variant of the many_vector loop for the case where there is no filter
 * bitmap: it forwards to the implementation with a NULL filter. This wrapper
 * is pg_noinline while the implementation is always-inline, so presumably the
 * compiler emits a separate copy of the loop specialized for the NULL filter.
 */
static pg_noinline void
FUNCTION_NAME(many_vector_all_valid)(void *restrict agg_states, const uint32 *offsets,
									 int start_row, int end_row, const ArrowArray *vector,
									 MemoryContext agg_extra_mctx)
{
	const uint64 *const no_filter = NULL;

	FUNCTION_NAME(many_vector_impl)
	(agg_states, offsets, no_filter, start_row, end_row, vector, agg_extra_mctx);
}

/*
 * Entry point for adding a range of rows of an arrow array to many aggregate
 * states at once. Chooses between the filtered loop and the no-filter variant
 * depending on whether a filter bitmap was supplied.
 */
static void
FUNCTION_NAME(many_vector)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
						   int start_row, int end_row, const ArrowArray *vector,
						   MemoryContext agg_extra_mctx)
{
	if (filter != NULL)
	{
		/* A filter bitmap is present, so every row has to be checked. */
		FUNCTION_NAME(many_vector_impl)
		(agg_states, offsets, filter, start_row, end_row, vector, agg_extra_mctx);
		return;
	}

	/* No filter bitmap was given. */
	FUNCTION_NAME(many_vector_all_valid)
	(agg_states, offsets, start_row, end_row, vector, agg_extra_mctx);
}
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/float48_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->Sx = newSx;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -325,6 +326,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#undef UPDATE
#undef COMBINE
Expand Down
75 changes: 75 additions & 0 deletions tsl/src/nodes/vector_agg/function/functions.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,70 @@ count_star_scalar(void *agg_state, Datum constvalue, bool constisnull, int n,
state->count += n;
}

/*
 * Shared loop for count(*) over many aggregate states. For every row in
 * [start_row, end_row) that passes arrow_row_is_valid(filter, row), the
 * counter of the state at index offsets[row] is incremented by one. The
 * constant argument and the extra memory context are not used by this loop.
 */
static pg_attribute_always_inline void
count_star_many_scalar_impl(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
							int start_row, int end_row, Datum constvalue, bool constisnull,
							MemoryContext agg_extra_mctx)
{
	CountState *counters = (CountState *) agg_states;

	for (int i = start_row; i < end_row; i++)
	{
		if (!arrow_row_is_valid(filter, i))
		{
			continue;
		}

		counters[offsets[i]].count += 1;
	}
}

/*
 * count(*) over many aggregate states when no filter bitmap is supplied:
 * forwards to the shared loop with a NULL filter.
 */
static pg_noinline void
count_star_many_scalar_nofilter(void *restrict agg_states, const uint32 *offsets, int start_row,
								int end_row, Datum constvalue, bool constisnull,
								MemoryContext agg_extra_mctx)
{
	const uint64 *const no_filter = NULL;

	count_star_many_scalar_impl(agg_states, offsets, no_filter, start_row, end_row, constvalue,
								constisnull, agg_extra_mctx);
}

/*
 * Entry point of count(*) for many aggregate states. Picks the no-filter
 * variant when there is no filter bitmap, and the filtered loop otherwise.
 */
static void
count_star_many_scalar(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
					   int start_row, int end_row, Datum constvalue, bool constisnull,
					   MemoryContext agg_extra_mctx)
{
	if (filter != NULL)
	{
		/* A filter bitmap is present, so every row has to be checked. */
		count_star_many_scalar_impl(agg_states, offsets, filter, start_row, end_row, constvalue,
									constisnull, agg_extra_mctx);
		return;
	}

	/* No filter bitmap was given. */
	count_star_many_scalar_nofilter(agg_states, offsets, start_row, end_row, constvalue,
									constisnull, agg_extra_mctx);
}

/*
 * Function table for count(*). Only the scalar entry points are provided; the
 * vector entry points are not set here and therefore stay zero-initialized.
 */
VectorAggFunctions count_star_agg = {
	/* State layout and lifecycle. */
	.state_bytes = sizeof(CountState),
	.agg_init = count_init,
	.agg_emit = count_emit,

	/* Adding rows: one state, and many states addressed through offsets. */
	.agg_scalar = count_star_scalar,
	.agg_many_scalar = count_star_many_scalar,
};

/*
Expand Down Expand Up @@ -110,12 +169,28 @@ count_any_vector(void *agg_state, const ArrowArray *vector, const uint64 *filter
}
}

/*
 * Counts rows into many aggregate states. For every row in
 * [start_row, end_row) that passes arrow_row_is_valid(filter, row), the
 * counter of the state at index offsets[row] is incremented. The contents of
 * `vector` are not read here; only the filter decides whether a row counts.
 */
static void
count_any_many_vector(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
					  int start_row, int end_row, const ArrowArray *vector,
					  MemoryContext agg_extra_mctx)
{
	CountState *counters = (CountState *) agg_states;

	for (int i = start_row; i < end_row; i++)
	{
		CountState *target = &counters[offsets[i]];

		if (!arrow_row_is_valid(filter, i))
		{
			continue;
		}

		target->count += 1;
	}
}

/*
 * Function table for the count aggregate that takes an argument. The
 * many-states scalar entry point is not set here and therefore stays
 * zero-initialized.
 */
VectorAggFunctions count_any_agg = {
	/* State layout and lifecycle. */
	.state_bytes = sizeof(CountState),
	.agg_init = count_init,

	/* Adding rows: scalar argument, one vector, and many states at once. */
	.agg_scalar = count_any_scalar,
	.agg_vector = count_any_vector,
	.agg_many_vector = count_any_many_vector,

	/* Producing the partial result. */
	.agg_emit = count_emit,
};

/*
Expand Down
16 changes: 16 additions & 0 deletions tsl/src/nodes/vector_agg/function/functions.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,22 @@ typedef struct
void (*agg_scalar)(void *restrict agg_state, Datum constvalue, bool constisnull, int n,
MemoryContext agg_extra_mctx);

/*
* Add the rows of the given arrow array to aggregate function states given
* by the respective offsets.
*/
void (*agg_many_vector)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, const ArrowArray *vector,
MemoryContext agg_extra_mctx);

/*
* Same as above, but for a scalar argument. This is mostly important for
* count(*) and can be NULL.
*/
void (*agg_many_scalar)(void *restrict agg_states, const uint32 *offsets, const uint64 *filter,
int start_row, int end_row, Datum constvalue, bool constisnull,
MemoryContext agg_extra_mctx);

/* Emit a partial aggregation result. */
void (*agg_emit)(void *restrict agg_state, Datum *out_result, bool *out_isnull);
} VectorAggFunctions;
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int128_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
#endif
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -119,6 +120,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int24_avg_accum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->sum += value;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -47,6 +48,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = int24_avg_accum_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/int24_sum_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)

typedef Int24SumState FUNCTION_NAME(state);

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -70,6 +71,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = int_sum_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/minmax_arithmetic_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
}
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -67,6 +68,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = minmax_emit,
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};
#endif

Expand Down
2 changes: 2 additions & 0 deletions tsl/src/nodes/vector_agg/function/sum_float_single.c
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ FUNCTION_NAME(one)(void *restrict agg_state, const CTYPE value)
state->result += value;
}

#include "agg_many_vector_helper.c"
#include "agg_scalar_helper.c"
#include "agg_vector_validity_helper.c"

Expand All @@ -100,6 +101,7 @@ VectorAggFunctions FUNCTION_NAME(argdef) = {
.agg_emit = FUNCTION_NAME(emit),
.agg_scalar = FUNCTION_NAME(scalar),
.agg_vector = FUNCTION_NAME(vector),
.agg_many_vector = FUNCTION_NAME(many_vector),
};

#endif
Expand Down
21 changes: 21 additions & 0 deletions tsl/src/nodes/vector_agg/grouping_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ typedef struct GroupingPolicy
*/
void (*gp_reset)(GroupingPolicy *gp);

/*
* Aggregate a single compressed batch.
*/
void (*gp_add_batch)(GroupingPolicy *gp, DecompressBatchState *batch_state);

/*
Expand All @@ -51,6 +54,24 @@ typedef struct GroupingPolicy
char *(*gp_explain)(GroupingPolicy *gp);
} GroupingPolicy;

/*
* The various types of grouping we might use, as determined at planning time.
* The hashed subtypes are all implemented by hash grouping policy.
*/
typedef enum
{
	/* Zero value, not a usable grouping type. */
	VAGT_Invalid = 0,

	/* Per-batch grouping. */
	VAGT_Batch = 1,

	/*
	 * Hash grouping subtypes. NOTE(review): the numeric suffix presumably is
	 * the byte width of the single fixed-size grouping key -- confirm against
	 * the planner code that picks the type.
	 */
	VAGT_HashSingleFixed2 = 2,
	VAGT_HashSingleFixed4 = 3,
	VAGT_HashSingleFixed8 = 4
} VectorAggGroupingType;

/*
 * Creates the per-batch grouping policy for the given aggregate definitions
 * and grouping columns. The executor uses it when the grouping type chosen at
 * plan time is VAGT_Batch.
 */
extern GroupingPolicy *create_grouping_policy_batch(int num_agg_defs, VectorAggDef *agg_defs,
int num_grouping_columns,
GroupingColumn *grouping_columns);

/*
 * Creates the hash grouping policy for the given aggregate definitions and
 * grouping columns. The executor uses it for every grouping type other than
 * VAGT_Batch, and passes that type as grouping_type to select the hashed
 * subtype.
 */
extern GroupingPolicy *create_grouping_policy_hash(int num_agg_defs, VectorAggDef *agg_defs,
int num_grouping_columns,
GroupingColumn *grouping_columns,
VectorAggGroupingType grouping_type);
Loading

0 comments on commit 11e866e

Please sign in to comment.