Skip to content

Commit

Permalink
Support CHECK CONSTRAINTS options and Fix Error aborting Cycle in TSQ…
Browse files Browse the repository at this point in the history
…L Bulk Copy (#2629)

This commit has the following changes:

1. Fix Permissions checking missed in d95b7ad
2. Fix the error handling behaviour in Bulk Copy. While processing a batch of inserts, if there are unexpected errors then the current design aborts the insert and later tries to cleanup the stale buffers. During cleanup we were not checking for aborted phase and thus flushing buffers even when we didnt have to. With this commit we fix this by not flushing during cleanup in the abort phase.
3. Also supported CHECK_CONSTRAINTS insert bulk options with this commit. The ideal behaviour is to not check any constraints unless user passes the CHECK_CONSTRAINTS options. We have now implemented the same for Babelfish.

Issues Resolved
BABEL-4200, BABEL-4991

Authored-by: Kushaal Shroff [email protected]
Signed-off-by: Kushaal Shroff [email protected]
  • Loading branch information
KushaalShroff authored May 29, 2024
1 parent 858698e commit d786226
Show file tree
Hide file tree
Showing 11 changed files with 215 additions and 15 deletions.
3 changes: 0 additions & 3 deletions contrib/babelfishpg_tds/src/backend/tds/tdsbulkload.c
Original file line number Diff line number Diff line change
Expand Up @@ -1006,9 +1006,6 @@ ProcessBCPRequest(TDSRequest request)

RESUME_CANCEL_INTERRUPTS();

/* Using Same callback function to do the clean-up. */
pltsql_plugin_handler_ptr->bulk_load_callback(0, 0, NULL, NULL);

if (ret < 0)
TdsErrorContext->err_text = "EOF on TDS socket while fetching For Bulk Load Request";

Expand Down
14 changes: 13 additions & 1 deletion contrib/babelfishpg_tsql/src/pl_exec-2.c
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,12 @@ BulkCopyStmt *cstmt = NULL;
int insert_bulk_rows_per_batch = DEFAULT_INSERT_BULK_ROWS_PER_BATCH;
int insert_bulk_kilobytes_per_batch = DEFAULT_INSERT_BULK_PACKET_SIZE;
bool insert_bulk_keep_nulls = false;
bool insert_bulk_check_constraints = false;

static int prev_insert_bulk_rows_per_batch = DEFAULT_INSERT_BULK_ROWS_PER_BATCH;
static int prev_insert_bulk_kilobytes_per_batch = DEFAULT_INSERT_BULK_PACKET_SIZE;
static bool prev_insert_bulk_keep_nulls = false;
static bool prev_insert_bulk_check_constraints = false;

/* return a underlying node if n is implicit casting and underlying node is a certain type of node */
static Node *get_underlying_node_from_implicit_casting(Node *n, NodeTag underlying_nodetype);
Expand Down Expand Up @@ -2976,6 +2978,11 @@ exec_stmt_insert_bulk(PLtsql_execstate *estate, PLtsql_stmt_insert_bulk *stmt)
prev_insert_bulk_keep_nulls = insert_bulk_keep_nulls;
insert_bulk_keep_nulls = true;
}
if (stmt->check_constraints)
{
prev_insert_bulk_check_constraints = insert_bulk_check_constraints;
insert_bulk_check_constraints = true;
}
return PLTSQL_RC_OK;
}

Expand All @@ -2995,7 +3002,7 @@ execute_bulk_load_insert(int ncol, int nrow,
/* Cleanup all the pointers. */
if (cstmt)
{
EndBulkCopy(cstmt->cstate);
EndBulkCopy(cstmt->cstate, false);
if (cstmt->attlist)
list_free_deep(cstmt->attlist);
if (cstmt->relation)
Expand All @@ -3011,6 +3018,7 @@ execute_bulk_load_insert(int ncol, int nrow,

/* Reset Insert-Bulk Options. */
insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls;
insert_bulk_check_constraints = prev_insert_bulk_check_constraints;
insert_bulk_rows_per_batch = prev_insert_bulk_rows_per_batch;
insert_bulk_kilobytes_per_batch = prev_insert_bulk_kilobytes_per_batch;

Expand Down Expand Up @@ -3041,6 +3049,9 @@ execute_bulk_load_insert(int ncol, int nrow,
*/
MemoryContext oldcontext;

/* Cleanup cstate. */
EndBulkCopy(cstmt->cstate, true);

if (ActiveSnapshotSet() && GetActiveSnapshot() == snap)
PopActiveSnapshot();
oldcontext = CurrentMemoryContext;
Expand All @@ -3060,6 +3071,7 @@ execute_bulk_load_insert(int ncol, int nrow,

/* Reset Insert-Bulk Options. */
insert_bulk_keep_nulls = prev_insert_bulk_keep_nulls;
insert_bulk_check_constraints = prev_insert_bulk_check_constraints;
insert_bulk_rows_per_batch = prev_insert_bulk_rows_per_batch;
insert_bulk_kilobytes_per_batch = prev_insert_bulk_kilobytes_per_batch;

Expand Down
2 changes: 2 additions & 0 deletions contrib/babelfishpg_tsql/src/pltsql.h
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,7 @@ typedef struct PLtsql_stmt_insert_bulk
char *kilobytes_per_batch;
char *rows_per_batch;
bool keep_nulls;
bool check_constraints;
} PLtsql_stmt_insert_bulk;

/*
Expand Down Expand Up @@ -1823,6 +1824,7 @@ extern char *bulk_load_table_name;
extern int insert_bulk_rows_per_batch;
extern int insert_bulk_kilobytes_per_batch;
extern bool insert_bulk_keep_nulls;
extern bool insert_bulk_check_constraints;

/**********************************************************************
* Function declarations
Expand Down
11 changes: 5 additions & 6 deletions contrib/babelfishpg_tsql/src/pltsql_bulkcopy.c
Original file line number Diff line number Diff line change
Expand Up @@ -395,16 +395,15 @@ CopyMultiInsertBufferFlush(CopyMultiInsertInfo *miinfo,
*
* The buffer must be flushed before cleanup.
* Code is copied from src/backend/commands/copyfrom.c
*
* For TSQL, this can be called in abort phase to cleanup.
*/
static inline void
CopyMultiInsertBufferCleanup(CopyMultiInsertInfo *miinfo,
CopyMultiInsertBuffer *buffer)
{
int i;

/* Ensure buffer was flushed. */
Assert(buffer->nused == 0);

/* Remove back-link to ourself. */
buffer->resultRelInfo->ri_CopyMultiInsertBuffer = NULL;

Expand Down Expand Up @@ -728,7 +727,7 @@ ExecuteBulkCopy(BulkCopyState cstate, int rowCount, int colCount,
/*
* If the target is a plain table, check the constraints of the tuple.
*/
if (cstate->resultRelInfo->ri_RelationDesc->rd_att->constr)
if (insert_bulk_check_constraints && cstate->resultRelInfo->ri_RelationDesc->rd_att->constr)
ExecConstraints(cstate->resultRelInfo, myslot, cstate->estate);

/*
Expand Down Expand Up @@ -943,12 +942,12 @@ BeginBulkCopy(Relation rel,
* EndBulkCopy - Clean up storage and release resources for BULK COPY.
*/
void
EndBulkCopy(BulkCopyState cstate)
EndBulkCopy(BulkCopyState cstate, bool aborted)
{
if (cstate)
{
/* Flush any remaining bufferes out to the table. */
if (!CopyMultiInsertInfoIsEmpty(&cstate->multiInsertInfo))
if (!CopyMultiInsertInfoIsEmpty(&cstate->multiInsertInfo) && !aborted)
CopyMultiInsertInfoFlush(&cstate->multiInsertInfo, NULL);

if (cstate->bistate != NULL)
Expand Down
4 changes: 1 addition & 3 deletions contrib/babelfishpg_tsql/src/pltsql_bulkcopy.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,6 @@ typedef struct BulkCopyStateData
int seq_index; /* index for an identity column */
Oid seqid; /* oid of the sequence for an identity column */
int rv_index; /* index for a rowversion datatype column */

} BulkCopyStateData;

/* ----------------------
Expand All @@ -76,5 +75,4 @@ typedef struct BulkCopyStmt
} BulkCopyStmt;

extern void BulkCopy(BulkCopyStmt *stmt, uint64 *processed);
extern void EndBulkCopy(BulkCopyState cstate);

extern void EndBulkCopy(BulkCopyState cstate, bool aborted);
5 changes: 3 additions & 2 deletions contrib/babelfishpg_tsql/src/tsqlIface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4521,8 +4521,9 @@ makeInsertBulkStatement(TSqlParser::Dml_statementContext *ctx)
stmt->keep_nulls = true;

else if (pg_strcasecmp("CHECK_CONSTRAINTS", ::getFullText(option_list[i]->id()).c_str()) == 0)
throw PGErrorWrapperException(ERROR, ERRCODE_FEATURE_NOT_SUPPORTED, "insert bulk option check_constraints is not yet supported in babelfish", getLineAndPos(bulk_ctx->WITH()));

{
stmt->check_constraints = true;
}
else if (pg_strcasecmp("FIRE_TRIGGERS", ::getFullText(option_list[i]->id()).c_str()) == 0)
throw PGErrorWrapperException(ERROR, ERRCODE_FEATURE_NOT_SUPPORTED, "insert bulk option fire_triggers is not yet supported in babelfish", getLineAndPos(bulk_ctx->WITH()));

Expand Down
69 changes: 69 additions & 0 deletions test/dotnet/ExpectedOutput/bcp.out
Original file line number Diff line number Diff line change
Expand Up @@ -660,3 +660,72 @@ bcp#!#in#!#bcp_source#!#destinationTable
#!##!#
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#Create table sourceTable (a int);
#Q#Create table destinationTable(a int, check (a < 2))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
#Q#INSERT INTO sourceTable values (2);
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
#Q#Select count(*) from sourceTable
#D#int
1001
#Q#select count(*) from destinationTable
#D#int
1001
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#Create table sourceTable (a int);
#Q#Create table destinationTable(a int)
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1500, 1);
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
#Q#Select count(*) from sourceTable
#D#int
1500
#Q#select count(*) from destinationTable
#D#int
1500
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#create table sourceTable(c1 int, c2 CHAR(1024))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
#Q#INSERT INTO destinationTable VALUES(1000, 'Foo')
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
#Q#Select count(*) from sourceTable
#D#int
1000
#Q#select count(*) from destinationTable
#D#int
1
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#Create table sourceTable (a int);
#Q#Create table destinationTable(a int, check (a < 2))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
#Q#INSERT INTO sourceTable values (2);
bcp#!#out#!#bcp_source#!#sourceTable
bcp -h "CHECK_CONSTRAINTS"#!#in#!#bcp_source#!#destinationTable
#Q#Select count(*) from sourceTable
#D#int
1001
#Q#select count(*) from destinationTable
#D#int
0
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#create table sourceTable(c1 int, c2 CHAR(1024))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
#Q#INSERT INTO destinationTable VALUES(1000, 'Foo')
bcp#!#out#!#bcp_source#!#sourceTable
bcp -h "CHECK_CONSTRAINTS"#!#in#!#bcp_source#!#destinationTable
#Q#Select count(*) from sourceTable
#D#int
1000
#Q#select count(*) from destinationTable
#D#int
1
#Q#drop table sourceTable
#Q#drop table destinationTable
3 changes: 3 additions & 0 deletions test/dotnet/ExpectedOutput/bcpOptions.out
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,9 @@ bcp -h "CHECK_CONSTRAINTS"#!#in#!#bcp_source#!#destinationTable2
1#!#1
2#!#2
#Q#Select * from destinationTable2
#D#bigint#!#bigint
1#!#1
2#!#2
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#drop table destinationTable2
Expand Down
35 changes: 35 additions & 0 deletions test/dotnet/ExpectedOutput/insertBulk.out
Original file line number Diff line number Diff line change
Expand Up @@ -407,3 +407,38 @@ hello#!#jello
10#!#hello
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#Create table sourceTable (a int);
#Q#Create table destinationTable(a int, check (a < 2))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
#Q#INSERT INTO sourceTable values (2);
#Q#Select count(*) from sourceTable
#D#int
1001
#Q#select count(*) from destinationTable
#D#int
1001
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#Create table sourceTable (a int);
#Q#Create table destinationTable(a int)
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1500, 1);
#Q#Select count(*) from sourceTable
#D#int
1500
#Q#select count(*) from destinationTable
#D#int
1500
#Q#drop table sourceTable
#Q#drop table destinationTable
#Q#create table sourceTable(c1 int, c2 CHAR(1024))
#Q#INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
#Q#create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
#Q#INSERT INTO destinationTable VALUES(1000, 'Foo')
#Q#Select count(*) from sourceTable
#D#int
1000
#Q#select count(*) from destinationTable
#D#int
1
#Q#drop table sourceTable
#Q#drop table destinationTable
30 changes: 30 additions & 0 deletions test/dotnet/input/InsertBulk/insertBulk.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,34 @@ insertbulk#!#sourceTable#!#destinationTable
Select * from sourceTable
Select * from destinationTable
drop table sourceTable
drop table destinationTable


Create table sourceTable (a int);
Create table destinationTable(a int, check (a < 2))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
INSERT INTO sourceTable values (2);
insertbulk#!#sourceTable#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

Create table sourceTable (a int);
Create table destinationTable(a int)
INSERT INTO sourceTable SELECT generate_series(1, 1500, 1);
insertbulk#!#sourceTable#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

create table sourceTable(c1 int, c2 CHAR(1024))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
INSERT INTO destinationTable VALUES(1000, 'Foo')
insertbulk#!#sourceTable#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable
54 changes: 54 additions & 0 deletions test/dotnet/input/bcp.txt
Original file line number Diff line number Diff line change
Expand Up @@ -469,4 +469,58 @@ bcp#!#in#!#bcp_source#!#destinationTable
Select * from sourceTable
select * from destinationTable
drop table sourceTable
drop table destinationTable

Create table sourceTable (a int);
Create table destinationTable(a int, check (a < 2))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
INSERT INTO sourceTable values (2);
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

Create table sourceTable (a int);
Create table destinationTable(a int)
INSERT INTO sourceTable SELECT generate_series(1, 1500, 1);
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

create table sourceTable(c1 int, c2 CHAR(1024))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
INSERT INTO destinationTable VALUES(1000, 'Foo')
bcp#!#out#!#bcp_source#!#sourceTable
bcp#!#in#!#bcp_source#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

Create table sourceTable (a int);
Create table destinationTable(a int, check (a < 2))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1);
INSERT INTO sourceTable values (2);
bcp#!#out#!#bcp_source#!#sourceTable
bcp -h "CHECK_CONSTRAINTS"#!#in#!#bcp_source#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

create table sourceTable(c1 int, c2 CHAR(1024))
INSERT INTO sourceTable SELECT generate_series(1, 1000, 1), 'Foo'
create table destinationTable(c1 int PRIMARY KEY, c2 CHAR(1024))
INSERT INTO destinationTable VALUES(1000, 'Foo')
bcp#!#out#!#bcp_source#!#sourceTable
bcp -h "CHECK_CONSTRAINTS"#!#in#!#bcp_source#!#destinationTable
Select count(*) from sourceTable
select count(*) from destinationTable
drop table sourceTable
drop table destinationTable

0 comments on commit d786226

Please sign in to comment.