From c057a690148dfe87b50c36ae5e884d6906de4fa7 Mon Sep 17 00:00:00 2001 From: Dipesh Dhameliya Date: Wed, 25 Oct 2023 17:02:25 +0530 Subject: [PATCH] =?UTF-8?q?Fixed=20error:=20=E2=80=9Clost=20connection=20t?= =?UTF-8?q?o=20parallel=20worker=E2=80=9D=20when=20running=20parallel=20qu?= =?UTF-8?q?ery=20(#1883)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As per the PostgreSQL coding convention, one should not use elog/ereport at any log level while an error report cycle is in progress. Using elog in that state may lead to various errors, particularly when the error being processed was reported by a parallel worker. It may even cause crashes during cleanup. In Babelfish, we use such elog calls to emit log/debug messages inside the error report cycle without holding interrupts, which leads to errors like “lost connection to parallel worker”. This commit aims to fix such issues by appropriately holding/resuming interrupts during the error report cycle. Task: BABEL-4393 Signed-off-by: Dipesh Dhameliya --- .../src/backend/tds/err_handler.c | 12 ++++++++++++ .../src/backend/tds/tdsbulkload.c | 2 ++ .../babelfishpg_tds/src/backend/tds/tdsrpc.c | 8 ++++++++ .../src/backend/tds/tdssqlbatch.c | 4 ++++ contrib/babelfishpg_tsql/src/iterative_exec.c | 19 ++++++++++++++++++- contrib/babelfishpg_tsql/src/linked_servers.c | 2 ++ contrib/babelfishpg_tsql/src/pl_comp.c | 2 ++ contrib/babelfishpg_tsql/src/pl_handler.c | 7 +++++++ .../babelfishpg_tsql/src/pltsql_bulkcopy.c | 2 ++ contrib/babelfishpg_tsql/src/procedures.c | 5 +++++ 10 files changed, 62 insertions(+), 1 deletion(-) diff --git a/contrib/babelfishpg_tds/src/backend/tds/err_handler.c b/contrib/babelfishpg_tds/src/backend/tds/err_handler.c index 32781203df..1c4e5d5142 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/err_handler.c +++ b/contrib/babelfishpg_tds/src/backend/tds/err_handler.c @@ -198,8 +198,12 @@ get_tsql_error_details(ErrorData *edata, /* Possible infinite loop of errors. 
Do not touch it further. */ if (!error_stack_full()) + { + HOLD_INTERRUPTS(); elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s", edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context); + RESUME_INTERRUPTS(); + } return false; } @@ -270,8 +274,12 @@ get_tsql_error_details(ErrorData *edata, /* Possible infinite loop of errors. Do not touch it further. */ if (!error_stack_full()) + { + HOLD_INTERRUPTS(); elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s", edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context); + RESUME_INTERRUPTS(); + } *tsql_error_code = ERRCODE_PLTSQL_ERROR_NOT_MAPPED; *tsql_error_severity = 16; @@ -304,7 +312,9 @@ emit_tds_log(ErrorData *edata) if (edata->elevel < ERROR) { + HOLD_INTERRUPTS(); elog(DEBUG5, "suppressing informational client message < ERROR"); + RESUME_INTERRUPTS(); /* reset the flag */ tds_disable_error_log_hook = false; @@ -359,9 +369,11 @@ emit_tds_log(ErrorData *edata) /* Log the internal error message */ ErrorData *next_edata; + HOLD_INTERRUPTS(); next_edata = CopyErrorData(); elog(LOG, "internal error occurred: %s", next_edata->message); FreeErrorData(next_edata); + RESUME_INTERRUPTS(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c index ce7e007e35..2461d6cd53 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c @@ -1008,6 +1008,7 @@ ProcessBCPRequest(TDSRequest request) int ret = 0; HOLD_CANCEL_INTERRUPTS(); + HOLD_INTERRUPTS(); /* * Discard remaining TDS_BULK_LOAD packets only if End of @@ -1029,6 +1030,7 @@ ProcessBCPRequest(TDSRequest request) req->rowCount, req->colCount), errhidestmt(true))); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c 
b/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c index 958e11a89a..0142a74b33 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c @@ -550,6 +550,7 @@ SPExecuteSQL(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_executesql statement: %s", s.data), @@ -557,6 +558,7 @@ SPExecuteSQL(TDSRequestSP req) errdetail_params(req->nTotalParams))); TDSStatementExceptionCallback(NULL, NULL, false); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -749,6 +751,7 @@ SPExecute(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_execute handle: %d", req->handle), @@ -757,6 +760,7 @@ SPExecute(TDSRequestSP req) TDSStatementExceptionCallback(NULL, NULL, false); tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -871,6 +875,7 @@ SPPrepExec(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_prepexec handle: %d, " @@ -880,6 +885,7 @@ SPPrepExec(TDSRequestSP req) TDSStatementExceptionCallback(NULL, NULL, false); tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -1099,6 +1105,7 @@ SPCustomType(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("stored procedure: %s", req->name.data), @@ -1107,6 +1114,7 @@ SPCustomType(TDSRequestSP req) tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c b/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c index db10653445..8acf94c751 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c @@ -94,9 +94,13 @@ ExecuteSQLBatch(char *query) PG_CATCH(); { if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + { + HOLD_INTERRUPTS(); 
ereport(LOG, (errmsg("sql_batch statement: %s", query), errhidestmt(true))); + RESUME_INTERRUPTS(); + } PG_RE_THROW(); } diff --git a/contrib/babelfishpg_tsql/src/iterative_exec.c b/contrib/babelfishpg_tsql/src/iterative_exec.c index 8227eda5ad..12c2947316 100644 --- a/contrib/babelfishpg_tsql/src/iterative_exec.c +++ b/contrib/babelfishpg_tsql/src/iterative_exec.c @@ -1141,7 +1141,9 @@ handle_error(PLtsql_execstate *estate, /* Mark transaction for termination */ if (IsTransactionBlockActive() && (last_error_mapping_failed || abort_transaction(estate, edata, override_flag))) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN Mark transaction for rollback error mapping failed : %d", last_error_mapping_failed); + RESUME_INTERRUPTS(); AbortCurTransaction = true; send_env_change_token_on_txn_abort(); } @@ -1155,7 +1157,9 @@ handle_error(PLtsql_execstate *estate, /* In case of errors which terminate execution, let outer layer handle it */ if (last_error_mapping_failed || abort_execution(estate, edata, terminate_batch, override_flag) || ro_func) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN Stop execution error mapping failed : %d current batch status : %d read only function : %d", last_error_mapping_failed, *terminate_batch, ro_func); + RESUME_INTERRUPTS(); FreeErrorData(edata); PG_RE_THROW(); } @@ -1286,15 +1290,18 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, { if (internal_sp_started) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Rollback internal savepoint"); RollbackAndReleaseCurrentSubTransaction(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); CurrentResourceOwner = oldowner; } else if (!IsTransactionBlockActive()) { if (is_part_of_pltsql_trycatch_block(estate)) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Rollback current transaction"); HoldPinnedPortals(); SPI_setCurrentInternalTxnMode(true); @@ -1302,11 +1309,14 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, StartTransactionCommand(); 
SPI_setCurrentInternalTxnMode(false); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } } else { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Mark transaction for rollback"); + RESUME_INTERRUPTS(); AbortCurTransaction = true; } /* Recreate evaluation context in case needed */ @@ -1334,10 +1344,12 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, before_lxid == MyProc->lxid && before_subtxn_id == GetCurrentSubTransactionId()) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal savepoint"); /* Rollback internal savepoint if it is current savepoint */ RollbackAndReleaseCurrentSubTransaction(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); CurrentResourceOwner = oldowner; } else if (!IsTransactionBlockActive()) @@ -1346,13 +1358,14 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, * In case of no transaction, rollback the whole transaction to * match auto commit behavior */ - + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback current transaction"); /* Hold portals to make sure that cursors work */ HoldPinnedPortals(); AbortCurrentTransaction(); StartTransactionCommand(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } else if (estate->tsql_trigger_flags & TSQL_TRAN_STARTED) { @@ -1360,11 +1373,13 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, * Trigger must run inside an explicit transaction In case of * error, rollback the transaction */ + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal transaction"); HoldPinnedPortals(); pltsql_rollback_txn(); estate->tsql_trigger_flags &= ~TSQL_TRAN_STARTED; MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } @@ -1379,9 +1394,11 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, if (pltsql_implicit_transactions && IsTransactionBlockActive() && (estate->impl_txn_type == PLTSQL_IMPL_TRAN_START)) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback implicit 
transaction"); pltsql_rollback_txn(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } estate->impl_txn_type = PLTSQL_IMPL_TRAN_OFF; diff --git a/contrib/babelfishpg_tsql/src/linked_servers.c b/contrib/babelfishpg_tsql/src/linked_servers.c index 3338f24a92..07ae82aa3b 100644 --- a/contrib/babelfishpg_tsql/src/linked_servers.c +++ b/contrib/babelfishpg_tsql/src/linked_servers.c @@ -869,7 +869,9 @@ linked_server_establish_connection(char *servername, LinkedServerProcess * lspro } PG_CATCH(); { + HOLD_INTERRUPTS(); LINKED_SERVER_DEBUG("LINKED SERVER: Failed to establish connection to remote server due to error"); + RESUME_INTERRUPTS(); PG_RE_THROW(); } diff --git a/contrib/babelfishpg_tsql/src/pl_comp.c b/contrib/babelfishpg_tsql/src/pl_comp.c index 17ec0bd7b4..6c1a5b8fc4 100644 --- a/contrib/babelfishpg_tsql/src/pl_comp.c +++ b/contrib/babelfishpg_tsql/src/pl_comp.c @@ -1580,9 +1580,11 @@ pltsql_post_expand_star(ParseState *pstate, ColumnRef *cref, List *l) } PG_CATCH(); { + HOLD_INTERRUPTS(); elog(LOG, "Cache lookup failed in pltsql_post_expand_star for attribute %d of relation %u", attnum, relid); attopts = (Datum) 0; + RESUME_INTERRUPTS(); } PG_END_TRY(); if (!attopts) diff --git a/contrib/babelfishpg_tsql/src/pl_handler.c b/contrib/babelfishpg_tsql/src/pl_handler.c index 81dcc4ba7b..8fa18e519e 100644 --- a/contrib/babelfishpg_tsql/src/pl_handler.c +++ b/contrib/babelfishpg_tsql/src/pl_handler.c @@ -4133,6 +4133,8 @@ terminate_batch(bool send_error, bool compile_error) bool error_mapping_failed = false; int rc; + HOLD_INTERRUPTS(); + elog(DEBUG2, "TSQL TXN finish current batch, error : %d compilation error : %d", send_error, compile_error); /* @@ -4204,9 +4206,12 @@ terminate_batch(bool send_error, bool compile_error) AbortCurTransaction = false; if (!send_error) + { + RESUME_INTERRUPTS(); ereport(ERROR, (errcode(ERRCODE_TRANSACTION_ROLLBACK), errmsg("Uncommittable transaction is detected at the end of the batch. 
The transaction is rolled back."))); + } } else if (send_error && !IsTransactionBlockActive()) { @@ -4221,6 +4226,8 @@ terminate_batch(bool send_error, bool compile_error) MemoryContextSwitchTo(oldcontext); } } + + RESUME_INTERRUPTS(); if (send_error) { PG_RE_THROW(); diff --git a/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c b/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c index 231c3bcc61..84bbe0a76e 100644 --- a/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c +++ b/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c @@ -114,11 +114,13 @@ BulkCopy(BulkCopyStmt *stmt, uint64 *processed) } PG_CATCH(); { + HOLD_INTERRUPTS(); /* For exact row which caused error, we have BulkCopyErrorCallback. */ elog(WARNING, "Error while executing Bulk Copy. Error occured while processing at " "implicit Batch number: %d, Rows inserted in total: %" PRIu64, stmt->cur_batch_num, stmt->rows_processed); if (rel != NULL) table_close(rel, NoLock); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tsql/src/procedures.c b/contrib/babelfishpg_tsql/src/procedures.c index 67a558330f..6c93e8e3e5 100644 --- a/contrib/babelfishpg_tsql/src/procedures.c +++ b/contrib/babelfishpg_tsql/src/procedures.c @@ -548,13 +548,18 @@ sp_describe_first_result_set_internal(PG_FUNCTION_ARGS) PG_CATCH(); { query = psprintf("DROP VIEW %s", sp_describe_first_result_set_view_name); + HOLD_INTERRUPTS(); if ((rc = SPI_execute(query, false, 1)) < 0) + { + RESUME_INTERRUPTS(); elog(ERROR, "SPI_execute failed: %s", SPI_result_code_string(rc)); + } pfree(query); pfree(sp_describe_first_result_set_view_name); SPI_finish(); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY();