An incorrect query result, where the distributed query plan seems wrong #7698
Comments
The distributed query plan of this query: the task count should not be 0.
This PR addresses two reported issues: citusdata#7698 and citusdata#7697.

The Problem: Both issues involve incorrect query results when a query references both local and distributed tables, and includes a WHERE EXISTS clause on the local table. For example:

SELECT ... WHERE EXISTS (SELECT * FROM local_table);

In such cases, the WHERE EXISTS clause typically generates an InitPlan or "one-time filter," which determines whether the rest of the plan's output qualifies for the result. If this InitPlan relies on the contents of a local table, it must be executed locally on the coordinator. However, the planner's decisions regarding whether to convert local or distributed tables into intermediate results fail to account for the references within the InitPlan. This results in an incorrect query execution plan and, subsequently, incorrect data.

The Fix: This PR ensures that when the standard planner (standard_planner) generates an InitPlan in the PlannedStmt, we check the executor parameters (PARAM nodes) in the join qualifiers for relations referenced by the InitPlan. If such references exist, distributed table references are converted to intermediate results rather than local tables. This adjustment ensures that local tables used in the InitPlan remain intact and behave as expected.

This fix prevents incorrect query results in cases involving mixed local and distributed tables with WHERE EXISTS clauses and improves the accuracy of distributed query planning.
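For concreteness, here is a minimal sketch of the query shape the description refers to. The table and column names are hypothetical and not taken from the linked issues; only create_distributed_table() is the real Citus UDF, as used elsewhere in this thread.

-- Hypothetical schema, for illustration only.
CREATE TABLE local_tbl (a int);                        -- plain (local) PostgreSQL table
CREATE TABLE dist_tbl (a int, b int);
SELECT create_distributed_table('dist_tbl', 'a');      -- makes dist_tbl a distributed table

-- The WHERE EXISTS on the local table becomes an InitPlan / "one-time filter"
-- in standard_planner's output. It can only be evaluated on the coordinator,
-- so the planner must not push the join down as if the clause weren't there.
SELECT d.a
FROM dist_tbl d
LEFT JOIN local_tbl l ON l.a = d.b
WHERE EXISTS (SELECT * FROM local_tbl);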
The actual problem is that we directly push down this query, which is wrong:

set client_min_messages to debug3;
select t3.vkey
from (t1 right outer join t3
on (t1.c10 = t3.vkey ))
where exists (select * from t3);
DEBUG: no shard pruning constraints on t1 found
DEBUG: shard count after pruning for t1: 32
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: no shard pruning constraints on t1 found
DEBUG: shard count after pruning for t1: 32
DEBUG: assigned task 1 to node localhost:9701
DEBUG: assigned task 2 to node localhost:9702
DEBUG: assigned task 3 to node localhost:9701
DEBUG: assigned task 4 to node localhost:9702
..
DEBUG: assigned task 32 to node localhost:9702

So, for such a query, where t3 is a reference table (which we call a recurring rel) and t1 is a distributed table, and where the recurring relation is in the outer part of the outer join and the distributed one is in the inner part of the join, we should normally go through recursive planning and first recursively plan the distributed rel. See this:

/*
* Similarly, logical planner cannot handle outer joins when the outer rel
* is recurring, such as "<recurring> LEFT JOIN <distributed>". In that case,
* we convert distributed table into a subquery and recursively plan inner
* side of the outer join. That way, inner rel gets converted into an intermediate
* result and logical planner can handle the new query since it's of the form
* "<recurring> LEFT JOIN <recurring>".
*/
if (ShouldRecursivelyPlanOuterJoins(context))
{
RecursivelyPlanRecurringTupleOuterJoinWalker((Node *) query->jointree,
query, context);
}

Otherwise, i.e., when we push down this query, it returns an incorrect result set because tuples that don't exist in the shards of the distributed table would appear in each outer join pushed down to the shards. In the past, because pushing down such a query would return such an incorrect result set, we were throwing an error. But as of #6512, we support such queries by recursively planning the distributed table first. In other words, we first convert it into another recurring relation form, which we call an "intermediate result". Although this is not ideal from a performance perspective, it at least allows us to support such queries in an appropriate way. Having said that, this issue doesn't seem specific to local tables; it can be generalized to set-returning functions, reference tables, etc. too.

One thing that looks interesting is that when I remove the […]:

set client_min_messages to debug2;
select * from (t2 full outer join t1 on(t2.vkey = t1.vkey ));
DEBUG: Router planner cannot handle multi-shard select queries
**DEBUG: recursively planning right side of the full join since the other side is a recurring rel**
**DEBUG: recursively planning distributed relation "t1" since it is part of a distributed join node that is outer joined with a recurring rel**
DEBUG: Wrapping relation "t1" to a subquery
DEBUG: Router planner cannot handle multi-shard select queries
DEBUG: generating subplan 10_1 for subquery SELECT vkey FROM public.t1 WHERE true
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT t2.vkey, t1.vkey FROM (public.t2 FULL JOIN (SELECT t1_1.vkey FROM (SELECT intermediate_result.vkey FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(vkey integer)) t1_1) t1 ON ((t2.vkey OPERATOR(pg_catalog.=) t1.vkey)))
DEBUG: Creating router plan
┌──────┬──────┐
│ vkey │ vkey │
├──────┼──────┤
│ 5 │ │
└──────┴──────┘
(1 row)

In other words, in that case, we first convert the distributed table into an intermediate result and everything goes well. However, when the […]

So, either standard_planner() or a prior part of the Citus planner seems to convert the outer join mentioned above into some other format, and that causes us to plan the query incorrectly. @codeforall and I will debug this a bit to see what's happening here. (cc: @colm-mchugh)
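A sketch of how the two behaviours can be compared, reusing the queries quoted above; the EXPLAIN invocations are ordinary PostgreSQL usage and are not output taken from the issue:

set client_min_messages to debug3;

-- Plain outer join with the recurring rel on the outer side: expected to go
-- through recursive planning, i.e. t1 gets wrapped into an intermediate result.
explain (costs off)
select t3.vkey
from (t1 right outer join t3 on (t1.c10 = t3.vkey));

-- Same join plus the WHERE EXISTS on t3: reported above to be pushed down
-- directly to the shards, which is what produces the incorrect result.
-- Citus' EXPLAIN output includes a "Task Count" line, which is what the
-- first comment in this thread refers to.
explain (costs off)
select t3.vkey
from (t1 right outer join t3 on (t1.c10 = t3.vkey))
where exists (select * from t3);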
@onurctirtir this problem (and both #7697 and #7696) does not occur with Citus 13, and on investigation the problem is with Postgres 16, because of pg commit 695f5deb79, which prevents the […]

cc @codeforall, sharing this update in case it's useful to your PR.
I made t1 a distributed table:
SELECT create_distributed_table('t1', 'vkey');
query:
The result should be t3.vkey = 1, but it actually returns nothing. The Citus version is 12.5.
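Below is a hedged reconstruction of the reported setup; the column definitions and the inserted row are assumptions (the original DDL and query are not shown in this excerpt), and the query itself is taken from the comment above:

CREATE TABLE t1 (vkey int, c10 int);
CREATE TABLE t3 (vkey int);
SELECT create_distributed_table('t1', 'vkey');   -- as stated in the report
-- The discussion above treats t3 as a reference table; if so, also run:
-- SELECT create_reference_table('t3');

INSERT INTO t3 VALUES (1);                        -- so that vkey = 1 is the expected output

SELECT t3.vkey
FROM (t1 RIGHT OUTER JOIN t3 ON (t1.c10 = t3.vkey))
WHERE EXISTS (SELECT * FROM t3);
-- Expected: one row with vkey = 1; the report says Citus 12.5 returns nothing.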