From 6f64310ed34f47d105f21a9675c8e3f4c5fbcf2f Mon Sep 17 00:00:00 2001 From: Dipesh Dhameliya Date: Wed, 25 Oct 2023 20:44:17 +0530 Subject: [PATCH] =?UTF-8?q?Fixed=20error:=20=E2=80=9Clost=20connection=20t?= =?UTF-8?q?o=20parallel=20worker=E2=80=9D=20when=20running=20parallel=20qu?= =?UTF-8?q?ery=20(#1883)=20(#1953)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit As per the PostgreSQL coding convention, elog/ereport should not be called at any log level while an error report cycle is in progress. Such calls can fail in various ways, particularly when the error being processed was reported by a parallel worker, and may even lead to crashes during cleanup. In Babelfish, we use such elog calls to emit log/debug messages inside the error report cycle without holding interrupts, which leads to errors like “lost connection to parallel worker”. This commit aims to fix the issue by appropriately holding/resuming interrupts during the error report cycle. Task: BABEL-4393 Signed-off-by: Dipesh Dhameliya --- .../src/backend/tds/err_handler.c | 12 ++++++++++++ .../src/backend/tds/tdsbulkload.c | 2 ++ .../babelfishpg_tds/src/backend/tds/tdsrpc.c | 8 ++++++++ .../src/backend/tds/tdssqlbatch.c | 4 ++++ contrib/babelfishpg_tsql/src/iterative_exec.c | 19 ++++++++++++++++++- contrib/babelfishpg_tsql/src/pl_comp.c | 2 ++ contrib/babelfishpg_tsql/src/pl_handler.c | 7 +++++++ .../babelfishpg_tsql/src/pltsql_bulkcopy.c | 2 ++ contrib/babelfishpg_tsql/src/procedures.c | 5 +++++ 9 files changed, 60 insertions(+), 1 deletion(-) diff --git a/contrib/babelfishpg_tds/src/backend/tds/err_handler.c b/contrib/babelfishpg_tds/src/backend/tds/err_handler.c index ab689023a2..29165031c7 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/err_handler.c +++ b/contrib/babelfishpg_tds/src/backend/tds/err_handler.c @@ -195,8 +195,12 @@ get_tsql_error_details(ErrorData *edata, /* Possible infinite loop of errors. Do not touch it further. 
*/ if (!error_stack_full()) + { + HOLD_INTERRUPTS(); elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s", edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context); + RESUME_INTERRUPTS(); + } return false; } @@ -267,8 +271,12 @@ get_tsql_error_details(ErrorData *edata, /* Possible infinite loop of errors. Do not touch it further. */ if (!error_stack_full()) + { + HOLD_INTERRUPTS(); elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s", edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context); + RESUME_INTERRUPTS(); + } *tsql_error_code = ERRCODE_PLTSQL_ERROR_NOT_MAPPED; *tsql_error_severity = 16; @@ -301,7 +309,9 @@ emit_tds_log(ErrorData *edata) if (edata->elevel < ERROR) { + HOLD_INTERRUPTS(); elog(DEBUG5, "suppressing informational client message < ERROR"); + RESUME_INTERRUPTS(); /* reset the flag */ tds_disable_error_log_hook = false; @@ -356,9 +366,11 @@ emit_tds_log(ErrorData *edata) /* Log the internal error message */ ErrorData *next_edata; + HOLD_INTERRUPTS(); next_edata = CopyErrorData(); elog(LOG, "internal error occurred: %s", next_edata->message); FreeErrorData(next_edata); + RESUME_INTERRUPTS(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c index a51ea08755..194ed67af2 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c @@ -990,6 +990,7 @@ ProcessBCPRequest(TDSRequest request) int ret; HOLD_CANCEL_INTERRUPTS(); + HOLD_INTERRUPTS(); /* * Discard remaining TDS_BULK_LOAD packets only if End of @@ -1014,6 +1015,7 @@ ProcessBCPRequest(TDSRequest request) req->rowCount, req->colCount), errhidestmt(true))); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c b/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c 
index 123714d08f..2d1040e767 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c @@ -550,6 +550,7 @@ SPExecuteSQL(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_executesql statement: %s", s.data), @@ -557,6 +558,7 @@ SPExecuteSQL(TDSRequestSP req) errdetail_params(req->nTotalParams))); TDSStatementExceptionCallback(NULL, NULL, false); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -749,6 +751,7 @@ SPExecute(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_execute handle: %d", req->handle), @@ -757,6 +760,7 @@ SPExecute(TDSRequestSP req) TDSStatementExceptionCallback(NULL, NULL, false); tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -871,6 +875,7 @@ SPPrepExec(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("sp_prepexec handle: %d, " @@ -880,6 +885,7 @@ SPPrepExec(TDSRequestSP req) TDSStatementExceptionCallback(NULL, NULL, false); tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); @@ -1099,6 +1105,7 @@ SPCustomType(TDSRequestSP req) } PG_CATCH(); { + HOLD_INTERRUPTS(); if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) ereport(LOG, (errmsg("stored procedure: %s", req->name.data), @@ -1107,6 +1114,7 @@ SPCustomType(TDSRequestSP req) tvp_lookup_list = NIL; + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c b/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c index db10653445..8acf94c751 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c @@ -94,9 +94,13 @@ ExecuteSQLBatch(char *query) PG_CATCH(); { if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + { + HOLD_INTERRUPTS(); ereport(LOG, (errmsg("sql_batch statement: %s", query), 
errhidestmt(true))); + RESUME_INTERRUPTS(); + } PG_RE_THROW(); } diff --git a/contrib/babelfishpg_tsql/src/iterative_exec.c b/contrib/babelfishpg_tsql/src/iterative_exec.c index e069c0d49c..fd07828232 100644 --- a/contrib/babelfishpg_tsql/src/iterative_exec.c +++ b/contrib/babelfishpg_tsql/src/iterative_exec.c @@ -1111,7 +1111,9 @@ handle_error(PLtsql_execstate *estate, /* Mark transaction for termination */ if (IsTransactionBlockActive() && (last_error_mapping_failed || abort_transaction(estate, edata, override_flag))) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN Mark transaction for rollback error mapping failed : %d", last_error_mapping_failed); + RESUME_INTERRUPTS(); AbortCurTransaction = true; } @@ -1124,7 +1126,9 @@ handle_error(PLtsql_execstate *estate, /* In case of errors which terminate execution, let outer layer handle it */ if (last_error_mapping_failed || abort_execution(estate, edata, terminate_batch, override_flag) || ro_func) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN Stop execution error mapping failed : %d current batch status : %d read only function : %d", last_error_mapping_failed, *terminate_batch, ro_func); + RESUME_INTERRUPTS(); FreeErrorData(edata); PG_RE_THROW(); } @@ -1255,15 +1259,18 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, { if (internal_sp_started) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Rollback internal savepoint"); RollbackAndReleaseCurrentSubTransaction(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); CurrentResourceOwner = oldowner; } else if (!IsTransactionBlockActive()) { if (is_part_of_pltsql_trycatch_block(estate)) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Rollback current transaction"); HoldPinnedPortals(); SPI_setCurrentInternalTxnMode(true); @@ -1271,11 +1278,14 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, StartTransactionCommand(); SPI_setCurrentInternalTxnMode(false); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } } else { + 
HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN PG semantics : Mark transaction for rollback"); + RESUME_INTERRUPTS(); AbortCurTransaction = true; } /* Recreate evaluation context in case needed */ @@ -1303,10 +1313,12 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, before_lxid == MyProc->lxid && before_subtxn_id == GetCurrentSubTransactionId()) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal savepoint"); /* Rollback internal savepoint if it is current savepoint */ RollbackAndReleaseCurrentSubTransaction(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); CurrentResourceOwner = oldowner; } else if (!IsTransactionBlockActive()) @@ -1315,13 +1327,14 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, * In case of no transaction, rollback the whole transaction to * match auto commit behavior */ - + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback current transaction"); /* Hold portals to make sure that cursors work */ HoldPinnedPortals(); AbortCurrentTransaction(); StartTransactionCommand(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } else if (estate->tsql_trigger_flags & TSQL_TRAN_STARTED) { @@ -1329,11 +1342,13 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, * Trigger must run inside an explicit transaction In case of * error, rollback the transaction */ + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal transaction"); HoldPinnedPortals(); pltsql_rollback_txn(); estate->tsql_trigger_flags &= ~TSQL_TRAN_STARTED; MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } @@ -1348,9 +1363,11 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate, if (pltsql_implicit_transactions && IsTransactionBlockActive() && (estate->impl_txn_type == PLTSQL_IMPL_TRAN_START)) { + HOLD_INTERRUPTS(); elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback implicit transaction"); pltsql_rollback_txn(); MemoryContextSwitchTo(cur_ctxt); + RESUME_INTERRUPTS(); } estate->impl_txn_type 
= PLTSQL_IMPL_TRAN_OFF; diff --git a/contrib/babelfishpg_tsql/src/pl_comp.c b/contrib/babelfishpg_tsql/src/pl_comp.c index a4d4150056..af89f2f5a1 100644 --- a/contrib/babelfishpg_tsql/src/pl_comp.c +++ b/contrib/babelfishpg_tsql/src/pl_comp.c @@ -1589,9 +1589,11 @@ pltsql_post_expand_star(ParseState *pstate, ColumnRef *cref, List *l) } PG_CATCH(); { + HOLD_INTERRUPTS(); elog(LOG, "Cache lookup failed in pltsql_post_expand_star for attribute %d of relation %u", attnum, relid); attopts = (Datum) 0; + RESUME_INTERRUPTS(); } PG_END_TRY(); if (!attopts) diff --git a/contrib/babelfishpg_tsql/src/pl_handler.c b/contrib/babelfishpg_tsql/src/pl_handler.c index 43a92d3c69..72aaaee1e2 100644 --- a/contrib/babelfishpg_tsql/src/pl_handler.c +++ b/contrib/babelfishpg_tsql/src/pl_handler.c @@ -3726,6 +3726,8 @@ terminate_batch(bool send_error, bool compile_error) bool error_mapping_failed = false; int rc; + HOLD_INTERRUPTS(); + elog(DEBUG2, "TSQL TXN finish current batch, error : %d compilation error : %d", send_error, compile_error); /* @@ -3797,9 +3799,12 @@ terminate_batch(bool send_error, bool compile_error) AbortCurTransaction = false; if (!send_error) + { + RESUME_INTERRUPTS(); ereport(ERROR, (errcode(ERRCODE_TRANSACTION_ROLLBACK), errmsg("Uncommittable transaction is detected at the end of the batch. 
The transaction is rolled back."))); + } } else if (send_error && !IsTransactionBlockActive()) { @@ -3814,6 +3819,8 @@ terminate_batch(bool send_error, bool compile_error) MemoryContextSwitchTo(oldcontext); } } + + RESUME_INTERRUPTS(); if (send_error) { PG_RE_THROW(); diff --git a/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c b/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c index 163be672f0..9384ba5adc 100644 --- a/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c +++ b/contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c @@ -110,11 +110,13 @@ BulkCopy(BulkCopyStmt *stmt, uint64 *processed) } PG_CATCH(); { + HOLD_INTERRUPTS(); /* For exact row which caused error, we have BulkCopyErrorCallback. */ elog(WARNING, "Error while executing Bulk Copy. Error occured while processing at " "implicit Batch number: %d, Rows inserted in total: %ld", stmt->cur_batch_num, stmt->rows_processed); if (rel != NULL) table_close(rel, NoLock); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY(); diff --git a/contrib/babelfishpg_tsql/src/procedures.c b/contrib/babelfishpg_tsql/src/procedures.c index b584b264d9..e8661158df 100644 --- a/contrib/babelfishpg_tsql/src/procedures.c +++ b/contrib/babelfishpg_tsql/src/procedures.c @@ -451,13 +451,18 @@ sp_describe_first_result_set_internal(PG_FUNCTION_ARGS) PG_CATCH(); { query = psprintf("DROP VIEW %s", sp_describe_first_result_set_view_name); + HOLD_INTERRUPTS(); if ((rc = SPI_execute(query, false, 1)) < 0) + { + RESUME_INTERRUPTS(); elog(ERROR, "SPI_execute failed: %s", SPI_result_code_string(rc)); + } pfree(query); pfree(sp_describe_first_result_set_view_name); SPI_finish(); + RESUME_INTERRUPTS(); PG_RE_THROW(); } PG_END_TRY();