diff --git a/.github/workflows/dotnet-tests.yml b/.github/workflows/dotnet-tests.yml index 5b06a9eae1..f4c3c6c4dc 100644 --- a/.github/workflows/dotnet-tests.yml +++ b/.github/workflows/dotnet-tests.yml @@ -62,6 +62,7 @@ jobs: run: | cd test/dotnet dotnet build + VSTEST_DISABLE_STANDARD_OUTPUT_CAPTURING=1 \ babel_URL=localhost \ babel_port=1433 \ babel_databaseName=master \ diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c index 400b78b226..d95dfe630d 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c @@ -17,6 +17,7 @@ #include "postgres.h" +#include "access/xact.h" #include "utils/guc.h" #include "lib/stringinfo.h" #include "pgstat.h" @@ -35,7 +36,7 @@ static void FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMe static void FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead); static int ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request); static void FreePlpToken(ParameterToken token); -uint64_t offset = 0; +uint64_t volatile bcpOffset = 0; #define COLUMNMETADATA_HEADER_LEN sizeof(uint32_t) + sizeof(uint16) + 1 #define FIXED_LEN_TYPE_COLUMNMETADATA_LEN 1 @@ -79,7 +80,7 @@ do \ #define CheckMessageHasEnoughBytesToReadColMetadata(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpData(message, dataLen, false); \ } while(0) @@ -90,7 +91,7 @@ do \ #define CheckMessageHasEnoughBytesToReadRows(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpData(message, dataLen, true); \ } while(0) @@ -98,10 +99,16 @@ do \ #define CheckPlpMessageHasEnoughBytesToRead(message, dataLen) \ do \ { \ - if ((*message)->len - offset < dataLen) \ + if ((*message)->len - bcpOffset < dataLen) \ FetchMoreBcpPlpData(message, dataLen); \ } while(0) +void +TdsResetBcpOffset() +{ + bcpOffset = 0; +} + static void FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) { @@ -133,12 +140,12 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) if (freeMessageData) { temp = makeStringInfo(); - appendBinaryStringInfo(temp, (*message)->data + offset, (*message)->len - offset); + appendBinaryStringInfo(temp, (*message)->data + bcpOffset, (*message)->len - bcpOffset); if ((*message)->data) pfree((*message)->data); pfree((*message)); - offset = 0; + bcpOffset = 0; } else temp = *message; @@ -146,7 +153,7 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) /* * Keep fetching for additional packets until we have enough data to read. */ - while (dataLenToRead + offset > temp->len) + while (dataLenToRead + bcpOffset > temp->len) { /* * We should hold the interrupts until we read the next request frame. @@ -173,7 +180,7 @@ FetchMoreBcpData(StringInfo *message, int dataLenToRead, bool freeMessageData) /* * Incase of PLP data we should not discard the previous packet since we - * first store the offset of the PLP Chunks first and then read the data later. + * first store the bcpOffset of the PLP Chunks first and then read the data later. */ static void FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead) @@ -198,7 +205,7 @@ FetchMoreBcpPlpData(StringInfo *message, int dataLenToRead) /* * Keep fetching for additional packets until we have enough data to read. */ - while (dataLenToRead + offset > (*message)->len) + while (dataLenToRead + bcpOffset > (*message)->len) { /* * We should hold the interrupts until we read the next request frame. @@ -240,33 +247,34 @@ GetBulkLoadRequest(StringInfo message) request->rowData = NIL; request->reqType = TDS_REQUEST_BULK_LOAD; - if (unlikely((uint8_t) message->data[offset] != TDS_TOKEN_COLMETADATA)) + TdsResetBcpOffset(); + if (unlikely((uint8_t) message->data[bcpOffset] != TDS_TOKEN_COLMETADATA)) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("The incoming tabular data stream (TDS) Bulk Load Request (BulkLoadBCP) protocol stream is incorrect. " "unexpected token encountered processing the request."))); - offset++; + bcpOffset++; - memcpy(&colCount, &message->data[offset], sizeof(uint16)); + memcpy(&colCount, &message->data[bcpOffset], sizeof(uint16)); colmetadata = palloc0(colCount * sizeof(BulkLoadColMetaData)); request->colCount = colCount; request->colMetaData = colmetadata; - offset += sizeof(uint16); + bcpOffset += sizeof(uint16); for (int currentColumn = 0; currentColumn < colCount; currentColumn++) { CheckMessageHasEnoughBytesToReadColMetadata(&message, COLUMNMETADATA_HEADER_LEN); /* UserType */ - memcpy(&colmetadata[currentColumn].userType, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].userType, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); /* Flags */ - memcpy(&colmetadata[currentColumn].flags, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&colmetadata[currentColumn].flags, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); /* TYPE_INFO */ - colmetadata[currentColumn].columnTdsType = message->data[offset++]; + colmetadata[currentColumn].columnTdsType = message->data[bcpOffset++]; /* Datatype specific Column Metadata. */ switch (colmetadata[currentColumn].columnTdsType) @@ -278,14 +286,14 @@ GetBulkLoadRequest(StringInfo message) case TDS_TYPE_DATETIMEN: case TDS_TYPE_UNIQUEIDENTIFIER: CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].maxLen = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; break; case TDS_TYPE_DECIMALN: case TDS_TYPE_NUMERICN: CheckMessageHasEnoughBytesToReadColMetadata(&message, NUMERIC_COLUMNMETADATA_LEN); - colmetadata[currentColumn].maxLen = message->data[offset++]; - colmetadata[currentColumn].precision = message->data[offset++]; - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; + colmetadata[currentColumn].precision = message->data[bcpOffset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; break; case TDS_TYPE_CHAR: case TDS_TYPE_VARCHAR: @@ -293,12 +301,12 @@ GetBulkLoadRequest(StringInfo message) case TDS_TYPE_NVARCHAR: { CheckMessageHasEnoughBytesToReadColMetadata(&message, STRING_COLUMNMETADATA_LEN); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); - memcpy(&collation, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); - colmetadata[currentColumn].sortId = message->data[offset++]; + memcpy(&collation, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); + colmetadata[currentColumn].sortId = message->data[bcpOffset++]; colmetadata[currentColumn].encoding = TdsGetEncoding(collation); } break; @@ -309,53 +317,53 @@ GetBulkLoadRequest(StringInfo message) uint16_t tableLen = 0; CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint32_t)); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); /* Read collation(LICD) and sort-id for TEXT and NTEXT. */ if (colmetadata[currentColumn].columnTdsType == TDS_TYPE_TEXT || colmetadata[currentColumn].columnTdsType == TDS_TYPE_NTEXT) { CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint32_t) + 1); - memcpy(&collation, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); - colmetadata[currentColumn].sortId = message->data[offset++]; + memcpy(&collation, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); + colmetadata[currentColumn].sortId = message->data[bcpOffset++]; colmetadata[currentColumn].encoding = TdsGetEncoding(collation); } CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint16_t)); - memcpy(&tableLen, &message->data[offset], sizeof(uint16_t)); - offset += sizeof(uint16_t); + memcpy(&tableLen, &message->data[bcpOffset], sizeof(uint16_t)); + bcpOffset += sizeof(uint16_t); /* Skip table name for now. */ CheckMessageHasEnoughBytesToReadColMetadata(&message, tableLen * 2); - offset += tableLen * 2; + bcpOffset += tableLen * 2; } break; case TDS_TYPE_XML: { CheckMessageHasEnoughBytesToReadColMetadata(&message, 1); - colmetadata[currentColumn].maxLen = message->data[offset++]; + colmetadata[currentColumn].maxLen = message->data[bcpOffset++]; } break; case TDS_TYPE_DATETIME2: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 8; } break; case TDS_TYPE_TIME: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 5; } break; case TDS_TYPE_DATETIMEOFFSET: { CheckMessageHasEnoughBytesToReadColMetadata(&message, FIXED_LEN_TYPE_COLUMNMETADATA_LEN); - colmetadata[currentColumn].scale = message->data[offset++]; + colmetadata[currentColumn].scale = message->data[bcpOffset++]; colmetadata[currentColumn].maxLen = 10; } break; @@ -365,8 +373,8 @@ GetBulkLoadRequest(StringInfo message) uint16 plp; CheckMessageHasEnoughBytesToReadColMetadata(&message, BINARY_COLUMNMETADATA_LEN); - memcpy(&plp, &message->data[offset], sizeof(uint16)); - offset += sizeof(uint16); + memcpy(&plp, &message->data[bcpOffset], sizeof(uint16)); + bcpOffset += sizeof(uint16); colmetadata[currentColumn].maxLen = plp; } break; @@ -375,8 +383,8 @@ GetBulkLoadRequest(StringInfo message) break; case TDS_TYPE_SQLVARIANT: CheckMessageHasEnoughBytesToReadColMetadata(&message, SQL_VARIANT_COLUMNMETADATA_LEN); - memcpy(&colmetadata[currentColumn].maxLen, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&colmetadata[currentColumn].maxLen, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); break; /* @@ -471,15 +479,15 @@ GetBulkLoadRequest(StringInfo message) /* Column Name */ CheckMessageHasEnoughBytesToReadColMetadata(&message, sizeof(uint8_t)); - memcpy(&colmetadata[currentColumn].colNameLen, &message->data[offset++], sizeof(uint8_t)); + memcpy(&colmetadata[currentColumn].colNameLen, &message->data[bcpOffset++], sizeof(uint8_t)); CheckMessageHasEnoughBytesToReadColMetadata(&message, colmetadata[currentColumn].colNameLen * 2); colmetadata[currentColumn].colName = (char *) palloc0(colmetadata[currentColumn].colNameLen * sizeof(char) * 2 + 1); - memcpy(colmetadata[currentColumn].colName, &message->data[offset], + memcpy(colmetadata[currentColumn].colName, &message->data[bcpOffset], colmetadata[currentColumn].colNameLen * 2); colmetadata[currentColumn].colName[colmetadata[currentColumn].colNameLen * 2] = '\0'; - offset += colmetadata[currentColumn].colNameLen * 2; + bcpOffset += colmetadata[currentColumn].colNameLen * 2; } request->firstMessage = makeStringInfo(); appendBinaryStringInfo(request->firstMessage, message->data, message->len); @@ -506,7 +514,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); /* Loop over each row. */ - while ((uint8_t) message->data[offset] == TDS_TOKEN_ROW + while ((uint8_t) message->data[bcpOffset] == TDS_TOKEN_ROW && request->currentBatchSize < pltsql_plugin_handler_ptr->get_insert_bulk_kilobytes_per_batch() * 1024 && request->rowCount < pltsql_plugin_handler_ptr->get_insert_bulk_rows_per_batch()) { @@ -518,7 +526,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) rowData->columnValues = palloc0(request->colCount * sizeof(Datum)); rowData->isNull = palloc0(request->colCount * sizeof(bool)); - offset++; + bcpOffset++; request->currentBatchSize++; while (i != request->colCount) /* Loop over each column. */ @@ -544,7 +552,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) else { CheckMessageHasEnoughBytesToReadRows(&message, 1); - len = message->data[offset++]; + len = message->data[bcpOffset++]; request->currentBatchSize++; if (len == 0) /* null */ @@ -559,7 +567,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -606,7 +614,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) break; } - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -623,7 +631,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); - len = message->data[offset++]; + len = message->data[bcpOffset++]; request->currentBatchSize++; if (len == 0) /* null */ { @@ -637,7 +645,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -648,7 +656,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) */ rowData->columnValues[i] = TdsTypeNumericToDatum(temp, colmetadata[i].scale); - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -663,8 +671,8 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) if (colmetadata[i].maxLen != 0xffff) { CheckMessageHasEnoughBytesToReadRows(&message, sizeof(short)); - memcpy(&len, &message->data[offset], sizeof(short)); - offset += sizeof(short); + memcpy(&len, &message->data[bcpOffset], sizeof(short)); + bcpOffset += sizeof(short); request->currentBatchSize += sizeof(short); if (len != 0xffff) { @@ -673,12 +681,12 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; - offset += len; + bcpOffset += len; request->currentBatchSize += len; } else /* null */ @@ -750,7 +758,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) * Ignore the Data Text Ptr since its currently of no * use. */ - dataTextPtrLen = message->data[offset++]; + dataTextPtrLen = message->data[bcpOffset++]; request->currentBatchSize++; if (dataTextPtrLen == 0) /* null */ { @@ -761,14 +769,14 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, dataTextPtrLen + 8 + sizeof(uint32_t)); - offset += dataTextPtrLen; + bcpOffset += dataTextPtrLen; request->currentBatchSize += dataTextPtrLen; - offset += 8; /* TODO: Ignored the Data Text + bcpOffset += 8; /* TODO: Ignored the Data Text * TimeStamp for now. */ request->currentBatchSize += 8; - memcpy(&len, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&len, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); request->currentBatchSize += sizeof(uint32_t); if (len == 0) /* null */ { @@ -782,7 +790,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -804,7 +812,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) break; } - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -843,8 +851,8 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) { CheckMessageHasEnoughBytesToReadRows(&message, sizeof(uint32_t)); - memcpy(&len, &message->data[offset], sizeof(uint32_t)); - offset += sizeof(uint32_t); + memcpy(&len, &message->data[bcpOffset], sizeof(uint32_t)); + bcpOffset += sizeof(uint32_t); request->currentBatchSize += sizeof(uint32_t); if (len == 0) /* null */ @@ -859,7 +867,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, len); /* Build temp Stringinfo. */ - temp->data = &message->data[offset]; + temp->data = &message->data[bcpOffset]; temp->len = len; temp->maxlen = colmetadata[i].maxLen; temp->cursor = 0; @@ -870,7 +878,7 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) */ rowData->columnValues[i] = TdsTypeSqlVariantToDatum(temp); - offset += len; + bcpOffset += len; request->currentBatchSize += len; } break; @@ -888,17 +896,75 @@ SetBulkLoadRowData(TDSRequestBulkLoad request, StringInfo message) CheckMessageHasEnoughBytesToReadRows(&message, 1); if (request->rowCount < pltsql_plugin_handler_ptr->get_insert_bulk_rows_per_batch() && request->currentBatchSize < pltsql_plugin_handler_ptr->get_insert_bulk_kilobytes_per_batch() * 1024 - && (uint8_t) message->data[offset] != TDS_TOKEN_DONE) + && (uint8_t) message->data[bcpOffset] != TDS_TOKEN_DONE) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("The incoming tabular data stream (TDS) Bulk Load Request (BulkLoadBCP) protocol stream is incorrect. " "Row %d, unexpected token encountered processing the request. %d", - request->rowCount, (uint8_t) message->data[offset]))); + request->rowCount, (uint8_t) message->data[bcpOffset]))); pfree(temp); return message; } +static void +CleanupBCPDuringError(bool internal_sp_started, + volatile int before_subtxn_id, + volatile int before_lxid, + ResourceOwner oldowner, + MemoryContext oldcontext) +{ + int ret = 0; + + /* Reset BCP bcpOffset. */ + TdsResetBcpOffset(); + + HOLD_INTERRUPTS(); + + /* + * Discard remaining TDS_BULK_LOAD packets only if End of + * Message has not been reached for the current request. + * Otherwise we have no TDS_BULK_LOAD packets left for the + * current request that need to be discarded. + */ + if (!TdsGetRecvPacketEomStatus()) + { + HOLD_CANCEL_INTERRUPTS(); + ret = TdsDiscardAllPendingBcpRequest(); + RESUME_CANCEL_INTERRUPTS(); + } + + if (ret < 0) + TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; + + if (internal_sp_started && before_lxid == MyProc->lxid && before_subtxn_id == GetCurrentSubTransactionId()) + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + elog(LOG, "TSQL TXN PG semantics : Rollback internal savepoint"); + RollbackAndReleaseCurrentSubTransaction(); + CurrentResourceOwner = oldowner; + } + else if (!IsTransactionBlockActive()) + { + AbortCurrentTransaction(); + StartTransactionCommand(); + } + else + { + /* + * In the case of an error and transaction is active but the earlier savepoint + * did not match, then we shall rollback the current transaction and let the + * the actual error be relayed to the customer. + */ + elog(LOG, "The current transaction is rolled back since it " + "was in inconsistent state during Bulk Copy"); + pltsql_plugin_handler_ptr->pltsql_rollback_txn_callback(); + } + + MemoryContextSwitchTo(oldcontext); + RESUME_INTERRUPTS(); +} + /* * ProcessBCPRequest - Processes the request and calls the bulk_load_callback * for futher execution. @@ -910,12 +976,33 @@ ProcessBCPRequest(TDSRequest request) uint64 retValue = 0; TDSRequestBulkLoad req = (TDSRequestBulkLoad) request; StringInfo message = req->firstMessage; + volatile bool internal_sp_started = false; + volatile int before_subtxn_id = 0; + volatile int before_lxid = MyProc->lxid; + ResourceOwner oldowner = CurrentResourceOwner; + MemoryContext oldcontext = CurrentMemoryContext; + bool endOfMessage = false; set_ps_display("active"); TdsErrorContext->err_text = "Processing Bulk Load Request"; pgstat_report_activity(STATE_RUNNING, "Processing Bulk Load Request"); - while (1) + /* + * If a transaction is active then start a Savepoint to rollback + * later in case of error. + */ + if (IsTransactionBlockActive()) + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + elog(LOG, "TSQL TXN Start internal savepoint"); + BeginInternalSubTransaction(NULL); + internal_sp_started = true; + before_subtxn_id = GetCurrentSubTransactionId(); + } + else + internal_sp_started = false; + + while (!endOfMessage) { int nargs = 0; Datum *values = NULL; @@ -929,24 +1016,8 @@ ProcessBCPRequest(TDSRequest request) } PG_CATCH(); { - int ret; - - HOLD_CANCEL_INTERRUPTS(); - - /* - * Discard remaining TDS_BULK_LOAD packets only if End of Message - * has not been reached for the current request. Otherwise we have - * no TDS_BULK_LOAD packets left for the current request that need - * to be discarded. - */ - if (!TdsGetRecvPacketEomStatus()) - ret = TdsDiscardAllPendingBcpRequest(); - - RESUME_CANCEL_INTERRUPTS(); - - if (ret < 0) - TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; - + CleanupBCPDuringError(internal_sp_started, before_subtxn_id, + before_lxid, oldowner, oldcontext); PG_RE_THROW(); } PG_END_TRY(); @@ -955,79 +1026,79 @@ ProcessBCPRequest(TDSRequest request) * If the row-count is 0 then there are no rows left to be inserted. * We should begin with cleanup. */ - if (req->rowCount == 0) + if (req->rowCount > 0) { - /* Using Same callback function to do the clean-up. */ - pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL); - break; - } + nargs = req->colCount * req->rowCount; + values = palloc0(nargs * sizeof(Datum)); + nulls = palloc0(nargs * sizeof(bool)); - nargs = req->colCount * req->rowCount; - values = palloc0(nargs * sizeof(Datum)); - nulls = palloc0(nargs * sizeof(bool)); - - /* Flaten and create a 1-D array of Value & Datums */ - foreach(lc, req->rowData) - { - BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc); - - for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++) + /* Flaten and create a 1-D array of Value & Datums */ + foreach(lc, req->rowData) { - if (row->isNull[currentColumn]) /* null */ - nulls[count] = row->isNull[currentColumn]; - else - values[count] = row->columnValues[currentColumn]; - count++; + BulkLoadRowData *row = (BulkLoadRowData *) lfirst(lc); + + for (int currentColumn = 0; currentColumn < req->colCount; currentColumn++) + { + if (row->isNull[currentColumn]) /* null */ + nulls[count] = row->isNull[currentColumn]; + else + values[count] = row->columnValues[currentColumn]; + count++; + } } } - if (req->rowData) /* If any row exists then do an insert. */ + PG_TRY(); { - PG_TRY(); + retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->rowCount ? req->colCount : 0, + req->rowCount, values, nulls); + + /* Free the List of Rows. */ + if (req->rowData) { - retValue += pltsql_plugin_handler_ptr->bulk_load_callback(req->colCount, - req->rowCount, values, nulls); + list_free_deep(req->rowData); + req->rowData = NIL; } - PG_CATCH(); - { - int ret; + /* If there we no rows then we have reached the end of the loop. */ + else + endOfMessage = true; - HOLD_CANCEL_INTERRUPTS(); - HOLD_INTERRUPTS(); - - /* - * Discard remaining TDS_BULK_LOAD packets only if End of - * Message has not been reached for the current request. - * Otherwise we have no TDS_BULK_LOAD packets left for the - * current request that need to be discarded. - */ - if (!TdsGetRecvPacketEomStatus()) - ret = TdsDiscardAllPendingBcpRequest(); - - RESUME_CANCEL_INTERRUPTS(); - - if (ret < 0) - TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request"; - - if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) - ereport(LOG, - (errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.", - req->rowCount, req->colCount), - errhidestmt(true))); - - RESUME_INTERRUPTS(); - PG_RE_THROW(); - } - PG_END_TRY(); - /* Free the List of Rows. */ - list_free_deep(req->rowData); - req->rowData = NIL; if (values) pfree(values); if (nulls) pfree(nulls); } + PG_CATCH(); + { + if (TDS_DEBUG_ENABLED(TDS_DEBUG2)) + ereport(LOG, + (errmsg("Bulk Load Request. Number of Rows: %d and Number of columns: %d.", + req->rowCount, req->colCount), + errhidestmt(true))); + + CleanupBCPDuringError(internal_sp_started, before_subtxn_id, + before_lxid, oldowner, oldcontext); + PG_RE_THROW(); + } + PG_END_TRY(); + } + /* Reset the offset at the end of the request. */ + TdsResetBcpOffset(); + + /* If we Started an internal savepoint then release it. */ + if (internal_sp_started && before_subtxn_id == GetCurrentSubTransactionId()) + { + elog(DEBUG5, "TSQL TXN Release internal savepoint"); + ReleaseCurrentSubTransaction(); + CurrentResourceOwner = oldowner; + MemoryContextSwitchTo(oldcontext); } + /* Unlikely case where Transaction is active but the savepoints do not match. */ + else if (unlikely(internal_sp_started && before_subtxn_id != GetCurrentSubTransactionId())) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("The current Transaction was found to be in inconsisten state " + "during Bulk Copy"))); /* * Send Done Token if rows processed is a positive number. Command type - @@ -1054,7 +1125,6 @@ ProcessBCPRequest(TDSRequest request) pltsql_plugin_handler_ptr->stmt_needs_logging = false; error_context_stack = plerrcontext; } - offset = 0; } static int @@ -1066,8 +1136,8 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) unsigned long lenCheck = 0; CheckPlpMessageHasEnoughBytesToRead(message, sizeof(plpTok)); - memcpy(&plpTok, &(*message)->data[offset], sizeof(plpTok)); - offset += sizeof(plpTok); + memcpy(&plpTok, &(*message)->data[bcpOffset], sizeof(plpTok)); + bcpOffset += sizeof(plpTok); request->currentBatchSize += sizeof(plpTok); temp->plp = NULL; @@ -1083,11 +1153,11 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) uint32_t tempLen; CheckPlpMessageHasEnoughBytesToRead(message, sizeof(tempLen)); - if (offset + sizeof(tempLen) > (*message)->len) + if (bcpOffset + sizeof(tempLen) > (*message)->len) return STATUS_ERROR; - memcpy(&tempLen, &(*message)->data[offset], sizeof(tempLen)); - offset += sizeof(tempLen); + memcpy(&tempLen, &(*message)->data[bcpOffset], sizeof(tempLen)); + bcpOffset += sizeof(tempLen); request->currentBatchSize += sizeof(tempLen); /* PLP Terminator */ @@ -1096,7 +1166,7 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) plpTemp = palloc0(sizeof(PlpData)); plpTemp->next = NULL; - plpTemp->offset = offset; + plpTemp->offset = bcpOffset; plpTemp->len = tempLen; if (plpPrev == NULL) { @@ -1110,10 +1180,10 @@ ReadBcpPlp(ParameterToken temp, StringInfo *message, TDSRequestBulkLoad request) } CheckPlpMessageHasEnoughBytesToRead(message, plpTemp->len); - if (offset + plpTemp->len > (*message)->len) + if (bcpOffset + plpTemp->len > (*message)->len) return STATUS_ERROR; - offset += plpTemp->len; + bcpOffset += plpTemp->len; request->currentBatchSize += plpTemp->len; lenCheck += plpTemp->len; } diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c b/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c index 810656613a..5b2ef60e75 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdscomm.c @@ -867,7 +867,16 @@ TdsReadNextPendingBcpRequest(StringInfo message) if (TdsReadNextBuffer() == EOF) return EOF; - Assert(TdsRecvMessageType == TDS_BULK_LOAD); + + /* + * The driver could send an attention packet even in the middle of a + * large TDS request. In that case we should abort the entire request which + * has been read. + */ + if (TdsRecvMessageType != TDS_BULK_LOAD) + ereport(FATAL, + (errcode(ERRCODE_PROTOCOL_VIOLATION), + errmsg("Unexpected out of band packet while fetching a Bulk Load Request."))); readBytes = TdsLeftInPacket; diff --git a/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c b/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c index 038c244dda..68c4be8e4d 100644 --- a/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c +++ b/contrib/babelfishpg_tds/src/backend/tds/tdsprotocol.c @@ -127,7 +127,7 @@ ResetTDSConnection(void) TdsErrorContext->err_text = "Resetting the TDS connection"; /* Make sure we've killed any active transaction */ - AbortOutOfAnyTransaction(); + pltsql_plugin_handler_ptr->pltsql_abort_any_transaction_callback(); /* * Save the transaction isolation level that should be restored after @@ -153,6 +153,7 @@ ResetTDSConnection(void) TdsProtocolInit(); TdsResetCache(); TdsResponseReset(); + TdsResetBcpOffset(); SetConfigOption("default_transaction_isolation", isolationOld, PGC_BACKEND, PGC_S_CLIENT); diff --git a/contrib/babelfishpg_tds/src/include/tds_int.h b/contrib/babelfishpg_tds/src/include/tds_int.h index 3cd2c06b54..68e7dd1a8c 100644 --- a/contrib/babelfishpg_tds/src/include/tds_int.h +++ b/contrib/babelfishpg_tds/src/include/tds_int.h @@ -367,4 +367,7 @@ extern int tds_parse_xml_decl(const xmlChar *str, size_t *lenp, extern char *TdsEncodingConversion(const char *s, int len, pg_enc src_encoding, pg_enc dest_encoding, int *encodedByteLen); extern coll_info_t TdsLookupCollationTableCallback(Oid oid); +/* Functions in tdsbulkload.c */ +extern void TdsResetBcpOffset(void); + #endif /* TDS_INT_H */ diff --git a/contrib/babelfishpg_tsql/src/pl_exec-2.c b/contrib/babelfishpg_tsql/src/pl_exec-2.c index 26d1d47255..a11280058c 100644 --- a/contrib/babelfishpg_tsql/src/pl_exec-2.c +++ b/contrib/babelfishpg_tsql/src/pl_exec-2.c @@ -3014,6 +3014,7 @@ execute_bulk_load_insert(int ncol, int nrow, pfree(cstmt->relation); } pfree(cstmt); + cstmt = NULL; } /* Reset Insert-Bulk Options. */ @@ -3047,27 +3048,11 @@ execute_bulk_load_insert(int ncol, int nrow, * In an error condition, the caller calls the function again to do * the cleanup. */ - MemoryContext oldcontext; - /* Cleanup cstate. */ EndBulkCopy(cstmt->cstate, true); if (ActiveSnapshotSet() && GetActiveSnapshot() == snap) PopActiveSnapshot(); - oldcontext = CurrentMemoryContext; - - /* - * If a transaction block is already in progress then abort it, else - * rollback entire transaction. - */ - if (!IsTransactionBlockActive()) - { - AbortCurrentTransaction(); - StartTransactionCommand(); - } - else - pltsql_rollback_txn(); - MemoryContextSwitchTo(oldcontext); /* Reset Insert-Bulk Options. */ insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls; diff --git a/contrib/babelfishpg_tsql/src/pl_handler.c b/contrib/babelfishpg_tsql/src/pl_handler.c index b10dd208cd..b8c5aed6ae 100644 --- a/contrib/babelfishpg_tsql/src/pl_handler.c +++ b/contrib/babelfishpg_tsql/src/pl_handler.c @@ -3665,6 +3665,8 @@ _PG_init(void) (*pltsql_protocol_plugin_ptr)->sp_unprepare_callback = &sp_unprepare; (*pltsql_protocol_plugin_ptr)->reset_session_properties = &reset_session_properties; (*pltsql_protocol_plugin_ptr)->bulk_load_callback = &execute_bulk_load_insert; + (*pltsql_protocol_plugin_ptr)->pltsql_rollback_txn_callback = &pltsql_rollback_txn; + (*pltsql_protocol_plugin_ptr)->pltsql_abort_any_transaction_callback = &pltsql_abort_any_transaction; (*pltsql_protocol_plugin_ptr)->pltsql_declare_var_callback = &pltsql_declare_variable; (*pltsql_protocol_plugin_ptr)->pltsql_read_out_param_callback = &pltsql_read_composite_out_param; (*pltsql_protocol_plugin_ptr)->sqlvariant_set_metadata = common_utility_plugin_ptr->TdsSetMetaData; diff --git a/contrib/babelfishpg_tsql/src/pltsql.h b/contrib/babelfishpg_tsql/src/pltsql.h index d9ca6e73a5..21fb1c1ab6 100644 --- a/contrib/babelfishpg_tsql/src/pltsql.h +++ b/contrib/babelfishpg_tsql/src/pltsql.h @@ -1636,6 +1636,10 @@ typedef struct PLtsql_protocol_plugin uint64 (*bulk_load_callback) (int ncol, int nrow, Datum *Values, bool *Nulls); + void (*pltsql_rollback_txn_callback) (void); + + void (*pltsql_abort_any_transaction_callback) (void); + int (*pltsql_get_generic_typmod) (Oid funcid, int nargs, Oid declared_oid); const char *(*pltsql_get_logical_schema_name) (const char *physical_schema_name, bool missingOk); @@ -1982,6 +1986,7 @@ extern void PLTsqlRollbackTransaction(char *txnName, QueryCompletion *qc, bool c extern void pltsql_start_txn(void); extern void pltsql_commit_txn(void); extern void pltsql_rollback_txn(void); +extern void pltsql_abort_any_transaction(void); extern bool pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state); extern void pltsql_eval_txn_data(PLtsql_execstate *estate, PLtsql_stmt_execsql *stmt, CachedPlanSource *cachedPlanSource); extern bool is_sysname_column(ColumnDef *coldef); diff --git a/contrib/babelfishpg_tsql/src/pltsql_utils.c b/contrib/babelfishpg_tsql/src/pltsql_utils.c index 31dfc3bbd1..68937b8f7a 100644 --- a/contrib/babelfishpg_tsql/src/pltsql_utils.c +++ b/contrib/babelfishpg_tsql/src/pltsql_utils.c @@ -515,6 +515,13 @@ pltsql_rollback_txn(void) StartTransactionCommand(); } +void +pltsql_abort_any_transaction(void) +{ + NestedTranCount = 0; + AbortOutOfAnyTransaction(); +} + bool pltsql_get_errdata(int *tsql_error_code, int *tsql_error_severity, int *tsql_error_state) { diff --git a/test/dotnet/ExpectedOutput/insertBulkErrors.out b/test/dotnet/ExpectedOutput/insertBulkErrors.out new file mode 100644 index 0000000000..a4e892b12b --- /dev/null +++ b/test/dotnet/ExpectedOutput/insertBulkErrors.out @@ -0,0 +1,589 @@ +#Q#Create table sourceTable(a int, b int PRIMARY KEY) +#Q#Create table destinationTable(a int, b int PRIMARY KEY) +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#select @@trancount; +#D#int +1 +#Q#select @@trancount +#D#int +1 +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int PRIMARY KEY) +#Q#Create table destinationTable(a int, b int PRIMARY KEY) +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#select @@trancount; +#D#int +1 +#Q#select @@trancount +#D#int +1 +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#Select * from destinationTable +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int PRIMARY KEY) +#Q#Create table destinationTable(a int, b int PRIMARY KEY) +#Q#create index idx on destinationTable(a); +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#Create table sourceTable(a int, b int PRIMARY KEY) +#Q#Create table destinationTable(a int, b int PRIMARY KEY) +#Q#create index idx on destinationTable(a); +#Q#Insert into sourceTable values (1, 1); +#Q#Insert into sourceTable values (NULL, 2); +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#Q#Select * from sourceTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select * from destinationTable +#D#int#!#int +1#!#1 +#!#2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +#Q#INSERT INTO destinationTable VALUES(1001, 'Foo') +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +0 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +0 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable VALUES (1, 'Foo'), (2, 'Foo') +#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +#Q#INSERT INTO destinationTable VALUES(2, 'Foo') +#Q#Select * from sourceTable +#D#int#!#char +1#!#Foo +2#!#Foo +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select c1 from destinationTable +#D#int +2 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select c1 from destinationTable +#D#int +2 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#SELECT @@trancount +#D#int +1 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q# SET implicit_transactions ON +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +1 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#E#table "sourcetable" does not exist +#Q#drop table sourceTable1 +#E#table "sourcetable1" does not exist +#Q#drop table destinationTable +#E#table "destinationtable" does not exist +#Q#create table sourceTable(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +#Q#create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +#Q#create index idx on destinationTable(c1); +#Q#create table sourceTable1(c1 int, c2 CHAR(1024)) +#Q#INSERT INTO sourceTable1 VALUES(1001, 'Foo') +#Q#INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' +#Q#INSERT INTO destinationTable VALUES (-1, 'Foo'); +#Q#INSERT INTO destinationTable VALUES (-2, 'Foo'); +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#SELECT @@trancount +#D#int +2 +#Q#Select count(c1) from sourceTable +#D#int +1001 +#Q#select count(c1) from sourceTable1 +#D#int +1001 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'off', false); +#D#text +off +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_seqscan', 'off', false); +#D#text +off +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#Select count(c1) from destinationTable +#D#int +4 +#Q#SELECT set_config('enable_bitmapscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_seqscan', 'on', false); +#D#text +on +#Q#SELECT set_config('enable_indexscan', 'on', false); +#D#text +on +#Q#drop table sourceTable +#Q#drop table sourceTable1 +#Q#drop table destinationTable +#Q# SET implicit_transactions OFF diff --git a/test/dotnet/input/InsertBulk/insertBulkErrors.txt b/test/dotnet/input/InsertBulk/insertBulkErrors.txt new file mode 100644 index 0000000000..0b9e869a4b --- /dev/null +++ b/test/dotnet/input/InsertBulk/insertBulkErrors.txt @@ -0,0 +1,494 @@ +########################################################## +#################### TEST DETAILS ######################## +### 1. Testing explicit transaction (error case handled in 5.) +### a. Commit without error +### b. Rollback without error +### 2. Index with without transaction +### 3. Primary Key error case +### 4. Unique constraint with error case +### 5. Check constraint with error case +### a. transaction testing during error scenarios +### b. @@trancount test - error should not terminate transaction +### c. Test CheckConstraint BCP Option Enabled +### d. Test Reusing the same connection for BCP even after error scenarios +### 6. Reset-connection testing with Primary Key error +### 7. Savepoint rollback and commit in error and non-error case. +### 8. implicit_transactions have no role to play here but we have still added tests. +### The above tests test the seq and index. +########################################################## + +####### Testing explicit transaction ####### +# commit and then check for inserts +Create table sourceTable(a int, b int PRIMARY KEY) +Create table destinationTable(a int, b int PRIMARY KEY) +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +txn#!#begin +select @@trancount; +traninsertbulk#!#sourceTable#!#destinationTable + +select @@trancount +txn#!#commit + +Select * from sourceTable +Select * from destinationTable +drop table sourceTable +drop table destinationTable + +# rollback and then check for inserts +Create table sourceTable(a int, b int PRIMARY KEY) +Create table destinationTable(a int, b int PRIMARY KEY) +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +txn#!#begin +select @@trancount; +# int +traninsertbulk#!#sourceTable#!#destinationTable + +select @@trancount +txn#!#rollback + +Select * from sourceTable +Select * from destinationTable +drop table sourceTable +drop table destinationTable + +# Index without transaction +Create table sourceTable(a int, b int PRIMARY KEY) +Create table destinationTable(a int, b int PRIMARY KEY) +create index idx on destinationTable(a); +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); +insertbulk#!#sourceTable#!#destinationTable +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + +####### Index with transaction ####### +Create table sourceTable(a int, b int PRIMARY KEY) +Create table destinationTable(a int, b int PRIMARY KEY) +create index idx on destinationTable(a); +Insert into sourceTable values (1, 1); +Insert into sourceTable values (NULL, 2); + +# transaction rollback test with index +txn#!#begin +traninsertbulk#!#sourceTable#!#destinationTable +txn#!#rollback +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +# transaction commit test with index +txn#!#begin +traninsertbulk#!#sourceTable#!#destinationTable +txn#!#commit +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select * from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select * from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + + +####### Primary Key error ####### + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +INSERT INTO destinationTable VALUES(1001, 'Foo') + +insertbulk#!#sourceTable#!#destinationTable + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +insertbulk#!#sourceTable1#!#destinationTable + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### Check ####### +##### THESE TESTS ALSO TEST REUSING THE SAME CONNECTION +##### ON WHICH WE ERROR OUT AND NEED TO RESET TDS STATE +##### WE ALSO SEE THAT TRANSACTION IS NOT ROLLED BACK FOR +##### ANY ERROR DURING BULK OPERATION + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) + + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +txn#!#commit + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### Reset-connection with error (retry the insert bulk in a loop) ####### +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable VALUES (1, 'Foo'), (2, 'Foo') +create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024)) +INSERT INTO destinationTable VALUES(2, 'Foo') + +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable +insertbulk#!#sourceTable#!#destinationTable + +Select * from sourceTable + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select c1 from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select c1 from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); +drop table sourceTable +drop table destinationTable + +####### Savepoint rollback with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable + +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#rollback#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +txn#!#rollback + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + +####### Savepoint commit with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#commit#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + SET implicit_transactions ON +####### implicit_transactions rollback with and without error ####### + +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable + +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#rollback#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +txn#!#rollback + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + +####### implicit_transactions commit with and without error ####### +# last row is error (last packet will be flushed) +create table sourceTable(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable SELECT generate_series(1, 1001, 1), 'Foo' +create table destinationTable(c1 int, c2 CHAR(1024), check(c1 < 1000)) +create index idx on destinationTable(c1); + +# 1st row is error (remaining packets to be discarded) +create table sourceTable1(c1 int, c2 CHAR(1024)) +INSERT INTO sourceTable1 VALUES(1001, 'Foo') +INSERT INTO sourceTable1 SELECT generate_series(1, 1000, 1), 'Foo' + +txn#!#begin +INSERT INTO destinationTable VALUES (-1, 'Foo'); +txn#!#savepoint#!#sp1 +INSERT INTO destinationTable VALUES (-2, 'Foo'); + +###### WITHOUT ERROR ###### +SELECT @@trancount +traninsertbulk#!#destinationTable#!#destinationTable +###### WITH ERROR ###### +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable#!#destinationTable +SELECT @@trancount +traninsertbulk#!#sourceTable1#!#destinationTable +SELECT @@trancount + +txn#!#commit#!#sp1 + +Select count(c1) from sourceTable +select count(c1) from sourceTable1 + +# Seq scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'off', false); +Select count(c1) from destinationTable + +# Index scan +SELECT set_config('enable_bitmapscan', 'off', false); +SELECT set_config('enable_seqscan', 'off', false); +SELECT set_config('enable_indexscan', 'on', false); +Select count(c1) from destinationTable + +SELECT set_config('enable_bitmapscan', 'on', false); +SELECT set_config('enable_seqscan', 'on', false); +SELECT set_config('enable_indexscan', 'on', false); + +drop table sourceTable +drop table sourceTable1 +drop table destinationTable + + SET implicit_transactions OFF \ No newline at end of file diff --git a/test/dotnet/src/BatchRun.cs b/test/dotnet/src/BatchRun.cs index d424a36e28..c2184e7897 100644 --- a/test/dotnet/src/BatchRun.cs +++ b/test/dotnet/src/BatchRun.cs @@ -217,6 +217,15 @@ bool BatchRunner(DbConnection bblCnn, string queryFilePath, Serilog.Core.Logger string destinationTable = result[2]; testFlag &= testUtils.insertBulkCopy(bblCnn, bblCmd, sourceTable, destinationTable, logger, ref stCount); } + else if (strLine.ToLowerInvariant().StartsWith("traninsertbulk")) + { + var result = strLine.Split("#!#", StringSplitOptions.RemoveEmptyEntries); + testUtils.PrintToLogsOrConsole( + $"########################## INSERT BULK:- {strLine} ##########################", logger, "information"); + string sourceTable = result[1]; + string destinationTable = result[2]; + testFlag &= testUtils.insertBulkCopyWithTransaction(bblCnn, bblCmd, sourceTable, destinationTable, bblTransaction, logger, ref stCount); + } /* Case for sp_customtype RPC. */ else if (strLine.ToLowerInvariant().StartsWith("storedp")) { @@ -290,7 +299,7 @@ bool BatchRunner(DbConnection bblCnn, string queryFilePath, Serilog.Core.Logger } else if (query.ToLowerInvariant().StartsWith("insert") || query.ToLowerInvariant().StartsWith("update") || query.ToLowerInvariant().StartsWith("alter") || query.ToLowerInvariant().StartsWith("delete") || query.ToLowerInvariant().StartsWith("begin") || query.ToLowerInvariant().StartsWith("commit") - || query.ToLowerInvariant().StartsWith("rollback") || query.ToLowerInvariant().StartsWith("save") || query.ToLowerInvariant().StartsWith("use") + || query.ToLowerInvariant().StartsWith("rollback") || query.ToLowerInvariant().StartsWith("save") || query.ToLowerInvariant().StartsWith("use") || query.ToLowerInvariant().StartsWith(" set") || query.ToLowerInvariant().StartsWith("create") || query.ToLowerInvariant().StartsWith("drop") || query.ToLowerInvariant().StartsWith("exec") || query.ToLowerInvariant().StartsWith("declare")) { bblCmd?.Dispose(); diff --git a/test/dotnet/utils/ConfigSetup.cs b/test/dotnet/utils/ConfigSetup.cs index 611e5c2049..bb3c74296d 100644 --- a/test/dotnet/utils/ConfigSetup.cs +++ b/test/dotnet/utils/ConfigSetup.cs @@ -9,6 +9,8 @@ public static class ConfigSetup /* Declaring variables required for a Test Run. */ static readonly Dictionary Dictionary = LoadConfig(); public static readonly string BblConnectionString = Dictionary["bblConnectionString"]; + + public static readonly string BCPConnectionString = Dictionary["BCPConnectionString"]; public static readonly string QueryFolder = Dictionary["queryFolder"]; public static readonly string TestName = Dictionary["testName"]; public static readonly bool RunInParallel = bool.Parse(Dictionary["runInParallel"]); @@ -45,6 +47,9 @@ public static Dictionary LoadConfig() /* Creating Server Connection String and Query. */ dictionary["bblConnectionString"] = BuildConnectionString(dictionary["babel_URL"], dictionary["babel_port"], + dictionary["babel_databaseName"], + dictionary["babel_user"], dictionary["babel_password"]) + "pooling=false;"; + dictionary["BCPConnectionString"] = BuildConnectionString(dictionary["babel_URL"], dictionary["babel_port"], dictionary["babel_databaseName"], dictionary["babel_user"], dictionary["babel_password"]); return dictionary; @@ -56,10 +61,10 @@ static string BuildConnectionString(string url, string port, string db, string u { case "oledb": return @"Provider = " + ConfigSetup.Provider + ";Data Source = " + url + "," + port + "; Initial Catalog = " + db - + "; User ID = " + uid + "; Password = " + pwd + ";Pooling=false;"; + + "; User ID = " + uid + "; Password = " + pwd + ";"; case "sql": return @"Data Source = " + url + "," + port + "; Initial Catalog = " + db - + "; User ID = " + uid + "; Password = " + pwd + ";Pooling=false;"; + + "; User ID = " + uid + "; Password = " + pwd + ";"; default: throw new Exception("Driver Not Supported"); } diff --git a/test/dotnet/utils/TestUtils.cs b/test/dotnet/utils/TestUtils.cs index 243ee2e517..dd95723b53 100644 --- a/test/dotnet/utils/TestUtils.cs +++ b/test/dotnet/utils/TestUtils.cs @@ -39,10 +39,50 @@ public bool insertBulkCopy(DbConnection bblCnn, DbCommand bblCmd, String sourceT DbDataReader reader = null; try { + /* To Enforce Reset Connection. */ reader = bblCmd.ExecuteReader(); - SqlBulkCopy bulkCopy = new SqlBulkCopy(ConfigSetup.BblConnectionString); + using (SqlConnection destinationConnection = + new SqlConnection(ConfigSetup.BCPConnectionString)) + { + destinationConnection.Open(); + + SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection); + bulkCopy.DestinationTableName = destinationTable; + bulkCopy.WriteToServer(reader); + } + } + catch (Exception e) + { + PrintToLogsOrConsole("#################################################################", logger, "information"); + PrintToLogsOrConsole( + $"############# ERROR IN EXECUTING WITH BABEL ####################\n{e}\n", + logger, "information"); + stCount--; + return false; + } + finally + { + reader.Close(); + } + return true; + } + + public bool insertBulkCopyWithTransaction(DbConnection bblCnn, DbCommand bblCmd, String sourceTable, String destinationTable, DbTransaction transaction, Logger logger, ref int stCount) + { + bblCmd.CommandText = "Select * from " + sourceTable; + bblCmd.Transaction = transaction; + DbDataReader reader = null; + DataTable dataTable = new DataTable(); + try + { + reader = bblCmd.ExecuteReader(); + dataTable.Load(reader); + reader.Close(); + + /* Set CheckConstraints default for this API since this is the only mechanism to use BCP Options. */ + SqlBulkCopy bulkCopy = new SqlBulkCopy((SqlConnection)bblCnn, SqlBulkCopyOptions.CheckConstraints, (SqlTransaction) transaction); bulkCopy.DestinationTableName = destinationTable; - bulkCopy.WriteToServer(reader); + bulkCopy.WriteToServer(dataTable); } catch (Exception e) { @@ -403,7 +443,7 @@ public string AuthHelper(string strLine) dictionary["others"] = result[i].Split("|-|")[1]; } return @"Data Source = " + dictionary["url"] + "; Initial Catalog = " + dictionary["db"] + - "; User ID = " + dictionary["user"] + "; Password = " + dictionary["pwd"] + ";Pooling=false;" + dictionary["others"]; + "; User ID = " + dictionary["user"] + "; Password = " + dictionary["pwd"] + ";" + dictionary["others"]; } /* Depending on the OS we use the appropriate diff command. */