Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support partition INSERT #256

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 231 additions & 1 deletion connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/syscache.h"
#include "access/xact.h"

/* Length of host */
#define HOST_LEN 256
Expand All @@ -46,15 +47,26 @@ typedef struct ConnCacheEntry
bool invalidated; /* true if reconnect is pending */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
* one level of subxact open, etc */
} ConnCacheEntry;

/*
* Connection cache (initialized on first use)
*/
static HTAB *ConnectionHash = NULL;

/* tracks whether any work is needed in callback functions */
static bool xact_got_connection = false;

static void mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue);

static void mysql_do_sql_command(MYSQL *conn, const char *sql, int level);
static void mysql_begin_remote_xact(ConnCacheEntry *entry);
static void mysql_xact_callback(XactEvent event, void *arg);
static void mysql_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);

/*
* mysql_get_connection:
* Get a connection which can be used to execute queries on the remote
Expand Down Expand Up @@ -92,12 +104,18 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt)
mysql_inval_callback, (Datum) 0);
CacheRegisterSyscacheCallback(USERMAPPINGOID,
mysql_inval_callback, (Datum) 0);

RegisterXactCallback(mysql_xact_callback, NULL);
RegisterSubXactCallback(mysql_subxact_callback, NULL);
}

/* Create hash key for the entry. Assume no pad bytes in key struct */
key.serverid = server->serverid;
key.userid = user->userid;

/* Set flag that we did GetConnection during the current transaction */
xact_got_connection = true;

/*
* Find or create cached entry for requested connection.
*/
Expand All @@ -109,7 +127,7 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt)
}

/* If an existing entry has invalid connection then release it */
if (entry->conn != NULL && entry->invalidated)
if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
{
elog(DEBUG3, "disconnecting mysql_fdw connection %p for option changes to take effect",
entry->conn);
Expand All @@ -122,6 +140,7 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt)
entry->conn = mysql_connect(opt);
elog(DEBUG3, "new mysql_fdw connection %p for server \"%s\"",
entry->conn, server->servername);
entry->xact_depth = 0;

/*
* Once the connection is established, then set the connection
Expand All @@ -137,6 +156,12 @@ mysql_get_connection(ForeignServer *server, UserMapping *user, mysql_opt *opt)
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
}

/*
* Start a new transaction or subtransaction if needed.
*/
mysql_begin_remote_xact(entry);

return entry->conn;
}

Expand Down Expand Up @@ -284,3 +309,208 @@ mysql_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
entry->invalidated = true;
}
}

/*
* Convenience subroutine to issue a non-data-returning SQL command to remote
*/
static void
mysql_do_sql_command(MYSQL *conn, const char *sql, int level)
{
elog(DEBUG3, "do_sql_command %s", sql);

if (mysql_query(conn, sql) != 0)
{
ereport(level,
(errcode(ERRCODE_FDW_ERROR),
errmsg("Failed to execute sql: %s, Error %u: %s\n", sql, mysql_errno(conn), mysql_error(conn))
));
}
}

/*
* Start remote transaction or subtransaction, if needed.
*/
static void
mysql_begin_remote_xact(ConnCacheEntry *entry)
{
int curlevel = GetCurrentTransactionNestLevel();

/* Start main transaction if we haven't yet */
if (entry->xact_depth <= 0)
{
const char *sql = "START TRANSACTION";

elog(DEBUG3, "starting remote transaction on connection %p",
entry->conn);

mysql_do_sql_command(entry->conn, sql, ERROR);
entry->xact_depth = 1;
}

/*
* If we're in a subtransaction, stack up savepoints to match our level.
* This ensures we can rollback just the desired effects when a
* subtransaction aborts.
*/
while (entry->xact_depth < curlevel)
{
const char *sql = psprintf("SAVEPOINT s%d", entry->xact_depth + 1);

mysql_do_sql_command(entry->conn, sql, ERROR);
entry->xact_depth++;
}
}

/*
* mysql_xact_callback --- cleanup at main-transaction end.
*/
static void
mysql_xact_callback(XactEvent event, void *arg)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;

/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
return;

elog(DEBUG1, "xact_callback %d", event);

/*
* Scan all connection cache entries to find open remote transactions, and
* close them.
*/
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
/* Ignore cache entry if no open connection right now */
if (entry->conn == NULL)
continue;

/* If it has an open remote transaction, try to close it */
if (entry->xact_depth > 0)
{
elog(DEBUG3, "closing remote transaction on connection %p",
entry->conn);

switch (event)
{
case XACT_EVENT_PARALLEL_PRE_COMMIT:
case XACT_EVENT_PRE_COMMIT:
/* Commit all remote transactions */
mysql_do_sql_command(entry->conn, "COMMIT", ERROR);
break;
case XACT_EVENT_PRE_PREPARE:
/*
* We disallow remote transactions that modified anything,
* since it's not very reasonable to hold them open until
* the prepared transaction is committed. For the moment,
* throw error unconditionally; later we might allow
* read-only cases. Note that the error will cause us to
* come right back here with event == XACT_EVENT_ABORT, so
* we'll clean up the connection state at that point.
*/
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("cannot prepare a transaction that modified remote tables")));
break;
case XACT_EVENT_PARALLEL_COMMIT:
case XACT_EVENT_COMMIT:
case XACT_EVENT_PREPARE:
/* Pre-commit should have closed the open transaction */
elog(ERROR, "missed cleaning up connection during pre-commit");
break;
case XACT_EVENT_PARALLEL_ABORT:
case XACT_EVENT_ABORT:
{
elog(DEBUG3, "abort transaction");
/*
* rollback if in transaction
*/
mysql_do_sql_command(entry->conn, "ROLLBACK", WARNING);
break;
}
}
}

/* Reset state to show we're out of a transaction */
entry->xact_depth = 0;
}
/*
* Regardless of the event type, we can now mark ourselves as out of the
* transaction.
*/
xact_got_connection = false;
}

/*
* mysql_subxact_callback --- cleanup at subtransaction end.
*/
static void
mysql_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg)
{
HASH_SEQ_STATUS scan;
ConnCacheEntry *entry;
int curlevel;

/* Nothing to do at subxact start, nor after commit. */
if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
event == SUBXACT_EVENT_ABORT_SUB))
return;

/* Quick exit if no connections were touched in this transaction. */
if (!xact_got_connection)
return;

/*
* Scan all connection cache entries to find open remote subtransactions
* of the current level, and close them.
*/
curlevel = GetCurrentTransactionNestLevel();
hash_seq_init(&scan, ConnectionHash);
while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
{
char sql[100];

/*
* We only care about connections with open remote subtransactions of
* the current level.
*/
if (entry->conn == NULL || entry->xact_depth < curlevel)
continue;

if (entry->xact_depth > curlevel)
elog(ERROR, "missed cleaning up remote subtransaction at level %d",
entry->xact_depth);

if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
{
/* Commit all remote subtransactions during pre-commit */
snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
mysql_do_sql_command(entry->conn, sql, ERROR);
}
else if (in_error_recursion_trouble())
{
/*
* Don't try to clean up the connection if we're already in error
* recursion trouble.
*/
}
else
{
/* Rollback all remote subtransactions during abort */
snprintf(sql, sizeof(sql),
"ROLLBACK TO SAVEPOINT s%d",
curlevel);
mysql_do_sql_command(entry->conn, sql, ERROR);
snprintf(sql, sizeof(sql),
"RELEASE SAVEPOINT s%d",
curlevel);
mysql_do_sql_command(entry->conn, sql, ERROR);
}

/* OK, we're outta that level of subtransaction */
entry->xact_depth--;
}
}
25 changes: 11 additions & 14 deletions deparse.c
Original file line number Diff line number Diff line change
Expand Up @@ -135,12 +135,12 @@ static void mysql_print_remote_param(int paramindex, Oid paramtype,
static void mysql_print_remote_placeholder(Oid paramtype, int32 paramtypmod,
deparse_expr_cxt *context);
static void mysql_deparse_relation(StringInfo buf, Relation rel);
static void mysql_deparse_target_list(StringInfo buf, PlannerInfo *root,
static void mysql_deparse_target_list(StringInfo buf, RangeTblEntry *rte,
Index rtindex, Relation rel,
Bitmapset *attrs_used,
List **retrieved_attrs);
static void mysql_deparse_column_ref(StringInfo buf, int varno, int varattno,
PlannerInfo *root, bool qualify_col);
RangeTblEntry *rte, bool qualify_col);
static void mysql_deparse_select_sql(List *tlist, List **retrieved_attrs,
deparse_expr_cxt *context);
static void mysql_append_conditions(List *exprs, deparse_expr_cxt *context);
Expand Down Expand Up @@ -370,7 +370,7 @@ mysql_deparse_select_sql(List *tlist, List **retrieved_attrs,
rel = table_open(rte->relid, NoLock);
#endif

mysql_deparse_target_list(buf, root, foreignrel->relid, rel,
mysql_deparse_target_list(buf, rte, foreignrel->relid, rel,
fpinfo->attrs_used, retrieved_attrs);

#if PG_VERSION_NUM < 130000
Expand Down Expand Up @@ -449,9 +449,10 @@ mysql_deparse_from_expr(List *quals, deparse_expr_cxt *context)
* The statement text is appended to buf, and we also create an integer List
* of the columns being retrieved by RETURNING (if any), which is returned
* to *retrieved_attrs.
* rte = planner_rt_fetch(rtindex, root)
*/
void
mysql_deparse_insert(StringInfo buf, PlannerInfo *root, Index rtindex,
mysql_deparse_insert(StringInfo buf, RangeTblEntry *rte, Index rtindex,
Relation rel, List *targetAttrs)
{
ListCell *lc;
Expand All @@ -475,7 +476,7 @@ mysql_deparse_insert(StringInfo buf, PlannerInfo *root, Index rtindex,
appendStringInfoString(buf, ", ");
first = false;

mysql_deparse_column_ref(buf, rtindex, attnum, root, false);
mysql_deparse_column_ref(buf, rtindex, attnum, rte, false);
}

appendStringInfoString(buf, ") VALUES (");
Expand Down Expand Up @@ -513,7 +514,7 @@ mysql_deparse_analyze(StringInfo sql, char *dbname, char *relname)
* This is used for both SELECT and RETURNING targetlists.
*/
static void
mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex,
mysql_deparse_target_list(StringInfo buf, RangeTblEntry *rte, Index rtindex,
Relation rel, Bitmapset *attrs_used,
List **retrieved_attrs)
{
Expand Down Expand Up @@ -544,7 +545,7 @@ mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex,
appendStringInfoString(buf, ", ");
first = false;

mysql_deparse_column_ref(buf, rtindex, i, root, false);
mysql_deparse_column_ref(buf, rtindex, i, rte, false);
*retrieved_attrs = lappend_int(*retrieved_attrs, i);
}
}
Expand All @@ -560,19 +561,15 @@ mysql_deparse_target_list(StringInfo buf, PlannerInfo *root, Index rtindex,
*/
static void
mysql_deparse_column_ref(StringInfo buf, int varno, int varattno,
PlannerInfo *root, bool qualify_col)
RangeTblEntry *rte, bool qualify_col)
{
RangeTblEntry *rte;
char *colname = NULL;
List *options;
ListCell *lc;

/* varno must not be any of OUTER_VAR, INNER_VAR and INDEX_VAR. */
Assert(!IS_SPECIAL_VARNO(varno));

/* Get RangeTblEntry from array in PlannerInfo. */
rte = planner_rt_fetch(varno, root);

/*
* If it's a column of a foreign table, and it has the column_name FDW
* option, use that value.
Expand Down Expand Up @@ -832,7 +829,7 @@ mysql_deparse_update(StringInfo buf, PlannerInfo *root, Index rtindex,
appendStringInfoString(buf, ", ");
first = false;

mysql_deparse_column_ref(buf, rtindex, attnum, root, false);
mysql_deparse_column_ref(buf, rtindex, attnum, planner_rt_fetch(rtindex, root), false);
appendStringInfo(buf, " = ?");
pindex++;
}
Expand Down Expand Up @@ -876,7 +873,7 @@ mysql_deparse_var(Var *node, deparse_expr_cxt *context)
{
/* Var belongs to foreign table */
mysql_deparse_column_ref(context->buf, node->varno, node->varattno,
context->root, qualify_col);
planner_rt_fetch(node->varno, context->root), qualify_col);
}
else
{
Expand Down
Loading