Skip to content

Commit

Permalink
Allow executing some distributed statements in functions
Browse files Browse the repository at this point in the history
  • Loading branch information
c2main committed Jan 6, 2025
1 parent 034a86e commit 9c630f1
Show file tree
Hide file tree
Showing 9 changed files with 88 additions and 8 deletions.
2 changes: 1 addition & 1 deletion src/backend/distributed/commands/multi_copy.c
Original file line number Diff line number Diff line change
Expand Up @@ -3475,7 +3475,7 @@ InitializeCopyShardState(CopyShardState *shardState,
ereport(ERROR, (errmsg("could not connect to any active placements")));
}

EnsureTaskExecutionAllowed(hasRemoteCopy);
EnsureTaskExecutionAllowed(hasRemoteCopy, true);

/*
* We just error out and code execution should never reach to this
Expand Down
2 changes: 1 addition & 1 deletion src/backend/distributed/executor/adaptive_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -1385,7 +1385,7 @@ StartDistributedExecution(DistributedExecution *execution)
if (execution->remoteTaskList != NIL)
{
bool isRemote = true;
EnsureTaskExecutionAllowed(isRemote);
EnsureTaskExecutionAllowed(isRemote, true);
}
}

Expand Down
19 changes: 18 additions & 1 deletion src/backend/distributed/executor/local_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,24 @@ ExecuteLocalTaskListExtended(List *taskList,
if (taskList != NIL)
{
bool isRemote = false;
EnsureTaskExecutionAllowed(isRemote);
if (!EnsureTaskExecutionAllowed(isRemote, false))
{
/* instead of erroring right away, only error out if some task is not marked safeToPush */
Task *task = NULL;
foreach_ptr(task, taskList)
{
if (!task->safeToPush)
ereport(ERROR,
(errmsg("cannot execute a distributed query from a query on a "
"shard"),
errdetail("Executing a distributed query in a function call that "
"may be pushed to a remote node can lead to incorrect "
"results."),
errhint("Avoid nesting of distributed queries or use alter user "
"current_user set citus.allow_nested_distributed_execution "
"to on to allow it with possible incorrectness.")));
}
}
}

/*
Expand Down
12 changes: 8 additions & 4 deletions src/backend/distributed/executor/multi_executor.c
Original file line number Diff line number Diff line change
Expand Up @@ -899,22 +899,26 @@ ExecutorBoundParams(void)
* a function in a query that gets pushed down to the worker, and the
* function performs a query on a distributed table.
*/
void
EnsureTaskExecutionAllowed(bool isRemote)
bool
EnsureTaskExecutionAllowed(bool isRemote, bool shouldError)
{
if (IsTaskExecutionAllowed(isRemote))
{
return;
return true;
}

ereport(ERROR, (errmsg("cannot execute a distributed query from a query on a "
if (shouldError)
ereport(ERROR,
(errmsg("cannot execute a distributed query from a query on a "
"shard"),
errdetail("Executing a distributed query in a function call that "
"may be pushed to a remote node can lead to incorrect "
"results."),
errhint("Avoid nesting of distributed queries or use alter user "
"current_user set citus.allow_nested_distributed_execution "
"to on to allow it with possible incorrectness.")));

return false;
}


Expand Down
54 changes: 54 additions & 0 deletions src/backend/distributed/planner/multi_router_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -1793,6 +1793,8 @@ CreateTask(TaskType taskType)
task->partiallyLocalOrRemote = false;
task->relationShardList = NIL;

task->safeToPush = false;

return task;
}

Expand Down Expand Up @@ -2177,6 +2179,58 @@ SingleShardTaskList(Query *query, uint64 jobId, List *relationShardList,
task->replicationModel = replicationModel;
task->parametersInQueryStringResolved = parametersInQueryResolved;

StringInfo sqlQueryString = makeStringInfo();
pg_get_query_def(query, sqlQueryString);
/* log the query string we generated */
ereport(DEBUG4, (errmsg("generated sql query for task %d", task->taskId),
errdetail("query string: \"%s\"",
sqlQueryString->data)));

// if (query->hasTargetSRFs)
// if (query->rtable)
// if (query->jointree)
// if (query->targetList)
// if (query->returningList)
/* Check the target list */
// task->safeToPush = true;
// ListCell *lc;
// bool foundUDF = false;
// foreach (lc, query->targetList)
// {
// TargetEntry *tle = (TargetEntry *) lfirst(lc);
// elog(DEBUG2, "walking target list");
// if (ContainsUDFWalker((Node *) tle->expr, &foundUDF))
// {
// task->safeToPush = false;
// elog(DEBUG2, "UNSAFE");
// // break;
// }
// }
//
/* quick check first */
// if (colocationId) //FIXME include header for INVALID_COLOCATION_ID ?
// {
// goto exitnow;
// }

ListCell *lc;
bool foundNonReferenceTable = false;
foreach (lc, relationShardList)
{
RelationShard *relationShard = (RelationShard *) lfirst(lc);
// elog(DEBUG2, "walking relation shard list");
if (!IsCitusTableType(relationShard->relationId, REFERENCE_TABLE))
{
foundNonReferenceTable = true;
// elog(DEBUG2, "found UNSAFE");
}
}

if (!foundNonReferenceTable)
task->safeToPush = true;

exitnow:

return list_make1(task);
}

Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/utils/citus_copyfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,7 @@ CopyNodeTask(COPYFUNC_ARGS)
COPY_SCALAR_FIELD(fetchedExplainAnalyzeExecutionDuration);
COPY_SCALAR_FIELD(isLocalTableModification);
COPY_SCALAR_FIELD(cannotBeExecutedInTransaction);
COPY_SCALAR_FIELD(safeToPush);
}


Expand Down
1 change: 1 addition & 0 deletions src/backend/distributed/utils/citus_outfuncs.c
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ OutTask(OUTFUNC_ARGS)
WRITE_FLOAT_FIELD(fetchedExplainAnalyzeExecutionDuration, "%.2f");
WRITE_BOOL_FIELD(isLocalTableModification);
WRITE_BOOL_FIELD(cannotBeExecutedInTransaction);
WRITE_BOOL_FIELD(safeToPush);
}


Expand Down
2 changes: 1 addition & 1 deletion src/include/distributed/multi_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ extern void EnsureSequentialMode(ObjectType objType);
extern void SetLocalForceMaxQueryParallelization(void);
extern void SortTupleStore(CitusScanState *scanState);
extern ParamListInfo ExecutorBoundParams(void);
extern void EnsureTaskExecutionAllowed(bool isRemote);
extern bool EnsureTaskExecutionAllowed(bool isRemote, bool shouldError);


#endif /* MULTI_EXECUTOR_H */
3 changes: 3 additions & 0 deletions src/include/distributed/multi_physical_planner.h
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,9 @@ typedef struct Task

Const *partitionKeyValue;
int colocationId;

/* true when it is guaranteed that this task is safe to execute as a nested statement */
bool safeToPush;
} Task;


Expand Down

0 comments on commit 9c630f1

Please sign in to comment.