diff --git a/src/backend/distributed/commands/multi_copy.c b/src/backend/distributed/commands/multi_copy.c index cb64ef7f55a..256a1b9da1f 100644 --- a/src/backend/distributed/commands/multi_copy.c +++ b/src/backend/distributed/commands/multi_copy.c @@ -3475,7 +3475,7 @@ InitializeCopyShardState(CopyShardState *shardState, ereport(ERROR, (errmsg("could not connect to any active placements"))); } - EnsureTaskExecutionAllowed(hasRemoteCopy); + EnsureTaskExecutionAllowed(hasRemoteCopy, true); /* * We just error out and code execution should never reach to this diff --git a/src/backend/distributed/executor/adaptive_executor.c b/src/backend/distributed/executor/adaptive_executor.c index e912f418d6f..683fc725d75 100644 --- a/src/backend/distributed/executor/adaptive_executor.c +++ b/src/backend/distributed/executor/adaptive_executor.c @@ -1385,7 +1385,7 @@ StartDistributedExecution(DistributedExecution *execution) if (execution->remoteTaskList != NIL) { bool isRemote = true; - EnsureTaskExecutionAllowed(isRemote); + EnsureTaskExecutionAllowed(isRemote, true); } } diff --git a/src/backend/distributed/executor/local_executor.c b/src/backend/distributed/executor/local_executor.c index bedaa643e29..ad07bb71f71 100644 --- a/src/backend/distributed/executor/local_executor.c +++ b/src/backend/distributed/executor/local_executor.c @@ -230,7 +230,24 @@ ExecuteLocalTaskListExtended(List *taskList, if (taskList != NIL) { bool isRemote = false; - EnsureTaskExecutionAllowed(isRemote); + if (!EnsureTaskExecutionAllowed(isRemote, false)) + { + /* instead of erroring, let's check further */ + Task *task = NULL; + foreach_ptr(task, taskList) + { + if (!task->safeToPush) + ereport(ERROR, + (errmsg("cannot execute a distributed query from a query on a " + "shard"), + errdetail("Executing a distributed query in a function call that " + "may be pushed to a remote node can lead to incorrect " + "results."), + errhint("Avoid nesting of distributed queries or use alter user " + "current_user set citus.allow_nested_distributed_execution " + "to on to allow it with possible incorrectness."))); + } + } } /* diff --git a/src/backend/distributed/executor/multi_executor.c b/src/backend/distributed/executor/multi_executor.c index 386a278b4c7..78d05c75a6c 100644 --- a/src/backend/distributed/executor/multi_executor.c +++ b/src/backend/distributed/executor/multi_executor.c @@ -899,15 +899,17 @@ ExecutorBoundParams(void) * a function in a query that gets pushed down to the worker, and the * function performs a query on a distributed table. */ -void -EnsureTaskExecutionAllowed(bool isRemote) +bool +EnsureTaskExecutionAllowed(bool isRemote, bool shouldError) { if (IsTaskExecutionAllowed(isRemote)) { - return; + return true; } - ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a " + if (shouldError) + ereport(ERROR, + (errmsg("cannot execute a distributed query from a query on a " "shard"), errdetail("Executing a distributed query in a function call that " "may be pushed to a remote node can lead to incorrect " @@ -915,6 +917,8 @@ EnsureTaskExecutionAllowed(bool isRemote) errhint("Avoid nesting of distributed queries or use alter user " "current_user set citus.allow_nested_distributed_execution " "to on to allow it with possible incorrectness."))); + + return false; } diff --git a/src/backend/distributed/planner/multi_router_planner.c b/src/backend/distributed/planner/multi_router_planner.c index 44f955a3227..4f6bd4616f1 100644 --- a/src/backend/distributed/planner/multi_router_planner.c +++ b/src/backend/distributed/planner/multi_router_planner.c @@ -1793,6 +1793,8 @@ CreateTask(TaskType taskType) task->partiallyLocalOrRemote = false; task->relationShardList = NIL; + task->safeToPush = false; + return task; } @@ -2177,6 +2179,58 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList, task->replicationModel = replicationModel; task->parametersInQueryStringResolved = parametersInQueryResolved; + StringInfo sqlQueryString = makeStringInfo(); + pg_get_query_def(query, sqlQueryString); + /* log the query string we generated */ + ereport(DEBUG4, (errmsg("generated sql query for task %d", task->taskId), + errdetail("query string: \"%s\"", + sqlQueryString->data))); + + // if (query->hasTargetSRFs) + // if (query->rtable) + // if (query->jointree) + // if (query->targetList) + // if (query->returningList) + /* Check the target list */ + // task->safeToPush = true; + // ListCell *lc; + // bool foundUDF = false; + // foreach (lc, query->targetList) + // { + // TargetEntry *tle = (TargetEntry *) lfirst(lc); + // elog(DEBUG2, "walking target list"); + // if (ContainsUDFWalker((Node *) tle->expr, &foundUDF)) + // { + // task->safeToPush = false; + // elog(DEBUG2, "UNSAFE"); + // // break; + // } + // } + // + /* quick check first */ + // if (colocationId) //FIXME include header for INVALID_COLOCATION_ID ? + // { + // goto exitnow; + // } + + ListCell *lc; + bool foundNonReferenceTable = false; + foreach (lc, relationShardList) + { + RelationShard *relationShard = (RelationShard *) lfirst(lc); + // elog(DEBUG2, "walking relation shard list"); + if (!IsCitusTableType(relationShard->relationId, REFERENCE_TABLE)) + { + foundNonReferenceTable = true; + // elog(DEBUG2, "found UNSAFE"); + } + } + + if (!foundNonReferenceTable) + task->safeToPush = true; + +exitnow: + return list_make1(task); } diff --git a/src/backend/distributed/utils/citus_copyfuncs.c b/src/backend/distributed/utils/citus_copyfuncs.c index e283a3034c2..0889e8adf6f 100644 --- a/src/backend/distributed/utils/citus_copyfuncs.c +++ b/src/backend/distributed/utils/citus_copyfuncs.c @@ -327,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS) COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration); COPY_SCALAR_FIELD(isLocalTableModification); COPY_SCALAR_FIELD(cannotBeExecutedInTransaction); + COPY_SCALAR_FIELD(safeToPush); } diff --git a/src/backend/distributed/utils/citus_outfuncs.c b/src/backend/distributed/utils/citus_outfuncs.c index 751063789d7..b2ecea62a0e 100644 --- a/src/backend/distributed/utils/citus_outfuncs.c +++ b/src/backend/distributed/utils/citus_outfuncs.c @@ -536,6 +536,7 @@ OutTask(OUTFUNC_ARGS) WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f"); WRITE_BOOL_FIELD(isLocalTableModification); WRITE_BOOL_FIELD(cannotBeExecutedInTransaction); + WRITE_BOOL_FIELD(safeToPush); } diff --git a/src/include/distributed/multi_executor.h b/src/include/distributed/multi_executor.h index 6708d9a6445..786e0c7becc 100644 --- a/src/include/distributed/multi_executor.h +++ b/src/include/distributed/multi_executor.h @@ -153,7 +153,7 @@ extern void EnsureSequentialMode(ObjectType objType); extern void SetLocalForceMaxQueryParallelization(void); extern void SortTupleStore(CitusScanState *scanState); extern ParamListInfo ExecutorBoundParams(void); -extern void EnsureTaskExecutionAllowed(bool isRemote); +extern bool EnsureTaskExecutionAllowed(bool isRemote, bool shouldError); #endif /* MULTI_EXECUTOR_H */ diff --git a/src/include/distributed/multi_physical_planner.h b/src/include/distributed/multi_physical_planner.h index 475a41b37b8..88220c02d2b 100644 --- a/src/include/distributed/multi_physical_planner.h +++ b/src/include/distributed/multi_physical_planner.h @@ -334,6 +334,9 @@ typedef struct Task Const *partitionKeyValue; int colocationId; + + /* if it's granted this task nested statements are safe to be executed */ + bool safeToPush; } Task;