Skip to content

Commit

Permalink
Fixed error: “lost connection to parallel worker” when running parall…
Browse files Browse the repository at this point in the history
…el query
  • Loading branch information
Dipesh Dhameliya committed Oct 5, 2023
1 parent 30520ee commit 92a3ae6
Show file tree
Hide file tree
Showing 12 changed files with 53 additions and 0 deletions.
7 changes: 7 additions & 0 deletions .github/scripts/create_extension.sql
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,12 @@ GRANT ALL ON SCHEMA sys to :user;
ALTER USER :user CREATEDB;
ALTER SYSTEM SET babelfishpg_tsql.database_name = :db;
ALTER SYSTEM SET babelfishpg_tsql.migration_mode = :'migration_mode';
ALTER SYSTEM SET parallel_setup_cost = 0;
ALTER SYSTEM SET parallel_tuple_cost = 0;
ALTER SYSTEM SET min_parallel_index_scan_size = 0;
ALTER SYSTEM SET min_parallel_table_scan_size = 0;
ALTER SYSTEM SET force_parallel_mode = 1;
ALTER SYSTEM SET max_parallel_workers_per_gather = 4;
ALTER SYSTEM SET log_min_messages = debug3;
SELECT pg_reload_conf();
CALL SYS.INITIALIZE_BABELFISH(:'user');
10 changes: 10 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/err_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -195,8 +195,12 @@ get_tsql_error_details(ErrorData *edata,

/* Possible infinite loop of errors. Do not touch it further. */
if (!error_stack_full())
{
HOLD_INTERRUPTS();
elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s",
edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context);
RESUME_INTERRUPTS();
}

return false;
}
Expand Down Expand Up @@ -267,8 +271,12 @@ get_tsql_error_details(ErrorData *edata,

/* Possible infinite loop of errors. Do not touch it further. */
if (!error_stack_full())
{
HOLD_INTERRUPTS();
elog(LOG, "Unmapped error found. Code: %d, Message: %s, File: %s, Line: %d, Context: %s",
edata->sqlerrcode, edata->message, edata->filename, edata->lineno, error_context);
RESUME_INTERRUPTS();
}

*tsql_error_code = ERRCODE_PLTSQL_ERROR_NOT_MAPPED;
*tsql_error_severity = 16;
Expand Down Expand Up @@ -356,9 +364,11 @@ emit_tds_log(ErrorData *edata)
/* Log the internal error message */
ErrorData *next_edata;

HOLD_INTERRUPTS();
next_edata = CopyErrorData();
elog(LOG, "internal error occurred: %s", next_edata->message);
FreeErrorData(next_edata);
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c
Original file line number Diff line number Diff line change
Expand Up @@ -990,6 +990,7 @@ ProcessBCPRequest(TDSRequest request)
int ret;

HOLD_CANCEL_INTERRUPTS();
HOLD_INTERRUPTS();

/*
* Discard remaining TDS_BULK_LOAD packets only if End of
Expand All @@ -1014,6 +1015,7 @@ ProcessBCPRequest(TDSRequest request)
req->rowCount, req->colCount),
errhidestmt(true)));

RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
8 changes: 8 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdsrpc.c
Original file line number Diff line number Diff line change
Expand Up @@ -550,6 +550,7 @@ SPExecuteSQL(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_executesql statement: %s", s.data),
Expand All @@ -558,6 +559,7 @@ SPExecuteSQL(TDSRequestSP req)

TDSStatementExceptionCallback(NULL, NULL, false);
PG_RE_THROW();
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down Expand Up @@ -749,6 +751,7 @@ SPExecute(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_execute handle: %d", req->handle),
Expand All @@ -758,6 +761,7 @@ SPExecute(TDSRequestSP req)
TDSStatementExceptionCallback(NULL, NULL, false);
tvp_lookup_list = NIL;
PG_RE_THROW();
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down Expand Up @@ -871,6 +875,7 @@ SPPrepExec(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("sp_prepexec handle: %d, "
Expand All @@ -881,6 +886,7 @@ SPPrepExec(TDSRequestSP req)
TDSStatementExceptionCallback(NULL, NULL, false);
tvp_lookup_list = NIL;
PG_RE_THROW();
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down Expand Up @@ -1099,6 +1105,7 @@ SPCustomType(TDSRequestSP req)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
ereport(LOG,
(errmsg("stored procedure: %s", req->name.data),
Expand All @@ -1108,6 +1115,7 @@ SPCustomType(TDSRequestSP req)
tvp_lookup_list = NIL;

PG_RE_THROW();
RESUME_INTERRUPTS();
}
PG_END_TRY();

Expand Down
4 changes: 4 additions & 0 deletions contrib/babelfishpg_tds/src/backend/tds/tdssqlbatch.c
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@ ExecuteSQLBatch(char *query)
PG_CATCH();
{
if (TDS_DEBUG_ENABLED(TDS_DEBUG2))
{
HOLD_INTERRUPTS();
ereport(LOG,
(errmsg("sql_batch statement: %s", query),
errhidestmt(true)));
RESUME_INTERRUPTS();
}

PG_RE_THROW();
}
Expand Down
5 changes: 5 additions & 0 deletions contrib/babelfishpg_tsql/src/iterative_exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -1141,6 +1141,7 @@ handle_error(PLtsql_execstate *estate,
{
elog(DEBUG1, "TSQL TXN Stop execution error mapping failed : %d current batch status : %d read only function : %d", last_error_mapping_failed, *terminate_batch, ro_func);
FreeErrorData(edata);
RESUME_INTERRUPTS();
PG_RE_THROW();
}

Expand All @@ -1149,6 +1150,7 @@ handle_error(PLtsql_execstate *estate,
FlushErrorState();

FreeErrorData(edata);
RESUME_INTERRUPTS();
}

/*
Expand Down Expand Up @@ -1253,6 +1255,7 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
int last_error;
bool error_mapped;

HOLD_INTERRUPTS();
support_tsql_trans = pltsql_support_tsql_transactions();

/* Close trigger nesting in engine */
Expand Down Expand Up @@ -1298,6 +1301,7 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
estate->simple_eval_estate = NULL;
if (simple_econtext_stack == NULL || topEntry != simple_econtext_stack)
pltsql_create_econtext(estate);
RESUME_INTERRUPTS();
PG_RE_THROW();
}

Expand All @@ -1308,6 +1312,7 @@ dispatch_stmt_handle_error(PLtsql_execstate *estate,
{
/* Cleanup SPI connections if they exist. */
AtEOXact_SPI(false);
RESUME_INTERRUPTS();
PG_RE_THROW();
}

Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/linked_servers.c
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,9 @@ linked_server_establish_connection(char *servername, LinkedServerProcess * lspro
}
PG_CATCH();
{
HOLD_INTERRUPTS();
LINKED_SERVER_DEBUG("LINKED SERVER: Failed to establish connection to remote server due to error");
RESUME_INTERRUPTS();

PG_RE_THROW();
}
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_comp.c
Original file line number Diff line number Diff line change
Expand Up @@ -1583,9 +1583,11 @@ pltsql_post_expand_star(ParseState *pstate, ColumnRef *cref, List *l)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
elog(LOG, "Cache lookup failed in pltsql_post_expand_star for attribute %d of relation %u",
attnum, relid);
attopts = (Datum) 0;
RESUME_INTERRUPTS();
}
PG_END_TRY();
if (!attopts)
Expand Down
7 changes: 7 additions & 0 deletions contrib/babelfishpg_tsql/src/pl_handler.c
Original file line number Diff line number Diff line change
Expand Up @@ -4053,6 +4053,8 @@ terminate_batch(bool send_error, bool compile_error)
bool error_mapping_failed = false;
int rc;

HOLD_INTERRUPTS();

elog(DEBUG2, "TSQL TXN finish current batch, error : %d compilation error : %d", send_error, compile_error);

/*
Expand Down Expand Up @@ -4124,9 +4126,12 @@ terminate_batch(bool send_error, bool compile_error)
AbortCurTransaction = false;

if (!send_error)
{
RESUME_INTERRUPTS();
ereport(ERROR,
(errcode(ERRCODE_TRANSACTION_ROLLBACK),
errmsg("Uncommittable transaction is detected at the end of the batch. The transaction is rolled back.")));
}
}
else if (send_error && !IsTransactionBlockActive())
{
Expand All @@ -4141,6 +4146,8 @@ terminate_batch(bool send_error, bool compile_error)
MemoryContextSwitchTo(oldcontext);
}
}

RESUME_INTERRUPTS();
if (send_error)
{
PG_RE_THROW();
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,13 @@ BulkCopy(BulkCopyStmt *stmt, uint64 *processed)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
/* For exact row which caused error, we have BulkCopyErrorCallback. */
elog(WARNING, "Error while executing Bulk Copy. Error occured while processing at "
"implicit Batch number: %d, Rows inserted in total: %ld", stmt->cur_batch_num, stmt->rows_processed);
if (rel != NULL)
table_close(rel, NoLock);
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/procedures.c
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,7 @@ sp_describe_first_result_set_internal(PG_FUNCTION_ARGS)
}
PG_CATCH();
{
HOLD_INTERRUPTS();
query = psprintf("DROP VIEW %s", sp_describe_first_result_set_view_name);

if ((rc = SPI_execute(query, false, 1)) < 0)
Expand All @@ -516,6 +517,7 @@ sp_describe_first_result_set_internal(PG_FUNCTION_ARGS)
pfree(query);
pfree(sp_describe_first_result_set_view_name);
SPI_finish();
RESUME_INTERRUPTS();
PG_RE_THROW();
}
PG_END_TRY();
Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/rolecmds.c
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,7 @@ babelfish_add_domain_mapping_entry_internal(PG_FUNCTION_ARGS)
MemoryContext ectx;
ErrorData *edata;

HOLD_INTERRUPTS();
ectx = MemoryContextSwitchTo(ccxt);
table_close(bbf_domain_mapping_rel, RowExclusiveLock);
heap_freetuple(tuple);
Expand All @@ -2159,6 +2160,7 @@ babelfish_add_domain_mapping_entry_internal(PG_FUNCTION_ARGS)
FlushErrorState();
MemoryContextSwitchTo(ectx);

RESUME_INTERRUPTS();
ereport(ERROR,
(errcode(edata->sqlerrcode),
errmsg("Domain mapping entry could not be added due to following reason: %s",
Expand Down

0 comments on commit 92a3ae6

Please sign in to comment.