From 0e8b371e4740f2abe5ccf4c6cba83591183327e0 Mon Sep 17 00:00:00 2001 From: Vu Manh Khieu Date: Thu, 30 Jun 2022 02:59:35 +0000 Subject: [PATCH] Support partition INSERT --- connection.c | 232 ++++++++++++++++++++++++++++++++++++++++++++++- deparse.c | 25 +++-- expected/dml.out | 28 ++++-- mysql_fdw.c | 205 ++++++++++++++++++++++++++++++++++++++--- mysql_fdw.h | 7 +- sql/dml.sql | 24 +++-- 6 files changed, 477 insertions(+), 44 deletions(-) diff --git a/connection.c b/connection.c index 67b1283..3c59bc0 100644 --- a/connection.c +++ b/connection.c @@ -22,6 +22,7 @@ #include "utils/inval.h" #include "utils/memutils.h" #include "utils/syscache.h" +#include "access/xact.h" /* Length of host */ #define HOST_LEN 256 @@ -46,6 +47,8 @@ typedef struct ConnCacheEntry bool invalidated; /* true if reconnect is pending */ uint32 server_hashvalue; /* hash value of foreign server OID */ uint32 mapping_hashvalue; /* hash value of user mapping OID */ + int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 = + * one level of subxact open, etc */ } ConnCacheEntry; /* @@ -53,8 +56,17 @@ typedef struct ConnCacheEntry */ static HTAB *ConnectionHash = NULL; +/* tracks whether any work is needed in callback functions */ +static bool xact_got_connection = false; + static void mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue); +static void mysql_do_sql_command(MYSQL *conn, const char *sql, int level); +static void mysql_begin_remote_xact(ConnCacheEntry *entry); +static void mysql_xact_callback(XactEvent event, void *arg); +static void mysql_subxact_callback(SubXactEvent event, SubTransactionId mySubid, + SubTransactionId parentSubid, void *arg); + /* * mysql_get_connection: * Get a connection which can be used to execute queries on the remote @@ -92,12 +104,18 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt) mysql_inval_callback, (Datum) 0); CacheRegisterSyscacheCallback(USERMAPPINGOID, mysql_inval_callback, (Datum) 0); + + RegisterXactCallback(mysql_xact_callback, NULL); + RegisterSubXactCallback(mysql_subxact_callback, NULL); } /* Create hash key for the entry. Assume no pad bytes in key struct */ key.serverid = server->serverid; key.userid = user->userid; + /* Set flag that we did GetConnection during the current transaction */ + xact_got_connection = true; + /* * Find or create cached entry for requested connection. */ @@ -109,7 +127,7 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt) } /* If an existing entry has invalid connection then release it */ - if (entry->conn != NULL && entry->invalidated) + if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0) { elog(DEBUG3, "disconnecting mysql_fdw connection %p for option changes to take effect", entry->conn); @@ -122,6 +140,7 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt) entry->conn = mysql_connect(opt); elog(DEBUG3, "new mysql_fdw connection %p for server \"%s\"", entry->conn, server->servername); + entry->xact_depth = 0; /* * Once the connection is established, then set the connection @@ -137,6 +156,12 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt) GetSysCacheHashValue1(USERMAPPINGOID, ObjectIdGetDatum(user->umid)); } + + /* + * Start a new transaction or subtransaction if needed. + */ + mysql_begin_remote_xact(entry); + return entry->conn; } @@ -284,3 +309,208 @@ mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue) entry->invalidated = true; } } + +/* + * Convenience subroutine to issue a non-data-returning SQL command to remote + */ +static void +mysql_do_sql_command(MYSQL *conn, const char *sql, int level) +{ + elog(DEBUG3, "do_sql_command %s", sql); + + if (mysql_query(conn, sql) != 0) + { + ereport(level, + (errcode(ERRCODE_FDW_ERROR), + errmsg("Failed to execute sql: %s, Error %u: %s\n", sql, mysql_errno(conn), mysql_error(conn)) + )); + } +} + +/* + * Start remote transaction or subtransaction, if needed. + */ +static void +mysql_begin_remote_xact(ConnCacheEntry *entry) +{ + int curlevel = GetCurrentTransactionNestLevel(); + + /* Start main transaction if we haven't yet */ + if (entry->xact_depth <= 0) + { + const char *sql = "START TRANSACTION"; + + elog(DEBUG3, "starting remote transaction on connection %p", + entry->conn); + + mysql_do_sql_command(entry->conn, sql, ERROR); + entry->xact_depth = 1; + } + + /* + * If we're in a subtransaction, stack up savepoints to match our level. + * This ensures we can rollback just the desired effects when a + * subtransaction aborts. + */ + while (entry->xact_depth < curlevel) + { + const char *sql = psprintf("SAVEPOINT s%d", entry->xact_depth + 1); + + mysql_do_sql_command(entry->conn, sql, ERROR); + entry->xact_depth++; + } +} + +/* + * mysql_xact_callback --- cleanup at main-transaction end. + */ +static void +mysql_xact_callback(XactEvent event, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + + /* Quick exit if no connections were touched in this transaction. */ + if (!xact_got_connection) + return; + + elog(DEBUG1, "xact_callback %d", event); + + /* + * Scan all connection cache entries to find open remote transactions, and + * close them. + */ + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + /* Ignore cache entry if no open connection right now */ + if (entry->conn == NULL) + continue; + + /* If it has an open remote transaction, try to close it */ + if (entry->xact_depth > 0) + { + elog(DEBUG3, "closing remote transaction on connection %p", + entry->conn); + + switch (event) + { + case XACT_EVENT_PARALLEL_PRE_COMMIT: + case XACT_EVENT_PRE_COMMIT: + /* Commit all remote transactions */ + mysql_do_sql_command(entry->conn, "COMMIT", ERROR); + break; + case XACT_EVENT_PRE_PREPARE: + /* + * We disallow remote transactions that modified anything, + * since it's not very reasonable to hold them open until + * the prepared transaction is committed. For the moment, + * throw error unconditionally; later we might allow + * read-only cases. Note that the error will cause us to + * come right back here with event == XACT_EVENT_ABORT, so + * we'll clean up the connection state at that point. + */ + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot prepare a transaction that modified remote tables"))); + break; + case XACT_EVENT_PARALLEL_COMMIT: + case XACT_EVENT_COMMIT: + case XACT_EVENT_PREPARE: + /* Pre-commit should have closed the open transaction */ + elog(ERROR, "missed cleaning up connection during pre-commit"); + break; + case XACT_EVENT_PARALLEL_ABORT: + case XACT_EVENT_ABORT: + { + elog(DEBUG3, "abort transaction"); + /* + * rollback if in transaction + */ + mysql_do_sql_command(entry->conn, "ROLLBACK", WARNING); + break; + } + } + } + + /* Reset state to show we're out of a transaction */ + entry->xact_depth = 0; + } + /* + * Regardless of the event type, we can now mark ourselves as out of the + * transaction. + */ + xact_got_connection = false; +} + +/* + * mysql_subxact_callback --- cleanup at subtransaction end. + */ +static void +mysql_subxact_callback(SubXactEvent event, SubTransactionId mySubid, + SubTransactionId parentSubid, void *arg) +{ + HASH_SEQ_STATUS scan; + ConnCacheEntry *entry; + int curlevel; + + /* Nothing to do at subxact start, nor after commit. */ + if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB || + event == SUBXACT_EVENT_ABORT_SUB)) + return; + + /* Quick exit if no connections were touched in this transaction. */ + if (!xact_got_connection) + return; + + /* + * Scan all connection cache entries to find open remote subtransactions + * of the current level, and close them. + */ + curlevel = GetCurrentTransactionNestLevel(); + hash_seq_init(&scan, ConnectionHash); + while ((entry = (ConnCacheEntry *) hash_seq_search(&scan))) + { + char sql[100]; + + /* + * We only care about connections with open remote subtransactions of + * the current level. + */ + if (entry->conn == NULL || entry->xact_depth < curlevel) + continue; + + if (entry->xact_depth > curlevel) + elog(ERROR, "missed cleaning up remote subtransaction at level %d", + entry->xact_depth); + + if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) + { + /* Commit all remote subtransactions during pre-commit */ + snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel); + mysql_do_sql_command(entry->conn, sql, ERROR); + } + else if (in_error_recursion_trouble()) + { + /* + * Don't try to clean up the connection if we're already in error + * recursion trouble. + */ + } + else + { + /* Rollback all remote subtransactions during abort */ + snprintf(sql, sizeof(sql), + "ROLLBACK TO SAVEPOINT s%d", + curlevel); + mysql_do_sql_command(entry->conn, sql, ERROR); + snprintf(sql, sizeof(sql), + "RELEASE SAVEPOINT s%d", + curlevel); + mysql_do_sql_command(entry->conn, sql, ERROR); + } + + /* OK, we're outta that level of subtransaction */ + entry->xact_depth--; + } +} diff --git a/deparse.c b/deparse.c index ddc8fff..dc4fac7 100644 --- a/deparse.c +++ b/deparse.c @@ -135,12 +135,12 @@ static void mysql_print_remote_param(int paramindex, Oid paramtype, static void mysql_print_remote_placeholder(Oid paramtype, int32 paramtypmod, deparse_expr_cxt *context); static void mysql_deparse_relation(StringInfo buf, Relation rel); -static void mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, +static void mysql_deparse_target_list(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, Bitmapset *attrs_used, List **retrieved_attrs); static void mysql_deparse_column_ref(StringInfo buf, int varno, int varattno, - PlannerInfo *root, bool qualify_col); + RangeTblEntry *rte, bool qualify_col); static void mysql_deparse_select_sql(List *tlist, List **retrieved_attrs, deparse_expr_cxt *context); static void mysql_append_conditions(List *exprs, deparse_expr_cxt *context); @@ -370,7 +370,7 @@ mysql_deparse_select_sql(List *tlist, List **retrieved_attrs, rel = table_open(rte->relid, NoLock); #endif - mysql_deparse_target_list(buf, root, foreignrel->relid, rel, + mysql_deparse_target_list(buf, rte, foreignrel->relid, rel, fpinfo->attrs_used, retrieved_attrs); #if PG_VERSION_NUM < 130000 @@ -449,9 +449,10 @@ mysql_deparse_from_expr(List *quals, deparse_expr_cxt *context) * The statement text is appended to buf, and we also create an integer List * of the columns being retrieved by RETURNING (if any), which is returned * to *retrieved_attrs. + * rte = planner_rt_fetch(rtindex, root) */ void -mysql_deparse_insert(StringInfo buf, PlannerInfo *root, Index rtindex, +mysql_deparse_insert(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs) { ListCell *lc; @@ -475,7 +476,7 @@ mysql_deparse_insert(StringInfo buf, PlannerInfo *root, Index rtindex, appendStringInfoString(buf, ", "); first = false; - mysql_deparse_column_ref(buf, rtindex, attnum, root, false); + mysql_deparse_column_ref(buf, rtindex, attnum, rte, false); } appendStringInfoString(buf, ") VALUES ("); @@ -513,7 +514,7 @@ mysql_deparse_analyze(StringInfo sql, char *dbname, char *relname) * This is used for both SELECT and RETURNING targetlists. */ static void -mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex, +mysql_deparse_target_list(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, Bitmapset *attrs_used, List **retrieved_attrs) { @@ -544,7 +545,7 @@ mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex, appendStringInfoString(buf, ", "); first = false; - mysql_deparse_column_ref(buf, rtindex, i, root, false); + mysql_deparse_column_ref(buf, rtindex, i, rte, false); *retrieved_attrs = lappend_int(*retrieved_attrs, i); } } @@ -560,9 +561,8 @@ mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex, */ static void mysql_deparse_column_ref(StringInfo buf, int varno, int varattno, - PlannerInfo *root, bool qualify_col) + RangeTblEntry *rte, bool qualify_col) { - RangeTblEntry *rte; char *colname = NULL; List *options; ListCell *lc; @@ -570,9 +570,6 @@ mysql_deparse_column_ref(StringInfo buf, int varno, int varattno, /* varno must not be any of OUTER_VAR, INNER_VAR and INDEX_VAR. */ Assert(!IS_SPECIAL_VARNO(varno)); - /* Get RangeTblEntry from array in PlannerInfo. */ - rte = planner_rt_fetch(varno, root); - /* * If it's a column of a foreign table, and it has the column_name FDW * option, use that value. @@ -832,7 +829,7 @@ mysql_deparse_update(StringInfo buf, PlannerInfo *root, Index rtindex, appendStringInfoString(buf, ", "); first = false; - mysql_deparse_column_ref(buf, rtindex, attnum, root, false); + mysql_deparse_column_ref(buf, rtindex, attnum, planner_rt_fetch(rtindex, root), false); appendStringInfo(buf, " = ?"); pindex++; } @@ -876,7 +873,7 @@ mysql_deparse_var(Var *node, deparse_expr_cxt *context) { /* Var belongs to foreign table */ mysql_deparse_column_ref(context->buf, node->varno, node->varattno, - context->root, qualify_col); + planner_rt_fetch(node->varno, context->root), qualify_col); } else { diff --git a/expected/dml.out b/expected/dml.out index ddd5d43..c1db1b6 100644 --- a/expected/dml.out +++ b/expected/dml.out @@ -233,22 +233,34 @@ HINT: Try the COPY (SELECT ...) TO variant. COPY (SELECT * FROM f_mysql_test) TO stdout; 1 1 COPY (SELECT a FROM f_mysql_test) TO '/tmp/copy_test.txt' delimiter ','; --- Should give error message as copy from with foreign table not supported +-- Copy from with foreign table are supported DO $$ BEGIN COPY f_mysql_test(a) FROM '/tmp/copy_test.txt' delimiter ','; EXCEPTION WHEN others THEN - IF SQLERRM = 'COPY and foreign partition routing not supported in mysql_fdw' OR - SQLERRM = 'cannot copy to foreign table "f_mysql_test"' THEN - RAISE NOTICE 'ERROR: COPY and foreign partition routing not supported in mysql_fdw'; - ELSE - RAISE NOTICE '%', SQLERRM; - END IF; + RAISE NOTICE '%', SQLERRM; END; $$ LANGUAGE plpgsql; -NOTICE: ERROR: COPY and foreign partition routing not supported in mysql_fdw +NOTICE: failed to execute the MySQL query: +Column 'b' cannot be null +COPY (SELECT a + 1, b + 1 FROM f_mysql_test) TO '/tmp/copy_test.txt' delimiter ','; +COPY (SELECT * FROM f_mysql_test) TO stdout; +1 1 +DO +$$ +BEGIN + COPY f_mysql_test(a, b) FROM '/tmp/copy_test.txt' delimiter ','; + EXCEPTION WHEN others THEN + RAISE NOTICE '%', SQLERRM; +END; +$$ +LANGUAGE plpgsql; +COPY (SELECT * FROM f_mysql_test) TO stdout; +1 1 +2 2 +DELETE FROM f_mysql_test WHERE a = 2; -- Cleanup DELETE FROM fdw126_ft1; DELETE FROM f_empdata; diff --git a/mysql_fdw.c b/mysql_fdw.c index 8f7a7b2..a10d1fb 100644 --- a/mysql_fdw.c +++ b/mysql_fdw.c @@ -1654,7 +1654,7 @@ mysqlPlanForeignModify(PlannerInfo *root, switch (operation) { case CMD_INSERT: - mysql_deparse_insert(&sql, root, resultRelation, rel, targetAttrs); + mysql_deparse_insert(&sql, rte, resultRelation, rel, targetAttrs); break; case CMD_UPDATE: mysql_deparse_update(&sql, root, resultRelation, rel, targetAttrs, @@ -1798,6 +1798,9 @@ mysqlBeginForeignModify(ModifyTableState *mtstate, strlen(fmstate->query)) != 0) mysql_stmt_error_print(fmstate, "failed to prepare the MySQL query"); + /* Initialize auxiliary state */ + fmstate->aux_fmstate = NULL; + resultRelInfo->ri_FdwState = fmstate; } @@ -2373,31 +2376,207 @@ mysqlImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid) * mysqlBeginForeignInsert * Prepare for an insert operation triggered by partition routing * or COPY FROM. - * - * This is not yet supported, so raise an error. */ static void mysqlBeginForeignInsert(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo) { - ereport(ERROR, - (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), - errmsg("COPY and foreign partition routing not supported in mysql_fdw"))); + MySQLFdwExecState *fmstate; + ModifyTable *plan = castNode(ModifyTable, mtstate->ps.plan); + EState *estate = mtstate->ps.state; + Index resultRelation; + Relation rel = resultRelInfo->ri_RelationDesc; + RangeTblEntry *rte; + TupleDesc tupdesc = RelationGetDescr(rel); + int attnum; + StringInfoData sql; + List *targetAttrs = NIL; + AttrNumber n_params; + Oid typefnoid = InvalidOid; + bool isvarlena = false; + ListCell *lc; + Oid foreignTableId = InvalidOid; + Oid userid; + ForeignServer *server; + UserMapping *user; + ForeignTable *table; + + /* + * If the foreign table we are about to insert routed rows into is also an + * UPDATE subplan result rel that will be updated later, proceeding with + * the INSERT will result in the later UPDATE incorrectly modifying those + * routed rows, so prevent the INSERT --- it would be nice if we could + * handle this case; but for now, throw an error for safety. + */ + if (plan && plan->operation == CMD_UPDATE && + (resultRelInfo->ri_usesFdwDirectModify || + resultRelInfo->ri_FdwState) +#if PG_VERSION_NUM < 140000 + && resultRelInfo > mtstate->resultRelInfo + mtstate->mt_whichplan +#endif + ) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("cannot route tuples into foreign table to be updated \"%s\"", + RelationGetRelationName(rel)))); + + initStringInfo(&sql); + + /* We transmit all columns that are defined in the foreign table. */ + for (attnum = 1; attnum <= tupdesc->natts; attnum++) + { + Form_pg_attribute attr = TupleDescAttr(tupdesc, attnum - 1); + + if (!attr->attisdropped) + targetAttrs = lappend_int(targetAttrs, attnum); +#if PG_VERSION_NUM >= 140000 + /* Ignore generated columns; they are set to DEFAULT */ + if (attr->attgenerated) + continue; +#endif + } + + /* Check if we add the ON CONFLICT clause to the remote query. */ + if (plan) + { + OnConflictAction onConflictAction = plan->onConflictAction; + + /* We only support DO NOTHING without an inference specification. */ + if (onConflictAction != ONCONFLICT_NONE) + elog(ERROR, "unexpected ON CONFLICT specification: %d", + (int) onConflictAction); + } + + /* + * If the foreign table is a partition that doesn't have a corresponding + * RTE entry, we need to create a new RTE describing the foreign table for + * use by deparseInsertSql and create_foreign_modify() below, after first + * copying the parent's RTE and modifying some fields to describe the + * foreign partition to work on. However, if this is invoked by UPDATE, + * the existing RTE may already correspond to this partition if it is one + * of the UPDATE subplan target rels; in that case, we can just use the + * existing RTE as-is. + */ + if (resultRelInfo->ri_RangeTableIndex == 0) + { + ResultRelInfo *rootResultRelInfo = resultRelInfo->ri_RootResultRelInfo; + + rte = exec_rt_fetch(rootResultRelInfo->ri_RangeTableIndex, estate); + rte = copyObject(rte); + rte->relid = RelationGetRelid(rel); + rte->relkind = RELKIND_FOREIGN_TABLE; + + /* + * For UPDATE, we must use the RT index of the first subplan target + * rel's RTE, because the core code would have built expressions for + * the partition, such as RETURNING, using that RT index as varno of + * Vars contained in those expressions. + */ + if (plan && plan->operation == CMD_UPDATE && + rootResultRelInfo->ri_RangeTableIndex == plan->rootRelation) + resultRelation = mtstate->resultRelInfo[0].ri_RangeTableIndex; + else + resultRelation = rootResultRelInfo->ri_RangeTableIndex; + } + else + { + resultRelation = resultRelInfo->ri_RangeTableIndex; + rte = exec_rt_fetch(resultRelation, estate); + } + + /* Construct the SQL command string. */ + mysql_deparse_insert(&sql, rte, resultRelation, rel, targetAttrs); + + /* Begin constructing MySQLFdwExecState. */ + userid = rte->checkAsUser ? rte->checkAsUser : GetUserId(); + foreignTableId = RelationGetRelid(rel); + table = GetForeignTable(foreignTableId); + server = GetForeignServer(table->serverid); + user = GetUserMapping(userid, server->serverid); + + fmstate = (MySQLFdwExecState *) palloc0(sizeof(MySQLFdwExecState)); + + fmstate->rel = rel; + fmstate->mysqlFdwOptions = mysql_get_options(foreignTableId, true); + fmstate->conn = mysql_get_connection(server, user, + fmstate->mysqlFdwOptions); + fmstate->query = sql.data; + fmstate->retrieved_attrs = targetAttrs; + n_params = list_length(fmstate->retrieved_attrs); + fmstate->p_flinfo = (FmgrInfo *) palloc0(sizeof(FmgrInfo) * n_params); + fmstate->p_nums = 0; + fmstate->temp_cxt = AllocSetContextCreate(estate->es_query_cxt, + "mysql_fdw temporary data", + ALLOCSET_DEFAULT_SIZES); + /* Initialize auxiliary state */ + fmstate->aux_fmstate = NULL; + + /* Set up for remaining transmittable parameters */ + foreach(lc, fmstate->retrieved_attrs) + { + int attnum = lfirst_int(lc); + Form_pg_attribute attr = TupleDescAttr(RelationGetDescr(rel), + attnum - 1); + + Assert(!attr->attisdropped); + + getTypeOutputInfo(attr->atttypid, &typefnoid, &isvarlena); + fmgr_info(typefnoid, &fmstate->p_flinfo[fmstate->p_nums]); + fmstate->p_nums++; + } + Assert(fmstate->p_nums <= n_params); + + /* Initialize mysql statment */ + fmstate->stmt = mysql_stmt_init(fmstate->conn); + if (!fmstate->stmt) + ereport(ERROR, + (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), + errmsg("failed to initialize the MySQL query: \n%s", + mysql_error(fmstate->conn)))); + + /* Prepare mysql statment */ + if (mysql_stmt_prepare(fmstate->stmt, fmstate->query, + strlen(fmstate->query)) != 0) + mysql_stmt_error_print(fmstate, "failed to prepare the MySQL query"); + + /* + * If the given resultRelInfo already has PgFdwModifyState set, it means + * the foreign table is an UPDATE subplan result rel; in which case, store + * the resulting state into the aux_fmstate of the PgFdwModifyState. + */ + if (resultRelInfo->ri_FdwState) + { + Assert(plan && plan->operation == CMD_UPDATE); + Assert(resultRelInfo->ri_usesFdwDirectModify == false); + ((MySQLFdwExecState *) resultRelInfo->ri_FdwState)->aux_fmstate = fmstate; + } + else + resultRelInfo->ri_FdwState = fmstate; } /* * mysqlEndForeignInsert - * BeginForeignInsert() is not yet implemented, hence we do not - * have anything to cleanup as of now. We throw an error here just - * to make sure when we do that we do not forget to cleanup - * resources. + * Clean up resource in func BeginForeignInsert() */ static void mysqlEndForeignInsert(EState *estate, ResultRelInfo *resultRelInfo) { - ereport(ERROR, - (errcode(ERRCODE_FDW_UNABLE_TO_CREATE_EXECUTION), - errmsg("COPY and foreign partition routing not supported in mysql_fdw"))); + MySQLFdwExecState *fmstate = (MySQLFdwExecState *) resultRelInfo->ri_FdwState; + + Assert(fmstate != NULL); + + /* + * If the fmstate has aux_fmstate set, get the aux_fmstate (see + * mysqlBeginForeignInsert()) + */ + if (fmstate->aux_fmstate) + fmstate = fmstate->aux_fmstate; + + if (fmstate && fmstate->stmt) + { + mysql_stmt_close(fmstate->stmt); + fmstate->stmt = NULL; + } } #endif diff --git a/mysql_fdw.h b/mysql_fdw.h index db87793..5c9693c 100644 --- a/mysql_fdw.h +++ b/mysql_fdw.h @@ -172,6 +172,7 @@ typedef struct MySQLFdwExecState MYSQL_STMT *stmt; /* MySQL prepared stament handle */ mysql_table *table; char *query; /* Query string */ + Relation rel; /* relcache entry for the foreign table */ List *retrieved_attrs; /* list of target attribute numbers */ bool query_executed; /* have we executed the query? */ int numParams; /* number of parameters passed to query */ @@ -205,6 +206,10 @@ typedef struct MySQLFdwExecState /* Array for holding column values. */ Datum *wr_values; bool *wr_nulls; + + /* for update row movement if subplan result rel */ + struct MySQLFdwExecState *aux_fmstate; /* foreign-insert state, if + * created */ } MySQLFdwExecState; typedef struct MySQLFdwRelationInfo @@ -304,7 +309,7 @@ extern bool mysql_is_valid_option(const char *option, Oid context); extern mysql_opt *mysql_get_options(Oid foreigntableid, bool is_foreigntable); /* depare.c headers */ -extern void mysql_deparse_insert(StringInfo buf, PlannerInfo *root, +extern void mysql_deparse_insert(StringInfo buf, RangeTblEntry *rte, Index rtindex, Relation rel, List *targetAttrs); extern void mysql_deparse_update(StringInfo buf, PlannerInfo *root, diff --git a/sql/dml.sql b/sql/dml.sql index 4496f19..4766dc2 100644 --- a/sql/dml.sql +++ b/sql/dml.sql @@ -201,22 +201,32 @@ COPY f_mysql_test (a) TO stdout; COPY (SELECT * FROM f_mysql_test) TO stdout; COPY (SELECT a FROM f_mysql_test) TO '/tmp/copy_test.txt' delimiter ','; --- Should give error message as copy from with foreign table not supported +-- Copy from with foreign table are supported DO $$ BEGIN COPY f_mysql_test(a) FROM '/tmp/copy_test.txt' delimiter ','; EXCEPTION WHEN others THEN - IF SQLERRM = 'COPY and foreign partition routing not supported in mysql_fdw' OR - SQLERRM = 'cannot copy to foreign table "f_mysql_test"' THEN - RAISE NOTICE 'ERROR: COPY and foreign partition routing not supported in mysql_fdw'; - ELSE - RAISE NOTICE '%', SQLERRM; - END IF; + RAISE NOTICE '%', SQLERRM; +END; +$$ +LANGUAGE plpgsql; + +COPY (SELECT a + 1, b + 1 FROM f_mysql_test) TO '/tmp/copy_test.txt' delimiter ','; +COPY (SELECT * FROM f_mysql_test) TO stdout; +DO +$$ +BEGIN + COPY f_mysql_test(a, b) FROM '/tmp/copy_test.txt' delimiter ','; + EXCEPTION WHEN others THEN + RAISE NOTICE '%', SQLERRM; END; $$ LANGUAGE plpgsql; +COPY (SELECT * FROM f_mysql_test) TO stdout; +DELETE FROM f_mysql_test WHERE a = 2; + -- Cleanup DELETE FROM fdw126_ft1; DELETE FROM f_empdata;