Skip to content

Commit

Permalink
Fixed error: “lost connection to parallel worker” when running parall…
Browse files Browse the repository at this point in the history
…el query (#1883) (#1953)

As per coding convention of Postgresql, One should not use elog/ereport with any level of log when error report cycle is
in progress. Use of such elog may run into various error particularly when error being processed is reported from
parallel worker. This may even run into crashes during cleanup.

In Babelfish, we are using such elog to report log/debug without holding interrupt inside error report cycle which is
leading to error like “lost connection to parallel worker”. This commit aim to fix such issue by appropriately
holding/resuming interrupts during error report cycle.

Task: BABEL-4393
Signed-off-by: Dipesh Dhameliya <[email protected]>
  • Loading branch information
Deepesh125 authored Oct 25, 2023
1 parent 6011e40 commit 6f64310
Show file tree
Hide file tree
Showing 9 changed files with 60 additions and 1 deletion.
12 changes: 12 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/err_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,12 @@ get_tsql_error_details(ErrorData *edata,

/* Possible infinite loop of errors. Do not touch it further. */
if (!error_stack_full())
{
HOLD_INTERRUPTS();
elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s",
edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context);
RESUME_INTERRUPTS();
}

return false;
}
Expand Down Expand Up @@ -267,8 +271,12 @@ get_tsql_error_details(ErrorData *edata,

/* Possible infinite loop of errors. Do not touch it further. */
if (!error_stack_full())
{
HOLD_INTERRUPTS();
elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s",
edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context);
RESUME_INTERRUPTS();
}

*tsql_error_code = ERRCODE_PLTSQL_ERROR_NOT_MAPPED;
*tsql_error_severity = 16;
Expand Down Expand Up @@ -301,7 +309,9 @@ emit_tds_log(ErrorData *edata)

if (edata->elevel < ERROR)
{
HOLD_INTERRUPTS();
elog(DEBUG5, "suppressing informational client message < ERROR");
RESUME_INTERRUPTS();

/* reset the flag */
tds_disable_error_log_hook = false;
Expand Down Expand Up @@ -356,9 +366,11 @@ emit_tds_log(ErrorData *edata)
/* Log the internal error message */
ErrorData *next_edata;

HOLD_INTERRUPTS();
next_edata = CopyErrorData();
elog(LOG, "internal error occurred: %s", next_edata->message);
FreeErrorData(next_edata);
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ ProcessBCPRequest(TDSRequest request)
int ret;

HOLD_CANCEL_INTERRUPTS();
HOLD_INTERRUPTS();

/*
* Discard remaining TDS_BULK_LOAD packets only if End of
Expand All @@ -1014,6 +1015,7 @@ ProcessBCPRequest(TDSRequest request)
req->rowCount, req->colCount),
errhidestmt(true)));

RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
8 changes: 8 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,13 +550,15 @@ SPExecuteSQL(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_executesql statement: %s", s.data),
errhidestmt(true),
errdetail_params(req->nTotalParams)));

TDSStatementExceptionCallback(NULL, NULL, false);
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down Expand Up @@ -749,6 +751,7 @@ SPExecute(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_execute handle: %d", req->handle),
Expand All @@ -757,6 +760,7 @@ SPExecute(TDSRequestSP req)

TDSStatementExceptionCallback(NULL, NULL, false);
tvp_lookup_list = NIL;
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down Expand Up @@ -871,6 +875,7 @@ SPPrepExec(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_prepexec handle: %d, "
Expand All @@ -880,6 +885,7 @@ SPPrepExec(TDSRequestSP req)

TDSStatementExceptionCallback(NULL, NULL, false);
tvp_lookup_list = NIL;
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down Expand Up @@ -1099,6 +1105,7 @@ SPCustomType(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("stored procedure: %s", req->name.data),
Expand All @@ -1107,6 +1114,7 @@ SPCustomType(TDSRequestSP req)

tvp_lookup_list = NIL;

RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
4 changes: 4 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@ ExecuteSQLBatch(char *query)
PG_CATCH();
{
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
{
HOLD_INTERRUPTS();
ereport(LOG,
(errmsg("sql_batch statement: %s", query),
errhidestmt(true)));
RESUME_INTERRUPTS();
}

PG_RE_THROW();
}
Expand Down
19 changes: 18 additions & 1 deletion contrib/babelfishpg_tsql/src/iterative_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -1111,7 +1111,9 @@ handle_error(PLtsql_execstate *estate,
/* Mark transaction for termination */
if (IsTransactionBlockActive() && (last_error_mapping_failed || abort_transaction(estate, edata, override_flag)))
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN Mark transaction for rollback error mapping failed : %d", last_error_mapping_failed);
RESUME_INTERRUPTS();
AbortCurTransaction = true;
}

Expand All @@ -1124,7 +1126,9 @@ handle_error(PLtsql_execstate *estate,
/* In case of errors which terminate execution, let outer layer handle it */
if (last_error_mapping_failed || abort_execution(estate, edata, terminate_batch, override_flag) || ro_func)
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN Stop execution error mapping failed : %d current batch status : %d read only function : %d", last_error_mapping_failed, *terminate_batch, ro_func);
RESUME_INTERRUPTS();
FreeErrorData(edata);
PG_RE_THROW();
}
Expand Down Expand Up @@ -1255,27 +1259,33 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
{
if (internal_sp_started)
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN PG semantics : Rollback internal savepoint");
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
CurrentResourceOwner = oldowner;
}
else if (!IsTransactionBlockActive())
{
if (is_part_of_pltsql_trycatch_block(estate))
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN PG semantics : Rollback current transaction");
HoldPinnedPortals();
SPI_setCurrentInternalTxnMode(true);
AbortCurrentTransaction();
StartTransactionCommand();
SPI_setCurrentInternalTxnMode(false);
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
}
}
else
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN PG semantics : Mark transaction for rollback");
RESUME_INTERRUPTS();
AbortCurTransaction = true;
}
/* Recreate evaluation context in case needed */
Expand Down Expand Up @@ -1303,10 +1313,12 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
before_lxid == MyProc->lxid &&
before_subtxn_id == GetCurrentSubTransactionId())
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal savepoint");
/* Rollback internal savepoint if it is current savepoint */
RollbackAndReleaseCurrentSubTransaction();
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
CurrentResourceOwner = oldowner;
}
else if (!IsTransactionBlockActive())
Expand All @@ -1315,25 +1327,28 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
* In case of no transaction, rollback the whole transaction to
* match auto commit behavior
*/

HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback current transaction");
/* Hold portals to make sure that cursors work */
HoldPinnedPortals();
AbortCurrentTransaction();
StartTransactionCommand();
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
}
else if (estate->tsql_trigger_flags & TSQL_TRAN_STARTED)
{
/*
* Trigger must run inside an explicit transaction In case of
* error, rollback the transaction
*/
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback internal transaction");
HoldPinnedPortals();
pltsql_rollback_txn();
estate->tsql_trigger_flags &= ~TSQL_TRAN_STARTED;
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
}


Expand All @@ -1348,9 +1363,11 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
if (pltsql_implicit_transactions &&
IsTransactionBlockActive() && (estate->impl_txn_type == PLTSQL_IMPL_TRAN_START))
{
HOLD_INTERRUPTS();
elog(DEBUG1, "TSQL TXN TSQL semantics : Rollback implicit transaction");
pltsql_rollback_txn();
MemoryContextSwitchTo(cur_ctxt);
RESUME_INTERRUPTS();
}

estate->impl_txn_type = PLTSQL_IMPL_TRAN_OFF;
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_comp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1589,9 +1589,11 @@ pltsql_post_expand_star(ParseState *pstate, ColumnRef *cref, List *l)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
elog(LOG, "Cache lookup failed in pltsql_post_expand_star for attribute %d of relation %u",
attnum, relid);
attopts = (Datum) 0;
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (!attopts)
Expand Down
7 changes: 7 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -3726,6 +3726,8 @@ terminate_batch(bool send_error, bool compile_error)
bool error_mapping_failed = false;
int rc;

HOLD_INTERRUPTS();

elog(DEBUG2, "TSQL TXN finish current batch, error : %d compilation error : %d", send_error, compile_error);

/*
Expand Down Expand Up @@ -3797,9 +3799,12 @@ terminate_batch(bool send_error, bool compile_error)
AbortCurTransaction = false;

if (!send_error)
{
RESUME_INTERRUPTS();
ereport(ERROR,
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
errmsg("Uncommittable transaction is detected at the end of the batch. The transaction is rolled back.")));
}
}
else if (send_error && !IsTransactionBlockActive())
{
Expand All @@ -3814,6 +3819,8 @@ terminate_batch(bool send_error, bool compile_error)
MemoryContextSwitchTo(oldcontext);
}
}

RESUME_INTERRUPTS();
if (send_error)
{
PG_RE_THROW();
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ BulkCopy(BulkCopyStmt *stmt, uint64 *processed)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
/* For exact row which caused error, we have BulkCopyErrorCallback. */
elog(WARNING, "Error while executing Bulk Copy. Error occured while processing at "
"implicit Batch number: %d, Rows inserted in total: %ld", stmt->cur_batch_num, stmt->rows_processed);
if (rel != NULL)
table_close(rel, NoLock);
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
5 changes: 5 additions & 0 deletions contrib/babelfishpg_tsql/src/procedures.c
Original file line number Diff line number Diff line change
Expand Up @@ -451,13 +451,18 @@ sp_describe_first_result_set_internal(PG_FUNCTION_ARGS)
PG_CATCH();
{
query = psprintf("DROP VIEW %s", sp_describe_first_result_set_view_name);
HOLD_INTERRUPTS();

if ((rc = SPI_execute(query, false, 1)) < 0)
{
RESUME_INTERRUPTS();
elog(ERROR, "SPI_execute failed: %s", SPI_result_code_string(rc));
}

pfree(query);
pfree(sp_describe_first_result_set_view_name);
SPI_finish();
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down

0 comments on commit 6f64310

Please sign in to comment.