From 935b4c172fe6b874c402c898fa9d6d71738079eb Mon Sep 17 00:00:00 2001 From: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> Date: Tue, 7 Jan 2025 16:02:07 +0100 Subject: [PATCH] Chunk-wise agg: add GatherMerge above Sort (#7547) Currently we would add a Gather just above Append of partial aggregates and below Sort, which doesn't allow us to parallelize the Sort itself. Disable-check: force-changelog-file --- tsl/src/chunkwise_agg.c | 129 ++++++++++-------- .../expected/chunkwise_agg_gather_sort.out | 61 +++++++++ tsl/test/shared/sql/CMakeLists.txt | 1 + .../shared/sql/chunkwise_agg_gather_sort.sql | 27 ++++ 4 files changed, 158 insertions(+), 60 deletions(-) create mode 100644 tsl/test/shared/expected/chunkwise_agg_gather_sort.out create mode 100644 tsl/test/shared/sql/chunkwise_agg_gather_sort.sql diff --git a/tsl/src/chunkwise_agg.c b/tsl/src/chunkwise_agg.c index 59c2ce47675..7c66a4cedc0 100644 --- a/tsl/src/chunkwise_agg.c +++ b/tsl/src/chunkwise_agg.c @@ -434,7 +434,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI copy_append_like_path(root, partially_compressed_append, partially_compressed_sorted, - subpath->pathtarget)); + partial_grouping_target)); } if (extra_data->flags & GROUPING_CAN_USE_HASH) @@ -443,7 +443,7 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI copy_append_like_path(root, partially_compressed_append, partially_compressed_hashed, - subpath->pathtarget)); + partial_grouping_target)); } } else @@ -507,23 +507,6 @@ generate_agg_pushdown_path(PlannerInfo *root, Path *cheapest_total_path, RelOptI hashed_subpaths, partial_grouping_target)); } - - /* Finish the partial paths (just added by add_partial_path to partially_grouped_rel in this - * function) by adding a gather node and add this path to the partially_grouped_rel using - * add_path).
*/ - foreach (lc, partially_grouped_rel->partial_pathlist) - { - Path *append_path = lfirst(lc); - double total_groups = append_path->rows * append_path->parallel_workers; - - Path *gather_path = (Path *) create_gather_path(root, - partially_grouped_rel, - append_path, - partially_grouped_rel->reltarget, - NULL, - &total_groups); - add_path(partially_grouped_rel, (Path *) gather_path); - } } } @@ -710,65 +693,91 @@ tsl_pushdown_partial_agg(PlannerInfo *root, Hypertable *ht, RelOptInfo *input_re } /* Replan aggregation if we were able to generate partially grouped rel paths */ - if (partially_grouped_rel->pathlist == NIL) + List *partially_grouped_paths = + list_concat(partially_grouped_rel->pathlist, partially_grouped_rel->partial_pathlist); + if (partially_grouped_paths == NIL) return; /* Prefer our paths */ output_rel->pathlist = NIL; output_rel->partial_pathlist = NIL; - /* Finalize the created partially aggregated paths by adding a 'Finalize Aggregate' node on top - * of them. */ + /* + * Finalize the created partially aggregated paths by adding a + * 'Finalize Aggregate' node on top of them, and adding Sort and Gather + * nodes as required. 
+ */ AggClauseCosts *agg_final_costs = &extra_data->agg_final_costs; - foreach (lc, partially_grouped_rel->pathlist) + foreach (lc, partially_grouped_paths) { - Path *append_path = lfirst(lc); - - if (contains_path_plain_or_sorted_agg(append_path)) + Path *partially_aggregated_path = lfirst(lc); + AggStrategy final_strategy; + if (contains_path_plain_or_sorted_agg(partially_aggregated_path)) { - bool is_sorted; - - is_sorted = pathkeys_contained_in(root->group_pathkeys, append_path->pathkeys); - + const bool is_sorted = + pathkeys_contained_in(root->group_pathkeys, partially_aggregated_path->pathkeys); if (!is_sorted) { - append_path = (Path *) - create_sort_path(root, output_rel, append_path, root->group_pathkeys, -1.0); + partially_aggregated_path = (Path *) create_sort_path(root, + output_rel, + partially_aggregated_path, + root->group_pathkeys, + -1.0); } - add_path(output_rel, - (Path *) create_agg_path(root, - output_rel, - append_path, - grouping_target, - parse->groupClause ? AGG_SORTED : AGG_PLAIN, - AGGSPLIT_FINAL_DESERIAL, -#if PG16_LT - parse->groupClause, -#else - root->processed_groupClause, -#endif - (List *) parse->havingQual, - agg_final_costs, - d_num_groups)); + final_strategy = parse->groupClause ? AGG_SORTED : AGG_PLAIN; } else { - add_path(output_rel, - (Path *) create_agg_path(root, - output_rel, - append_path, - grouping_target, - AGG_HASHED, - AGGSPLIT_FINAL_DESERIAL, + final_strategy = AGG_HASHED; + } + + /* + * We have to add a Gather or Gather Merge on top of parallel plans. It + * goes above the Sort we might have added just before, so that the Sort + * is parallelized as well. 
+ */ + if (partially_aggregated_path->parallel_workers > 0) + { + double total_groups = + partially_aggregated_path->rows * partially_aggregated_path->parallel_workers; + if (partially_aggregated_path->pathkeys == NIL) + { + partially_aggregated_path = + (Path *) create_gather_path(root, + partially_grouped_rel, + partially_aggregated_path, + partially_grouped_rel->reltarget, + /* required_outer = */ NULL, + &total_groups); + } + else + { + partially_aggregated_path = + (Path *) create_gather_merge_path(root, + partially_grouped_rel, + partially_aggregated_path, + partially_grouped_rel->reltarget, + partially_aggregated_path->pathkeys, + /* required_outer = */ NULL, + &total_groups); + } + } + + add_path(output_rel, + (Path *) create_agg_path(root, + output_rel, + partially_aggregated_path, + grouping_target, + final_strategy, + AGGSPLIT_FINAL_DESERIAL, #if PG16_LT - parse->groupClause, + parse->groupClause, #else - root->processed_groupClause, + root->processed_groupClause, #endif - (List *) parse->havingQual, - agg_final_costs, - d_num_groups)); - } + (List *) parse->havingQual, + agg_final_costs, + d_num_groups)); } } diff --git a/tsl/test/shared/expected/chunkwise_agg_gather_sort.out b/tsl/test/shared/expected/chunkwise_agg_gather_sort.out new file mode 100644 index 00000000000..9e4c3ce735d --- /dev/null +++ b/tsl/test/shared/expected/chunkwise_agg_gather_sort.out @@ -0,0 +1,61 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. +-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan. 
+\set prefix 'explain (costs off, timing off, summary off)' +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set parallel_leader_participation = off; +set enable_hashagg to off; +:prefix +select count(*) from metrics group by v0; +QUERY PLAN + Finalize GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Gather Merge + Workers Planned: 2 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Parallel Append + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial GroupAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Sort + Sort Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk +(22 rows) + +reset enable_hashagg; +:prefix +select count(*) from metrics group by v0; +QUERY PLAN + Finalize HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Gather + Workers Planned: 2 + -> Parallel Append + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk + -> Partial HashAggregate + Group Key: _hyper_X_X_chunk.v0 + -> Seq Scan on _hyper_X_X_chunk +(14 rows) + +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset parallel_leader_participation; diff --git a/tsl/test/shared/sql/CMakeLists.txt b/tsl/test/shared/sql/CMakeLists.txt index 74ddb376c9c..109295d882b 100644 --- a/tsl/test/shared/sql/CMakeLists.txt +++ b/tsl/test/shared/sql/CMakeLists.txt @@ -1,5 +1,6 @@ set(TEST_FILES_SHARED cagg_compression.sql + chunkwise_agg_gather_sort.sql classify_relation.sql compat.sql constify_timestamptz_op_interval.sql diff --git a/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql b/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql new file 
mode 100644 index 00000000000..39f062f73e9 --- /dev/null +++ b/tsl/test/shared/sql/chunkwise_agg_gather_sort.sql @@ -0,0 +1,27 @@ +-- This file and its contents are licensed under the Timescale License. +-- Please see the included NOTICE for copyright information and +-- LICENSE-TIMESCALE for a copy of the license. + +-- Exercise the GatherMerge -> Sort -> Append -> Partial Aggregate plan. + +\set prefix 'explain (costs off, timing off, summary off)' + +set parallel_setup_cost = 0; +set parallel_tuple_cost = 0; +set max_parallel_workers_per_gather = 2; +set parallel_leader_participation = off; + +set enable_hashagg to off; + +:prefix +select count(*) from metrics group by v0; + +reset enable_hashagg; + +:prefix +select count(*) from metrics group by v0; + +reset parallel_setup_cost; +reset parallel_tuple_cost; +reset max_parallel_workers_per_gather; +reset parallel_leader_participation;