From a31466d32b7e926c7be8157c49806fb6645c43a2 Mon Sep 17 00:00:00 2001 From: Dipesh Dhameliya Date: Mon, 18 Dec 2023 21:20:33 +0530 Subject: [PATCH] BABEL: Fixed error code difference occurring due to parallel worker enforced (#260) When any error raised from parallel worker then Postgres by default does not share message_id (part of edata) to leader worker. Babelfish's error mapping framework relies on message_id to map to correct T-SQL error code. But since it is no more available to leader worker, T-SQL error code would not be found and default error code will be used. This will also affect the transactional behaviour of given error. In order to fix this issue, we need to store if given worker is spawned in the context of Babelfish or not to share error message_id. Hence, this changes made changes in two data structure namely Port and FixedParallelState to store context about Babelfish Extension changes: https://github.com/babelfish-for-postgresql/babelfish_extensions/pull/2047 Task: BABEL-4539 Signed-off-by: Dipesh Dhameliya --- src/backend/access/transam/parallel.c | 14 ++++++++++++++ src/backend/executor/execMain.c | 6 ++---- src/backend/libpq/pqmq.c | 10 ++++++++++ src/backend/utils/error/elog.c | 18 ++++++++++++++++++ src/include/access/parallel.h | 3 +++ src/include/libpq/libpq-be.h | 1 + src/include/postgres_ext.h | 1 + 7 files changed, 49 insertions(+), 4 deletions(-) diff --git a/src/backend/access/transam/parallel.c b/src/backend/access/transam/parallel.c index 2b8bc2f58dd..029803da357 100644 --- a/src/backend/access/transam/parallel.c +++ b/src/backend/access/transam/parallel.c @@ -97,6 +97,9 @@ typedef struct FixedParallelState TimestampTz stmt_ts; SerializableXactHandle serializable_xact_handle; + /* Track if parallel worker is spawned in the context of Babelfish */ + bool babelfish_context; + /* Mutex protects remaining fields. */ slock_t mutex; @@ -336,6 +339,7 @@ InitializeParallelDSM(ParallelContext *pcxt) fps->xact_ts = GetCurrentTransactionStartTimestamp(); fps->stmt_ts = GetCurrentStatementStartTimestamp(); fps->serializable_xact_handle = ShareSerializableXact(); + fps->babelfish_context = MyProcPort->is_tds_conn; SpinLockInit(&fps->mutex); fps->last_xlog_end = 0; shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps); @@ -1620,3 +1624,13 @@ LookupParallelWorkerFunction(const char *libraryname, const char *funcname) return (parallel_worker_main_type) load_external_function(libraryname, funcname, true, NULL); } + +/* + * IsBabelfishParallelWorker - return bool based on whether given worker is + * spawned in Babelfish context. + */ +bool +IsBabelfishParallelWorker(void) +{ + return (IsParallelWorker() && MyFixedParallelState->babelfish_context); +} \ No newline at end of file diff --git a/src/backend/executor/execMain.c b/src/backend/executor/execMain.c index ccc1ba3f0fc..6f1c738a0ef 100644 --- a/src/backend/executor/execMain.c +++ b/src/backend/executor/execMain.c @@ -848,12 +848,10 @@ InitPlan(QueryDesc *queryDesc, int eflags) int i; /* - * Do permissions checks + * Do permissions checks if not Babelfish parallel worker */ - if (!(sql_dialect == SQL_DIALECT_TSQL && IsParallelWorker())) - { + if (!IsBabelfishParallelWorker()) ExecCheckPermissions(rangeTable, plannedstmt->permInfos, true); - } /* * initialize the node's execution state diff --git a/src/backend/libpq/pqmq.c b/src/backend/libpq/pqmq.c index 39e77ef945d..6428eeb6d5d 100644 --- a/src/backend/libpq/pqmq.c +++ b/src/backend/libpq/pqmq.c @@ -322,6 +322,16 @@ pq_parse_errornotice(StringInfo msg, ErrorData *edata) case PG_DIAG_SOURCE_FUNCTION: edata->funcname = pstrdup(value); break; + case PG_DIAG_MESSAGE_ID: + if (MyProcPort->is_tds_conn) + { + edata->message_id = (const char *) pstrdup(value); + } + else + { + elog(ERROR, "Unexpected error field message_id is found"); + } + break; default: elog(ERROR, "unrecognized error field code: %d", (int) code); break; diff --git a/src/backend/utils/error/elog.c b/src/backend/utils/error/elog.c index 90af1290742..c4350f85ac2 100644 --- a/src/backend/utils/error/elog.c +++ b/src/backend/utils/error/elog.c @@ -67,6 +67,7 @@ #endif #include "access/transam.h" +#include "access/parallel.h" #include "access/xact.h" #include "libpq/libpq.h" #include "libpq/pqformat.h" @@ -1894,6 +1895,14 @@ ThrowErrorData(ErrorData *edata) if (edata->internalquery) newedata->internalquery = pstrdup(edata->internalquery); + /* + * Generally, vanilla postgres does not share messaged_id with leader node from + * parallel worker. But case of Babelfish where message_id is needed to find + * T-SQL error code; below hook is defined to handle message_id for Babelfish. + */ + if (edata->message_id) + newedata->message_id = (const char *) pstrdup(edata->message_id); + MemoryContextSwitchTo(oldcontext); recursion_depth--; @@ -3576,6 +3585,15 @@ send_message_to_frontend(ErrorData *edata) err_sendstring(&msgbuf, edata->funcname); } + if (IsBabelfishParallelWorker()) + { + if (edata->message_id) + { + pq_sendbyte(&msgbuf, PG_DIAG_MESSAGE_ID); + err_sendstring(&msgbuf, edata->message_id); + } + } + pq_sendbyte(&msgbuf, '\0'); /* terminator */ pq_endmessage(&msgbuf); diff --git a/src/include/access/parallel.h b/src/include/access/parallel.h index 061f8a4c4ca..83d9373a47c 100644 --- a/src/include/access/parallel.h +++ b/src/include/access/parallel.h @@ -79,4 +79,7 @@ extern void ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end); extern void ParallelWorkerMain(Datum main_arg); +/* Below helpers are added to support parallel workers in Babelfish context */ +extern bool IsBabelfishParallelWorker(void); + #endif /* PARALLEL_H */ diff --git a/src/include/libpq/libpq-be.h b/src/include/libpq/libpq-be.h index 25d3c193df0..67e066b1526 100644 --- a/src/include/libpq/libpq-be.h +++ b/src/include/libpq/libpq-be.h @@ -259,6 +259,7 @@ typedef struct Port SSL *ssl; X509 *peer; #endif + bool is_tds_conn; } Port; #ifdef USE_SSL diff --git a/src/include/postgres_ext.h b/src/include/postgres_ext.h index 240ad4e93bf..22f6de27c8d 100644 --- a/src/include/postgres_ext.h +++ b/src/include/postgres_ext.h @@ -69,5 +69,6 @@ typedef PG_INT64_TYPE pg_int64; #define PG_DIAG_SOURCE_FILE 'F' #define PG_DIAG_SOURCE_LINE 'L' #define PG_DIAG_SOURCE_FUNCTION 'R' +#define PG_DIAG_MESSAGE_ID 'I' #endif /* POSTGRES_EXT_H */