Skip to content

Commit

Permalink
Chunk-wise agg: add GatherMerge above Sort (timescale#7547)
Browse files Browse the repository at this point in the history
Currently we add a Gather just above the Append of partial aggregates
and below the Sort, which doesn't allow us to parallelize the Sort itself.

Disable-check: force-changelog-file
  • Loading branch information
akuzm authored Jan 7, 2025
1 parent 8ec3365 commit 935b4c1
Show file tree
Hide file tree
Showing 4 changed files with 158 additions and 60 deletions.
129 changes: 69 additions & 60 deletions tsl/src/chunkwise_agg.c
Original file line number Diff line number Diff line change
Expand Up @@ -434,7 +434,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_sorted,
subpath->pathtarget));
partial_grouping_target));
}

if (extra_data->flags & GROUPING_CAN_USE_HASH)
Expand All @@ -443,7 +443,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
copy_append_like_path(root,
partially_compressed_append,
partially_compressed_hashed,
subpath->pathtarget));
partial_grouping_target));
}
}
else
Expand Down Expand Up @@ -507,23 +507,6 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI
hashed_subpaths,
partial_grouping_target));
}

/* Finish the partial paths (just added by add_partial_path to partially_grouped_rel in this
* function) by adding a Gather node on top of each one, and add the resulting path to
* partially_grouped_rel using add_path. */
foreach (lc, partially_grouped_rel->partial_pathlist)
{
Path *append_path = lfirst(lc);
double total_groups = append_path->rows * append_path->parallel_workers;

Path *gather_path = (Path *) create_gather_path(root,
partially_grouped_rel,
append_path,
partially_grouped_rel->reltarget,
NULL,
&total_groups);
add_path(partially_grouped_rel, (Path *) gather_path);
}
}
}

Expand Down Expand Up @@ -710,65 +693,91 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re
}

/* Replan aggregation if we were able to generate partially grouped rel paths */
if (partially_grouped_rel->pathlist == NIL)
List *partially_grouped_paths =
list_concat(partially_grouped_rel->pathlist, partially_grouped_rel->partial_pathlist);
if (partially_grouped_paths == NIL)
return;

/* Prefer our paths */
output_rel->pathlist = NIL;
output_rel->partial_pathlist = NIL;

/* Finalize the created partially aggregated paths by adding a 'Finalize Aggregate' node on top
* of them. */
/*
* Finalize the created partially aggregated paths by adding a
* 'Finalize Aggregate' node on top of them, and adding Sort and Gather
* nodes as required.
*/
AggClauseCosts *agg_final_costs = &extra_data->agg_final_costs;
foreach (lc, partially_grouped_rel->pathlist)
foreach (lc, partially_grouped_paths)
{
Path *append_path = lfirst(lc);

if (contains_path_plain_or_sorted_agg(append_path))
Path *partially_aggregated_path = lfirst(lc);
AggStrategy final_strategy;
if (contains_path_plain_or_sorted_agg(partially_aggregated_path))
{
bool is_sorted;

is_sorted = pathkeys_contained_in(root->group_pathkeys, append_path->pathkeys);

const bool is_sorted =
pathkeys_contained_in(root->group_pathkeys, partially_aggregated_path->pathkeys);
if (!is_sorted)
{
append_path = (Path *)
create_sort_path(root, output_rel, append_path, root->group_pathkeys, -1.0);
partially_aggregated_path = (Path *) create_sort_path(root,
output_rel,
partially_aggregated_path,
root->group_pathkeys,
-1.0);
}

add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
append_path,
grouping_target,
parse->groupClause ? AGG_SORTED : AGG_PLAIN,
AGGSPLIT_FINAL_DESERIAL,
#if PG16_LT
parse->groupClause,
#else
root->processed_groupClause,
#endif
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
final_strategy = parse->groupClause ? AGG_SORTED : AGG_PLAIN;
}
else
{
add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
append_path,
grouping_target,
AGG_HASHED,
AGGSPLIT_FINAL_DESERIAL,
final_strategy = AGG_HASHED;
}

/*
* We have to add a Gather or Gather Merge on top of parallel plans. It
* goes above the Sort we might have added just before, so that the Sort
* is parallelized as well.
*/
if (partially_aggregated_path->parallel_workers > 0)
{
double total_groups =
partially_aggregated_path->rows * partially_aggregated_path->parallel_workers;
if (partially_aggregated_path->pathkeys == NIL)
{
partially_aggregated_path =
(Path *) create_gather_path(root,
partially_grouped_rel,
partially_aggregated_path,
partially_grouped_rel->reltarget,
/* required_outer = */ NULL,
&total_groups);
}
else
{
partially_aggregated_path =
(Path *) create_gather_merge_path(root,
partially_grouped_rel,
partially_aggregated_path,
partially_grouped_rel->reltarget,
partially_aggregated_path->pathkeys,
/* required_outer = */ NULL,
&total_groups);
}
}

add_path(output_rel,
(Path *) create_agg_path(root,
output_rel,
partially_aggregated_path,
grouping_target,
final_strategy,
AGGSPLIT_FINAL_DESERIAL,
#if PG16_LT
parse->groupClause,
parse->groupClause,
#else
root->processed_groupClause,
root->processed_groupClause,
#endif
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
}
(List *) parse->havingQual,
agg_final_costs,
d_num_groups));
}
}
61 changes: 61 additions & 0 deletions tsl/test/shared/expected/chunkwise_agg_gather_sort.out
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.
-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan.
\set prefix 'explain (costs off, timing off, summary off)'
set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set parallel_leader_participation = off;
set enable_hashagg to off;
:prefix
select count(*) from metrics group by v0;
QUERY PLAN
Finalize GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Gather Merge
Workers Planned: 2
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Parallel Append
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial GroupAggregate
Group Key: _hyper_X_X_chunk.v0
-> Sort
Sort Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
(22 rows)

reset enable_hashagg;
:prefix
select count(*) from metrics group by v0;
QUERY PLAN
Finalize HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Gather
Workers Planned: 2
-> Parallel Append
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
-> Partial HashAggregate
Group Key: _hyper_X_X_chunk.v0
-> Seq Scan on _hyper_X_X_chunk
(14 rows)

reset parallel_setup_cost;
reset parallel_tuple_cost;
reset max_parallel_workers_per_gather;
reset parallel_leader_participation;
1 change: 1 addition & 0 deletions tsl/test/shared/sql/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
set(TEST_FILES_SHARED
cagg_compression.sql
chunkwise_agg_gather_sort.sql
classify_relation.sql
compat.sql
constify_timestamptz_op_interval.sql
Expand Down
27 changes: 27 additions & 0 deletions tsl/test/shared/sql/chunkwise_agg_gather_sort.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
-- This file and its contents are licensed under the Timescale License.
-- Please see the included NOTICE for copyright information and
-- LICENSE-TIMESCALE for a copy of the license.

-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan.

\set prefix 'explain (costs off, timing off, summary off)'

set parallel_setup_cost = 0;
set parallel_tuple_cost = 0;
set max_parallel_workers_per_gather = 2;
set parallel_leader_participation = off;

set enable_hashagg to off;

:prefix
select count(*) from metrics group by v0;

reset enable_hashagg;

:prefix
select count(*) from metrics group by v0;

reset parallel_setup_cost;
reset parallel_tuple_cost;
reset max_parallel_workers_per_gather;
reset parallel_leader_participation;

0 comments on commit 935b4c1

Please sign in to comment.