diff --git a/python/multicorn/__init__.py b/python/multicorn/__init__.py index 72d8ebb75..639ba672b 100755 --- a/python/multicorn/__init__.py +++ b/python/multicorn/__init__.py @@ -402,6 +402,26 @@ def commit(self): """ pass + def release_before(self): + """ + Hook called just before locks are released after a commit. + """ + pass + + def release(self): + """ + Hook called to release locks after a commit. + """ + pass + + def release_after(self): + """ + Hook called just after locks are released after a commit. + Useful if you need to mess with the database and need to + wait for locks to be released. + """ + pass + def end_scan(self): """ Hook called at the end of a foreign scan. diff --git a/python/multicorn/utils.py b/python/multicorn/utils.py index b6f2749da..57b2f8f03 100644 --- a/python/multicorn/utils.py +++ b/python/multicorn/utils.py @@ -1,8 +1,16 @@ + from logging import ERROR, INFO, DEBUG, WARNING, CRITICAL try: from ._utils import _log_to_postgres from ._utils import check_interrupts + from ._utils import _execute + from ._utils import _prepare + from ._utils import _execute_stmt + from ._utils import _fetch except ImportError as e: + # We really want to know when this happens + # for testing purposes, just ignore the output. + logging.exception("Importing utils") from warnings import warn warn("Not executed in a postgresql server," " disabling log_to_postgres", ImportWarning) @@ -10,6 +18,35 @@ def _log_to_postgres(message, level=0, hint=None, detail=None): pass + def _execute(sql, readonly, count): + raise Exception('Not Connected to Postgres') + + def _prepare(sql, *argv): + """ + _prepare takes a sql statement and a + list of converter objects. The + converter objects define getOID + to get the postgres oid for + that argument and a getdataum method. + The getdatum method returns the + takes two a single object as + an argument (as well as self) and + returns the value of that argument + as something the sql converter + can understand. Currently this is limited + to a string, long, int, or float. So + for example dates and time stamps would + have to be passed in as strings and + the sql would have to convert them. + """ + raise Exception('Not Connected to Postgres') + + def _execute_stmt(stmt, args, converters): + raise Exception('Not Connected to Postgres') + + def _fetch(): + raise Exception('Not Connected to Postgres') + REPORT_CODES = { DEBUG: 0, @@ -19,6 +56,108 @@ def _log_to_postgres(message, level=0, hint=None, detail=None): CRITICAL: 4 } +class StatementException(Exception): + pass + +def _booleanXForm(bl): + if isinstance(bl, basestring): + if bl.lower() in ('0', 'false', 'f', ''): + return 0 + if bl: + return 1 + + return 0 + +class Statement(object): + type_map = None + + simple_converters = { + 'int8': { 'func': lambda x: long(x) }, + 'int4': { 'func': lambda x: int(x) }, + 'float8':{ 'func': lambda x: float(x) }, + 'text': { 'lob': True }, + 'bool': { 'func': _booleanXForm }, + } + + class Converter(object): + converter_map = {} + + def __init__(self, pgtype, **kwargs): + self.pgtype = pgtype + self.lob = kwargs.get('lob', False) + + + def isLob(self): + return self.lob + + def getdatum(self, obj): + return obj + + def getOID(self): + oid = Statement.getOID(self.pgtype); + return oid + + @staticmethod + def converter(*args): + def decorator(cls): + for name in args: + Converter.converter_map[name]=cls + return cls + return decorator + + for c,p in simple_converters.items(): + name="SimpleConverter%s" %c + + d = { + '__init__': lambda s, n=c, p=p: Statement.Converter.__init__(s, n, **p) + } + + if 'func' in p: + d['getdatum'] = lambda s, o, f=p['func']: f(o) + + Converter.converter_map[c] = type(name, (Converter,), d) + + + def __init__(self, sql, pytypes): + self.converters = [] + self.results = None + for t in pytypes: + if t not in Statement.Converter.converter_map: + raise StatementException("Unhandled Data Type %s" %t) + self.converters.append(Statement.Converter.converter_map[t]()) + self.stmt = prepare(sql, self.converters) + + def execute(self, data): + if not isinstance(data, tuple): + data = tuple(data) + + ret = execute_stmt(self.stmt, data, tuple(self.converters)) + self.results = fetch() + return ret + + def getResults(self): + return self.results + + + @staticmethod + def getOID(typname): + # not thread safe. + # XXXXX FIXME, need to consider namespaces. + if Statement.type_map is None: + sql = "select typname, oid::int4 from pg_type;" + res = execute(sql) + types = fetch() + Statement.type_map = {} + for t in types: + Statement.type_map[t['typname']] = t['oid'] + + if typname not in Statement.type_map: + logging.error('type %s not in type_map' %typname) + raise Exception('unknown type(%s) is getoid' %typname) + return Statement.type_map.get(typname, None) + + + def log_to_postgres(message, level=INFO, hint=None, detail=None): code = REPORT_CODES.get(level, None) @@ -26,3 +165,22 @@ def log_to_postgres(message, level=INFO, hint=None, detail=None): raise KeyError("Not a valid log level") _log_to_postgres(message, code, hint=hint, detail=detail) + +def execute(sql, read_only=False, count=0): + return _execute(sql, None, count) + +def prepare(sql, converters): + return _prepare(sql, *converters) + +def execute_stmt(plan, args, converters): + if not isinstance(args, tuple): + args = tuple( a for a in args ) + + if not isinstance(converters, tuple): + converters = tuple (c for c in converters ); + return _execute_stmt(plan, args, converters) + +def fetch(): + return _fetch() + + diff --git a/setup.py b/setup.py index bcd3aa7e4..0ab823b35 100755 --- a/setup.py +++ b/setup.py @@ -15,7 +15,7 @@ def get_pg_config(kind, pg_config="pg_config"): multicorn_utils_module = Extension('multicorn._utils', include_dirs=include_dirs, - extra_compile_args = ['-shared'], + extra_compile_args = ['-shared', '-g'], sources=['src/utils.c']) requires=[] diff --git a/src/errors.c b/src/errors.c index ae1d877fd..3dfd53169 100644 --- a/src/errors.c +++ b/src/errors.c @@ -54,8 +54,8 @@ reportException(PyObject *pErrType, PyObject *pErrValue, PyObject *pErrTraceback errValue = PyString_AsString(PyObject_Str(pErrValue)); if (pErrTraceback != NULL) { - traceback_list = PyObject_CallFunction(format_exception, "(O,O,O)", pErrType, pErrValue, pErrTraceback); - errTraceback = PyString_AsString(PyObject_CallMethod(newline, "join", "(O)", traceback_list)); + traceback_list = PYOBJECT_CALLFUNCTION(format_exception, "(O,O,O)", pErrType, pErrValue, pErrTraceback); + errTraceback = PyString_AsString(PYOBJECT_CALLMETHOD(newline, "join", "(O)", traceback_list)); Py_DECREF(pErrTraceback); Py_DECREF(traceback_list); } diff --git a/src/multicorn.c b/src/multicorn.c index c02d23a76..e1c40b80e 100644 --- a/src/multicorn.c +++ b/src/multicorn.c @@ -1,6 +1,6 @@ /* * The Multicorn Foreign Data Wrapper allows you to fetch foreign data in - * Python in your PostgreSQL server + * Python in your PostgreSQL servner * * This software is released under the postgresql licence * @@ -25,17 +25,16 @@ #include "miscadmin.h" #include "utils/lsyscache.h" #include "utils/rel.h" +#include "utils/resowner.h" #include "parser/parsetree.h" +#include "executor/spi.h" #include "fmgr.h" - PG_MODULE_MAGIC; - extern Datum multicorn_handler(PG_FUNCTION_ARGS); extern Datum multicorn_validator(PG_FUNCTION_ARGS); - PG_FUNCTION_INFO_V1(multicorn_handler); PG_FUNCTION_INFO_V1(multicorn_validator); @@ -103,6 +102,11 @@ static List *multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, static void multicorn_xact_callback(XactEvent event, void *arg); +static void multicorn_release_callback(ResourceReleasePhase phase, + bool isCommit, + bool isTopLevel, + void *arg); + /* Helpers functions */ void *serializePlanState(MulticornPlanState * planstate); MulticornExecState *initializeExecState(void *internal_plan_state); @@ -110,7 +114,6 @@ MulticornExecState *initializeExecState(void *internal_plan_state); /* Hash table mapping oid to fdw instances */ HTAB *InstancesHash; - void _PG_init() { @@ -139,6 +142,11 @@ _PG_init() #if PG_VERSION_NUM >= 90300 RegisterSubXactCallback(multicorn_subxact_callback, NULL); #endif + /* RegisterResourceReleaseBallback dates back to 2004 and is in + the 8.0 release. Not sure if we need a version number + restriction around it. */ + RegisterResourceReleaseCallback(multicorn_release_callback, NULL); + /* Initialize the global oid -> python instances hash */ MemSet(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(Oid); @@ -261,6 +269,12 @@ multicornGetForeignRelSize(PlannerInfo *root, Relation rel = RelationIdGetRelation(ftable->relid); AttInMetadata *attinmeta; + if (rel == NULL) + { + ereport(ERROR, (errmsg("Can't get relation for relid=%d ftable oid=%d", + ftable->relid, foreigntableid), errhint("%s", "It's buggy"))); + } + desc = RelationGetDescr(rel); attinmeta = TupleDescGetAttInMetadata(desc); planstate->numattrs = RelationGetNumberOfAttributes(rel); @@ -496,20 +510,19 @@ multicornBeginForeignScan(ForeignScanState *node, int eflags) { ForeignScan *fscan = (ForeignScan *) node->ss.ps.plan; MulticornExecState *execstate; - TupleDesc tupdesc = RelationGetDescr(node->ss.ss_currentRelation); ListCell *lc; execstate = initializeExecState(fscan->fdw_private); - execstate->values = palloc(sizeof(Datum) * tupdesc->natts); - execstate->nulls = palloc(sizeof(bool) * tupdesc->natts); execstate->qual_list = NULL; + execstate->values = NULL; + execstate->nulls = NULL; + foreach(lc, fscan->fdw_exprs) { extractRestrictions(bms_make_singleton(fscan->scan.scanrelid), ((Expr *) lfirst(lc)), &execstate->qual_list); } - initConversioninfo(execstate->cinfos, TupleDescGetAttInMetadata(tupdesc)); node->fdw_state = execstate; } @@ -529,6 +542,63 @@ multicornIterateForeignScan(ForeignScanState *node) MulticornExecState *execstate = node->fdw_state; PyObject *p_value; + Assert(slot != NULL); + + if (1 || execstate->tt_tupleDescriptor != + slot->tts_tupleDescriptor) + { + if (execstate->tt_tupleDescriptor != NULL) + { + if (errstart(WARNING, __FILE__, + __LINE__, PG_FUNCNAME_MACRO, + TEXTDOMAIN)) + { + errmsg("tupleDescriptor Changed"); + errdetail("Reallocing and reintializec cinfo struct may be a performance hit."); + } + errfinish(0); + } + + + execstate->tt_tupleDescriptor = slot->tts_tupleDescriptor; + + // This is becoming corrupted. What happens if we realloc it every time? + execstate->cinfos = NULL; + + if (execstate->cinfos != NULL) { + execstate->cinfos = repalloc(execstate->cinfos, + sizeof(ConversionInfo *) * + slot->tts_tupleDescriptor->natts); + } else { + execstate->cinfos = palloc(sizeof(ConversionInfo *) * + slot->tts_tupleDescriptor->natts); + } + memset(execstate->cinfos, + 0, + sizeof(ConversionInfo *) * + slot->tts_tupleDescriptor->natts); + if (execstate->values != NULL) { + execstate->values = repalloc(execstate->values, + sizeof(Datum) * + slot->tts_tupleDescriptor->natts); + } else { + execstate->values = palloc(sizeof(Datum) * + slot->tts_tupleDescriptor->natts); + } + + + if (execstate->nulls != NULL) { + execstate->nulls = repalloc(execstate->nulls, + sizeof(bool) * + slot->tts_tupleDescriptor->natts); + } else { + execstate->nulls = palloc(sizeof(bool) * + slot->tts_tupleDescriptor->natts); + } + initConversioninfo(execstate->cinfos, + TupleDescGetAttInMetadata(execstate->tt_tupleDescriptor)); + } + if (execstate->p_iterator == NULL) { execute(node, NULL); @@ -548,8 +618,8 @@ multicornIterateForeignScan(ForeignScanState *node) Py_XDECREF(p_value); return slot; } - slot->tts_values = execstate->values; - slot->tts_isnull = execstate->nulls; + //slot->tts_values = execstate->values; + //slot->tts_isnull = execstate->nulls; pythonResultToTuple(p_value, slot, execstate->cinfos, execstate->buffer); ExecStoreVirtualTuple(slot); Py_DECREF(p_value); @@ -573,6 +643,8 @@ multicornReScanForeignScan(ForeignScanState *node) } } + + /* * multicornEndForeignScan * Finish scanning foreign table and dispose objects used for this scan. @@ -581,7 +653,7 @@ static void multicornEndForeignScan(ForeignScanState *node) { MulticornExecState *state = node->fdw_state; - PyObject *result = PyObject_CallMethod(state->fdw_instance, "end_scan", "()"); + PyObject *result = PYOBJECT_CALLMETHOD(state->fdw_instance, "end_scan", "()"); errorCheck(); Py_DECREF(result); @@ -603,14 +675,13 @@ multicornAddForeignUpdateTargets(Query *parsetree, Relation target_relation) { Var *var = NULL; - TargetEntry *tle, - *returningTle; + TargetEntry *tle, *returningTle; PyObject *instance = getInstance(target_relation->rd_id); const char *attrname = getRowIdColumn(instance); TupleDesc desc = target_relation->rd_att; int i; ListCell *cell; - + foreach(cell, parsetree->returningList) { returningTle = lfirst(cell); @@ -673,10 +744,10 @@ multicornPlanForeignModify(PlannerInfo *root, */ static void multicornBeginForeignModify(ModifyTableState *mtstate, - ResultRelInfo *resultRelInfo, - List *fdw_private, - int subplan_index, - int eflags) + ResultRelInfo *resultRelInfo, + List *fdw_private, + int subplan_index, + int eflags) { MulticornModifyState *modstate = palloc0(sizeof(MulticornModifyState)); Relation rel = resultRelInfo->ri_RelationDesc; @@ -724,6 +795,7 @@ multicornBeginForeignModify(ModifyTableState *mtstate, * Execute a foreign insert operation * This is done by calling the python "insert" method. */ + static TupleTableSlot * multicornExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot) @@ -731,11 +803,14 @@ multicornExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, MulticornModifyState *modstate = resultRelInfo->ri_FdwState; PyObject *fdw_instance = modstate->fdw_instance; PyObject *values = tupleTableSlotToPyObject(slot, modstate->cinfos); - PyObject *p_new_value = PyObject_CallMethod(fdw_instance, "insert", "(O)", values); + PyObject *p_new_value = PYOBJECT_CALLMETHOD(fdw_instance, "insert", "(O)", values); errorCheck(); if (p_new_value && p_new_value != Py_None) { + // XXXXXX FIXME: If there is no result tuple, this assumes + // that the given slot matches the table. + // This does not appear to be the case. ExecClearTuple(slot); pythonResultToTuple(p_new_value, slot, modstate->cinfos, modstate->buffer); ExecStoreVirtualTuple(slot); @@ -765,7 +840,7 @@ multicornExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, Datum value = ExecGetJunkAttribute(planSlot, modstate->rowidAttno, &is_null); p_row_id = datumToPython(value, cinfo->atttypoid, cinfo); - p_new_value = PyObject_CallMethod(fdw_instance, "delete", "(O)", p_row_id); + p_new_value = PYOBJECT_CALLMETHOD(fdw_instance, "delete", "(O)", p_row_id); errorCheck(); if (p_new_value == NULL || p_new_value == Py_None) { @@ -801,7 +876,7 @@ multicornExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, Datum value = ExecGetJunkAttribute(planSlot, modstate->rowidAttno, &is_null); p_row_id = datumToPython(value, cinfo->atttypoid, cinfo); - p_new_value = PyObject_CallMethod(fdw_instance, "update", "(O,O)", p_row_id, + p_new_value = PYOBJECT_CALLMETHOD(fdw_instance, "update", "(O,O)", p_row_id, p_value); errorCheck(); if (p_new_value != NULL && p_new_value != Py_None) @@ -825,7 +900,7 @@ multicornEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo) { MulticornModifyState *modstate = resultRelInfo->ri_FdwState; - PyObject *result = PyObject_CallMethod(modstate->fdw_instance, "end_modify", "()"); + PyObject *result = PYOBJECT_CALLMETHOD(modstate->fdw_instance, "end_modify", "()"); errorCheck(); Py_DECREF(modstate->fdw_instance); @@ -860,11 +935,11 @@ multicorn_subxact_callback(SubXactEvent event, SubTransactionId mySubid, instance = entry->value; if (event == SUBXACT_EVENT_PRE_COMMIT_SUB) { - PyObject_CallMethod(instance, "sub_commit", "(i)", curlevel); + PYOBJECT_CALLMETHOD(instance, "sub_commit", "(i)", curlevel); } else { - PyObject_CallMethod(instance, "sub_rollback", "(i)", curlevel); + PYOBJECT_CALLMETHOD(instance, "sub_rollback", "(i)", curlevel); } errorCheck(); entry->xact_depth--; @@ -881,7 +956,7 @@ multicorn_xact_callback(XactEvent event, void *arg) PyObject *instance; HASH_SEQ_STATUS status; CacheEntry *entry; - + hash_seq_init(&status, InstancesHash); while ((entry = (CacheEntry *) hash_seq_search(&status)) != NULL) { @@ -893,15 +968,15 @@ multicorn_xact_callback(XactEvent event, void *arg) { #if PG_VERSION_NUM >= 90300 case XACT_EVENT_PRE_COMMIT: - PyObject_CallMethod(instance, "pre_commit", "()"); + PYOBJECT_CALLMETHOD(instance, "pre_commit", "()"); break; #endif case XACT_EVENT_COMMIT: - PyObject_CallMethod(instance, "commit", "()"); + PYOBJECT_CALLMETHOD(instance, "commit", "()"); entry->xact_depth = 0; break; case XACT_EVENT_ABORT: - PyObject_CallMethod(instance, "rollback", "()"); + PYOBJECT_CALLMETHOD(instance, "rollback", "()"); entry->xact_depth = 0; break; default: @@ -911,6 +986,57 @@ multicorn_xact_callback(XactEvent event, void *arg) } } +/* + * Callback for after commit to relase any locks or other + * resources. Key thing is locks have already been released. + * This allows updates/inserts back to postgres without worrying + * about locks. Can be a big performance win in a few + * corner cases. + */ +static void +multicorn_release_callback(ResourceReleasePhase phase, bool isCommit, + bool isTopLevel, void *arg) { + PyObject *instance; + HASH_SEQ_STATUS status; + CacheEntry *entry; + + if (!isTopLevel) { + return; + } + + hash_seq_init(&status, InstancesHash); + while ((entry = (CacheEntry *) hash_seq_search(&status)) != NULL) + { + instance = entry->value; + if (entry->xact_depth == 0) + continue; + + switch (phase) + { + /* XXXXX FIXME: + * Do we want to pass isCommit? + */ + case RESOURCE_RELEASE_BEFORE_LOCKS: + PYOBJECT_CALLMETHOD(instance, "release_before", "()"); + break; + + case RESOURCE_RELEASE_LOCKS: + PYOBJECT_CALLMETHOD(instance, "release", "()"); + break; + + case RESOURCE_RELEASE_AFTER_LOCKS: + PYOBJECT_CALLMETHOD(instance, "release_after", "()"); + break; + + default: + break; + } + errorCheck(); + } + +} + + #if PG_VERSION_NUM >= 90500 static List * multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, @@ -985,7 +1111,7 @@ multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, Py_DECREF(p_tablename); } errorCheck(); - p_tables = PyObject_CallMethod(p_class, "import_schema", "(s, O, O, s, O)", + p_tables = PYOBJECT_CALLMETHOD(p_class, "import_schema", "(s, O, O, s, O)", stmt->remote_schema, p_srv_options, p_options, restrict_type, p_restrict_list); errorCheck(); @@ -1000,8 +1126,9 @@ multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, PyObject *p_string; char *value; - p_string = PyObject_CallMethod(p_item, "to_statement", "(s,s)", - stmt->local_schema, f_server->servername); + p_string = PYOBJECT_CALLMETHOD(p_item, "to_statement", "(s,s)", + stmt->local_schema, + f_server->servername); errorCheck(); value = PyString_AsString(p_string); errorCheck(); @@ -1046,7 +1173,6 @@ initializeExecState(void *internalstate) { MulticornExecState *execstate = palloc0(sizeof(MulticornExecState)); List *values = (List *) internalstate; - AttrNumber attnum = ((Const *) linitial(values))->constvalue; Oid foreigntableid = ((Const *) lsecond(values))->constvalue; List *pathkeys; @@ -1057,8 +1183,67 @@ initializeExecState(void *internalstate) execstate->pathkeys = deserializeDeparsedSortGroup(pathkeys); execstate->fdw_instance = getInstance(foreigntableid); execstate->buffer = makeStringInfo(); - execstate->cinfos = palloc0(sizeof(ConversionInfo *) * attnum); - execstate->values = palloc(attnum * sizeof(Datum)); - execstate->nulls = palloc(attnum * sizeof(bool)); + execstate->cinfos = NULL; + execstate->values = NULL; + execstate->nulls = NULL; + execstate->tt_tupleDescriptor = NULL; return execstate; } + + +static int multicorn_SPI_depth = 0; +static bool multicorn_SPI_connected = false; + +void +multicorn_connect(void) { + + if (multicorn_SPI_depth == 0) + { + if (errstart(FATAL, __FILE__, __LINE__, + PG_FUNCNAME_MACRO, TEXTDOMAIN)) + { + errmsg("Attempting to connect to SPI without wrapper"); + errfinish(0); + } + return; + + } + + if (!multicorn_SPI_connected) + { + /* if (SPI_connect() != SPI_OK_CONNECT) */ + /* { */ + /* if (errstart(FATAL, __FILE__, __LINE__, */ + /* PG_FUNCNAME_MACRO, TEXTDOMAIN)) */ + /* { */ + /* errmsg("Unable to connect to SPI"); */ + /* errfinish(0); */ + /* } */ + /* return; */ + /* } */ + multicorn_SPI_connected = true; + } +} + +/* + * Pass through the pyobject so we can easily + * have a macro call this for every + * PyObject_CallMethod and PyObject_CallFunction + * call. + */ +PyObject * +multicorn_spi_leave(PyObject *po) { + + if (--multicorn_SPI_depth == 0 && multicorn_SPI_connected) + { + multicorn_SPI_connected = false; + //SPI_finish(); + } + return po; +} + +PyObject * +multicorn_spi_enter(PyObject *po) { + multicorn_SPI_depth++; + return po; +} diff --git a/src/multicorn.h b/src/multicorn.h index 8022cf30b..548d3f377 100644 --- a/src/multicorn.h +++ b/src/multicorn.h @@ -96,6 +96,7 @@ typedef struct MulticornExecState AttrNumber rowidAttno; char *rowidAttrName; List *pathkeys; /* list of MulticornDeparsedSortGroup) */ + TupleDesc tt_tupleDescriptor; } MulticornExecState; typedef struct MulticornModifyState @@ -213,6 +214,15 @@ PyObject *datumToPython(Datum node, Oid typeoid, ConversionInfo * cinfo); List *serializeDeparsedSortGroup(List *pathkeys); List *deserializeDeparsedSortGroup(List *items); +/* pass through the PyObject to make the macros easier. */ +extern PyObject *multicorn_spi_enter(PyObject *); +extern PyObject *multicorn_spi_leave(PyObject *); +extern void multicorn_connect(void); + +#define PYOBJECT_CALLMETHOD(po, ...) multicorn_spi_leave(PyObject_CallMethod(multicorn_spi_enter(po),__VA_ARGS__)) + +#define PYOBJECT_CALLFUNCTION(po, ...) multicorn_spi_leave(PyObject_CallFunction(multicorn_spi_enter(po), __VA_ARGS__)) + #endif /* PG_MULTICORN_H */ char *PyUnicode_AsPgString(PyObject *p_unicode); @@ -221,6 +231,6 @@ char *PyUnicode_AsPgString(PyObject *p_unicode); PyObject *PyString_FromString(const char *s); PyObject *PyString_FromStringAndSize(const char *s, Py_ssize_t size); char *PyString_AsString(PyObject *unicode); -int PyString_AsStringAndSize(PyObject *unicode, char **tempbuffer, Py_ssize_t *length); +int PyString_AsStringAndSize(PyObject *unicode, char **tempbuffer, Py_ssize_t *length); #endif diff --git a/src/python.c b/src/python.c index 394236f91..8ff55f475 100644 --- a/src/python.c +++ b/src/python.c @@ -205,7 +205,7 @@ PyObject * getClass(PyObject *className) { PyObject *p_multicorn = PyImport_ImportModule("multicorn"), - *p_class = PyObject_CallMethod(p_multicorn, "get_class", "(O)", + *p_class = PYOBJECT_CALLMETHOD(p_multicorn, "get_class", "(O)", className); errorCheck(); @@ -451,7 +451,7 @@ getColumnsFromTable(TupleDesc desc, PyObject **p_columns, List **columns) *p_collections = PyImport_ImportModule("collections"), *p_dictclass = PyObject_GetAttrString(p_collections, "OrderedDict"); - columns_dict = PyObject_CallFunction(p_dictclass, "()"); + columns_dict = PYOBJECT_CALLFUNCTION(p_dictclass, "()"); for (i = 0; i < desc->natts; i++) { @@ -468,7 +468,7 @@ getColumnsFromTable(TupleDesc desc, PyObject **p_columns, List **columns) List *options = GetForeignColumnOptions(att->attrelid, att->attnum); PyObject *p_options = optionsListToPyDict(options); - PyObject *column = PyObject_CallFunction(p_columnclass, + PyObject *column = PYOBJECT_CALLFUNCTION(p_columnclass, "(s,i,i,s,s,O)", key, typOid, @@ -619,7 +619,7 @@ getCacheEntry(Oid foreigntableid) entry->value = NULL; getColumnsFromTable(desc, &p_columns, &columns); PyDict_DelItemString(p_options, "wrapper"); - p_instance = PyObject_CallFunction(p_class, "(O,O)", p_options, + p_instance = PYOBJECT_CALLFUNCTION(p_class, "(O,O)", p_options, p_columns); errorCheck(); /* Cleanup the old context, containing the old columns and options */ @@ -678,7 +678,7 @@ begin_remote_xact(CacheEntry * entry) /* Start main transaction if we haven't yet */ if (entry->xact_depth <= 0) { - rv = PyObject_CallMethod(entry->value, "begin", "(i)", IsolationIsSerializable()); + rv = PYOBJECT_CALLMETHOD(entry->value, "begin", "(i)", IsolationIsSerializable()); Py_XDECREF(rv); errorCheck(); entry->xact_depth = 1; @@ -687,7 +687,7 @@ begin_remote_xact(CacheEntry * entry) while (entry->xact_depth < curlevel) { entry->xact_depth++; - rv = PyObject_CallMethod(entry->value, "sub_begin", "(i)", entry->xact_depth); + rv = PYOBJECT_CALLMETHOD(entry->value, "sub_begin", "(i)", entry->xact_depth); Py_XDECREF(rv); errorCheck(); } @@ -715,7 +715,7 @@ getRelSize(MulticornPlanState * state, p_targets_set = valuesToPySet(state->target_list); p_quals = qualDefsToPyList(state->qual_list, state->cinfos); - p_rows_and_width = PyObject_CallMethod(state->fdw_instance, "get_rel_size", + p_rows_and_width = PYOBJECT_CALLMETHOD(state->fdw_instance, "get_rel_size", "(O,O)", p_quals, p_targets_set); errorCheck(); Py_DECREF(p_targets_set); @@ -816,7 +816,7 @@ pythonQual(char *operatorname, } columnName = PyUnicode_Decode(cinfo->attrname, strlen(cinfo->attrname), getPythonEncodingName(), NULL); - qualInstance = PyObject_CallFunction(qualClass, "(O,O,O)", + qualInstance = PYOBJECT_CALLFUNCTION(qualClass, "(O,O,O)", columnName, operator, value); @@ -853,7 +853,7 @@ getSortKey(MulticornDeparsedSortGroup *key) } else p_collate = PyUnicode_Decode(NameStr(*(key->collate)), strlen(NameStr(*(key->collate))), getPythonEncodingName(), NULL); - SortKeyInstance = PyObject_CallFunction(SortKeyClass, "(O,i,O,O,O)", + SortKeyInstance = PYOBJECT_CALLFUNCTION(SortKeyClass, "(O,i,O,O,O)", p_attname, key->attnum, p_reversed, @@ -1127,7 +1127,7 @@ pydateToCString(PyObject *pyobject, StringInfo buffer, Py_ssize_t strlength = 0; PyObject *formatted_date; - formatted_date = PyObject_CallMethod(pyobject, "isoformat", "()"); + formatted_date = PYOBJECT_CALLMETHOD(pyobject, "isoformat", "()"); PyString_AsStringAndSize(formatted_date, &tempbuffer, &strlength); appendBinaryStringInfo(buffer, tempbuffer, strlength); Py_DECREF(formatted_date); @@ -1216,10 +1216,48 @@ pythonDictToTuple(PyObject *p_value, p_object = PyMapping_GetItemString(p_value, key); if (p_object != NULL && p_object != Py_None) { + /* attr->attypid 0 seems to flag a junk column + such as .....pg.droped.xxx.... */ + if(key == NULL || + attr->atttypid == 0 || + attr->attisdropped != 0 || + strcmp(key, attr->attname.data) != 0) + { + if (key == NULL) + { + key="NULL"; + } + if (errstart(ERROR, + __FILE__, + __LINE__, + PG_FUNCNAME_MACRO, + TEXTDOMAIN)) + { + errmsg("Bad Attribute in multicorn"); + errhint("Multicorn needs to be fixed"); + errdetail("attr->atttypid=%d, attr->attlen=%d, attr->attisdropped=%d, attr->attname=%s key=%s", + attr->atttypid, attr->attlen, + attr->attisdropped, + attr->attname.data, key); + errfinish(0); + } + + } + if (errstart(INFO, + __FILE__, + __LINE__, + PG_FUNCNAME_MACRO, + TEXTDOMAIN)) + { + errmsg("Multicorn: Found %s in dict.", key); + errhint("attr->attname.data=%s", + attr->attname.data); + errfinish(0); + } resetStringInfo(buffer); values[i] = pyobjectToDatum(p_object, - buffer, - cinfos[cinfo_idx]); + buffer, + cinfos[cinfo_idx]); if (buffer->data == NULL) { nulls[i] = true; @@ -1233,6 +1271,17 @@ pythonDictToTuple(PyObject *p_value, { /* "KeyError", doesnt matter. */ PyErr_Clear(); + if (errstart(INFO, + __FILE__, + __LINE__, + PG_FUNCNAME_MACRO, + TEXTDOMAIN)) + { + errmsg("Multicorn: Didn't find %s in dict.", key); + errhint("attr->attname.data=%s", + attr->attname.data); + errfinish(0); + } values[i] = (Datum) NULL; nulls[i] = true; } @@ -1551,7 +1600,7 @@ pathKeys(MulticornPlanState * state) PyObject *fdw_instance = state->fdw_instance, *p_pathkeys; - p_pathkeys = PyObject_CallMethod(fdw_instance, "get_path_keys", "()"); + p_pathkeys = PYOBJECT_CALLMETHOD(fdw_instance, "get_path_keys", "()"); errorCheck(); for (i = 0; i < PySequence_Length(p_pathkeys); i++) { @@ -1628,7 +1677,7 @@ canSort(MulticornPlanState * state, List *deparsed) Py_DECREF(python_sortkey); } - p_sortable = PyObject_CallMethod(fdw_instance, "can_sort", "(O)", p_pathkeys); + p_sortable = PYOBJECT_CALLMETHOD(fdw_instance, "can_sort", "(O)", p_pathkeys); errorCheck(); for (i = 0; i < PySequence_Length(p_sortable); i++) { @@ -1689,9 +1738,11 @@ getRowIdColumn(PyObject *fdw_instance) char *result; errorCheck(); - if (value == Py_None) + if (value == Py_None || value == NULL) { - Py_DECREF(value); + if (value != NULL) { + Py_DECREF(value); + } elog(ERROR, "This FDW does not support the writable API"); } result = PyString_AsString(value); diff --git a/src/query.c b/src/query.c index dd8997e41..3a11220d5 100644 --- a/src/query.c +++ b/src/query.c @@ -66,7 +66,6 @@ extractColumns(List *reltargetlist, List *restrictinfolist) { ListCell *lc; List *columns = NULL; - int i = 0; foreach(lc, reltargetlist) { @@ -82,7 +81,6 @@ extractColumns(List *reltargetlist, List *restrictinfolist) PVC_RECURSE_PLACEHOLDERS); #endif columns = list_union(columns, targetcolumns); - i++; } foreach(lc, restrictinfolist) { diff --git a/src/utils.c b/src/utils.c index 885dad720..24b691d95 100644 --- a/src/utils.c +++ b/src/utils.c @@ -12,17 +12,23 @@ * *------------------------------------------------------------------------- */ +#include #include + #include "postgres.h" #include "multicorn.h" #include "miscadmin.h" - +#include "executor/spi.h" struct module_state { PyObject *error; }; +/* Used to name the PyCapsule type */ +#define STATEMENT_NAME "prepared statement" + + #if PY_MAJOR_VERSION >= 3 #define GETSTATE(m) ((struct module_state*)PyModule_GetState(m)) #else @@ -30,41 +36,64 @@ struct module_state static struct module_state _state; #endif +static void mylog(const char *m, const char *h, const char *d, + const char *f, int line, + const char *fn) { + + /* FILE *fp = fopen("/tmp/mylog", "a"); */ + /* fprintf(fp, "mylog(%s, %s, %s, %s, %d, %s)\n", m, h, d, f, line, fn); */ + /* fclose(fp); */ + + if (errstart(WARNING, f, line, fn, TEXTDOMAIN)) + { + errmsg("%s", m); + if (h != NULL) + { + errhint("%s", h); + } + if (d != NULL) + { + errdetail("%s", d); + } + errfinish(0); + } +} + static PyObject * log_to_postgres(PyObject *self, PyObject *args, PyObject *kwargs) { char *message = NULL; char *hintstr = NULL, - *detailstr = NULL; + *detailstr = NULL; int level = 1; int severity; PyObject *hint, - *p_message, - *detail; + *p_message, + *detail; if (!PyArg_ParseTuple(args, "O|i", &p_message, &level)) { - errorCheck(); - Py_INCREF(Py_None); - return Py_None; + errorCheck(); + Py_INCREF(Py_None); + return Py_None; } if (PyBytes_Check(p_message)) { - message = PyBytes_AsString(p_message); + message = PyBytes_AsString(p_message); } else if (PyUnicode_Check(p_message)) { - message = strdup(PyUnicode_AsPgString(p_message)); + message = strdup(PyUnicode_AsPgString(p_message)); } else { - PyObject *temp = PyObject_Str(p_message); + PyObject *temp = PyObject_Str(p_message); - errorCheck(); - message = strdup(PyString_AsString(temp)); - errorCheck(); - Py_DECREF(temp); + errorCheck(); + message = strdup(PyString_AsString(temp)); + errorCheck(); + Py_DECREF(temp); } switch (level) { @@ -87,29 +116,30 @@ log_to_postgres(PyObject *self, PyObject *args, PyObject *kwargs) severity = INFO; break; } + hint = PyDict_GetItemString(kwargs, "hint"); detail = PyDict_GetItemString(kwargs, "detail"); if (errstart(severity, __FILE__, __LINE__, PG_FUNCNAME_MACRO, TEXTDOMAIN)) { - errmsg("%s", message); - if (hint != NULL && hint != Py_None) - { - hintstr = PyString_AsString(hint); - errhint("%s", hintstr); - } - if (detail != NULL && detail != Py_None) - { - detailstr = PyString_AsString(detail); - errdetail("%s", detailstr); - } - Py_DECREF(args); - Py_DECREF(kwargs); - errfinish(0); + errmsg("%s", message); + if (hint != NULL && hint != Py_None) + { + hintstr = PyString_AsString(hint); + errhint("%s", hintstr); + } + if (detail != NULL && detail != Py_None) + { + detailstr = PyString_AsString(detail); + errdetail("%s", detailstr); + } + Py_DECREF(args); + Py_DECREF(kwargs); + errfinish(0); } else { - Py_DECREF(args); - Py_DECREF(kwargs); + Py_DECREF(args); + Py_DECREF(kwargs); } Py_INCREF(Py_None); return Py_None; @@ -124,9 +154,569 @@ py_check_interrupts(PyObject *self, PyObject *args, PyObject *kwargs) } +#define MYLOG(m, h, d) mylog(m, h, d, __FILE__, __LINE__, PG_FUNCNAME_MACRO) +#define VLOG(...) do { snprintf(logbuff, sizeof(logbuff), __VA_ARGS__); MYLOG(logbuff, NULL, NULL); } while (0) + +static PyObject * +py_fetch(PyObject *self, PyObject *args) +{ + PyObject *pret = NULL; + const size_t row_count = SPI_processed; + const size_t natts = SPI_tuptable != NULL ? SPI_tuptable->tupdesc->natts:0; + size_t i = 0; + int j = 0; + AttInMetadata *attinmeta = NULL; + ConversionInfo **cinfos = NULL; + char logbuff[200]; + + /* We don't want any args, so ignore them. */ + if (args != NULL) + { + Py_DECREF(args); + args = NULL; + VLOG("Arguments passed to py_fetch"); + } + /* + return none if there are no tuples. + This differs from returning an + empty list if no rows were + returned. + */ + + if (SPI_tuptable == NULL) + { + //VLOG("No SPI_tuple"); + goto errout; + } + + attinmeta = TupleDescGetAttInMetadata(SPI_tuptable->tupdesc); + if (attinmeta == NULL) + { + VLOG("TupleDescGetAttInMetadata returned NULL"); + + goto errout; + + } + + cinfos = SPI_palloc(sizeof(ConversionInfo *) * natts); + if (cinfos == NULL) + { + VLOG("SPI_palloc returned NULL"); + goto errout; + + } + + initConversioninfo(cinfos, attinmeta); + pret = PyTuple_New(row_count); + + if (pret == NULL) + { + goto errout; + } + + for (i = 0; i < row_count; ++i) + { + PyObject *rdict = PyDict_New(); + for (j = 0; j < natts; ++j) { + Form_pg_attribute attr = TupleDescAttr(SPI_tuptable->tupdesc,j); + PyObject *pobj = NULL; + bool isnull = false; + Datum d = SPI_getbinval(SPI_tuptable->vals[i], + SPI_tuptable->tupdesc, + j+1, &isnull); + errorCheck(); + + if (isnull == true) + { + Py_INCREF(Py_None); + pobj = Py_None; + } + else + { + pobj = datumToPython(d, cinfos[attr->attnum - 1]->atttypoid, + cinfos[attr->attnum - 1]); + if (pobj == NULL) + { + Py_DECREF(rdict); + rdict = NULL; + goto errout; + + } + } + PyDict_SetItemString(rdict, cinfos[attr->attnum - 1]->attrname, + pobj); + Py_DECREF(pobj); + pobj = NULL; + } + + PyTuple_SetItem(pret, i, rdict); + /* PyTupe_SetItem steals the reference, + so we don't need to drop our reference + to rdict. + */ + rdict = NULL; + } + out: + // Just in case we misse one somewhere. + errorCheck(); + + if (attinmeta != NULL) + { + //SPI_pfree(attinmeta); + attinmeta = NULL; + } + + if (cinfos != NULL) + { + //SPI_pfree(cinfos); + cinfos = NULL; + } + + return pret; + + errout: + if (pret != NULL) + { + Py_DECREF(pret); + pret = NULL; + } + + Py_INCREF(Py_None); + pret = Py_None; + goto out; +} + +static PyObject * +py_execute_stmt(PyObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *stmt_object = NULL; + SPIPlanPtr stmt = NULL; + PyObject *pret = NULL; + PyObject *pobj = NULL; + PyObject *converter = NULL; + PyObject *sqlargs_obj = NULL; + PyObject *converters_obj = NULL; + Py_ssize_t arg_count = 0; + int i = 0; + int iret = 0; + bool read_only = false; + char *nulls = NULL; + Datum *stmt_args = NULL; + + char logbuff[200]; + + VLOG("In py_execute_stt"); + + if (!PyArg_ParseTuple(args, "OO!O!", &stmt_object, + &PyTuple_Type, &sqlargs_obj, + &PyTuple_Type, &converters_obj)) + { + VLOG("execute_stmt: ParseTuple Failed"); + goto errout; + } + + stmt = PyCapsule_GetPointer(stmt_object, STATEMENT_NAME); + + if (stmt == NULL) + { + VLOG("execute_smt: stmt is NULL"); + goto errout; + } + + arg_count = PyTuple_Size(sqlargs_obj); + + multicorn_connect(); + + if (arg_count != PyTuple_Size(converters_obj) || + arg_count != SPI_getargcount(stmt)) + { + /* XXXXX FIXME: throw arg count mismatch exception. */ + VLOG("execute_smt: arg_count mismatch"); + goto errout; + } + + + /* The extra 1 is so we can null terminate it and treat it like a cstring. */ + nulls = (char *)SPI_palloc(sizeof(*nulls)*(arg_count+1)); + if (nulls == NULL) + { + VLOG("Allocating %ld chars for nulls failed.", arg_count+1); + goto errout; + } + + memset (nulls, ' ', (arg_count+1)*sizeof(*nulls)); + nulls[arg_count] = 0; + + stmt_args = (Datum *)SPI_palloc(sizeof(*stmt_args)*arg_count); + if (stmt_args == NULL) + { + VLOG("Unable to allocate memory for stmt_args"); + goto errout; + } + + memset (stmt_args, 0, arg_count*sizeof(*stmt_args)); + + for (i = 0; i < arg_count; i++) + { + PyObject *islob_obj = NULL; + bool islob = false; + + /* This pobj is borrowed. + No need to decref. + */ + pobj = PyTuple_GetItem(sqlargs_obj, i); + + if (pobj == NULL || pobj == Py_None) + { + //VLOG("arg %d is NULL", i); + nulls[i] = 'n'; + continue; + } + + converter = PyTuple_GetItem(converters_obj, i); + if (converter == NULL || converter == Py_None) + { + VLOG("arg %d is No Converter", i); + goto errout; + } + + /* Don't use the macro here since we + are in the middle of a pg call. + islob_obj needs to bre decrefed. + */ + islob_obj = PyObject_CallMethod(converter, "isLob", "()"); + + if (islob_obj == NULL) + { + VLOG("arg %d No islob", i); + goto errout; + } + + islob = PyObject_IsTrue(islob_obj); + + Py_DECREF(islob_obj); + islob_obj = NULL; + + /* From here on out, pobj needs to be decrefed. */ + pobj = PyObject_CallMethod(converter, "getdatum", "(O)", pobj); + + if (PyUnicode_Check(pobj)) + { + PyObject *ptmp = PyUnicode_AsEncodedString(pobj, + getPythonEncodingName(), + NULL); + if (ptmp == NULL) { + VLOG("Unable to convert unicode"); + Py_DECREF(pobj); + pobj = NULL; + goto errout; + } + + //VLOG("unicode encoded to %s", PyString_AsString(ptmp)); + Py_DECREF(pobj); + pobj = ptmp; + } + + if (PyString_Check(pobj)) + { + //VLOG("arg %d is a string islob=%d", i, islob); + if (islob) + { + stmt_args[i] = DirectFunctionCall1(textin, + CStringGetDatum(pstrdup(PyString_AsString(pobj)))); + } + else + { + stmt_args[i]=CStringGetDatum(pstrdup(PyString_AsString(pobj))); + } + + if ((unsigned)(stmt_args[i]) == 0) + { + VLOG("Unable to get string"); + Py_DECREF(pobj); + pobj = NULL; + goto errout; + } + + } + else if (PyLong_Check(pobj)) + { + //VLOG("arg %d is long", i); + stmt_args[i] = Int64GetDatum(PyLong_AsLong(pobj)); + } + else if (PyInt_Check(pobj)) + { + //VLOG("arg %d is int", i); + stmt_args[i] = Int32GetDatum(PyInt_AsLong(pobj)); + } + else if (PyFloat_Check(pobj)) + { + //VLOG("arg %d is float", i); + stmt_args[i] = Float8GetDatum(PyFloat_AsDouble(pobj)); + } + else + { + VLOG("Unknown python type in execute"); + Py_DECREF(pobj); + pobj = NULL; + goto errout; + } + + Py_DECREF(pobj); + pobj = NULL; + } + + iret = SPI_execute_plan(stmt, stmt_args, nulls, + read_only, 0); + + VLOG("SPI_execute_plan returned %d", iret); + + pret = PyInt_FromLong(iret); + if (pret == NULL) + { + VLOG("PyInt_FromLong returned NULL"); + goto errout; + } + + out: + if (stmt_args != NULL) + { + //SPI_pfree(stmt_args); + stmt_args = NULL; + } + + if (nulls != NULL) + { + //SPI_pfree(nulls); + nulls = NULL; + } + + if (args != NULL && args != Py_None) + { + Py_DECREF(args); + args = NULL; + } + + if (kwargs != NULL && kwargs != Py_None) + { + Py_DECREF(kwargs); + kwargs = NULL; + } + + return pret; + + errout: + VLOG("execute_stmt errout"); + Py_INCREF(Py_None); + pret = Py_None; + goto out; +} + +static PyObject * +py_execute(PyObject *self, PyObject *args, PyObject *kwargs) +{ + char *sql = NULL; + bool read_only=false; + PyObject *read_only_object = NULL; + int count=0; + PyObject *pret = NULL; + int iret = 0; + + + if (!PyArg_ParseTuple(args, "sOi", &sql, &read_only_object, &count)) + { + goto errout; + } + + read_only = PyObject_IsTrue(read_only_object); + errorCheck(); + + multicorn_connect(); + + iret = SPI_execute(sql, read_only, count); + + pret = PyInt_FromLong(iret); + + out: + if (args != NULL) { + Py_DECREF(args); + } + + if (kwargs != NULL) { + Py_DECREF(kwargs); + } + + return pret; + + errout: + errorCheck(); + Py_INCREF(Py_None); + pret = Py_None; + goto out; + +} + +static void +stmt_destructor(PyObject *po) +{ + SPIPlanPtr stmt = PyCapsule_GetPointer(po, STATEMENT_NAME); + + if (stmt == NULL) + { + errorCheck(); + return; + } + + // Make sure this isn't doing a double free + // or freeing a bad ptr. + //SPI_freeplan(stmt); +} + +static PyObject * +py_prepare(PyObject *self, PyObject *args, PyObject *kwargs) +{ + char *sql = NULL; + PyObject *pret = NULL; + PyObject *pobj = NULL; + SPIPlanPtr stmt = NULL; + Py_ssize_t arg_count = 0; + Oid *argtypes = NULL; + Py_ssize_t i; + char logbuff[200]; + + arg_count = PyTuple_Size(args) - 1; + pobj = PyTuple_GetItem(args, 0); + if (pobj == NULL) + { + goto errout; + } + + sql = PyString_AsString(pobj); + + if (sql == NULL) + { + goto errout; + } + + multicorn_connect(); + + argtypes = SPI_palloc(sizeof(*argtypes) * arg_count); + if (argtypes == NULL) { + goto errout; + } + + memset(argtypes, 0, sizeof(*argtypes) * arg_count); + for (i = 0; i < arg_count; ++i) + { + pobj = PyTuple_GetItem(args, i + 1); + /* Don't use the macro here since we + are in the middle of a pg call. + */ + + /* This pobj needs to be derefed. + The others don't since they + are borrowed. + */ + pobj = PyObject_CallMethod(pobj, "getOID", "()"); + if (pobj == NULL) { + VLOG("getoid call failed"); + goto errout; + } + + if (!PyLong_Check(pobj)) + { + /* XXXXX Fixme: + throw a type error. + */ + VLOG("Wrong type returned from getOID"); + Py_DECREF(pobj); + pobj = NULL; + goto errout; + } + + argtypes[i] = PyLong_AsLong(pobj); + Py_DECREF(pobj); + pobj = NULL; + } + + /* + * SPI_Prepare makes a copy + * of the argtypes in _SPI_make_plan_non_temp + * so we must still free it. + */ + + + stmt = SPI_prepare(sql, arg_count, argtypes); + + // lets see if not freeing argtypes helps. + argtypes = NULL; + + + if (stmt == NULL) + { + goto errout; + } + + if (SPI_keepplan(stmt)) + { + goto errout; + } + + pret = PyCapsule_New(stmt, STATEMENT_NAME, stmt_destructor); + + if (pret == NULL) + { + //SPI_freeplan(stmt); + stmt = NULL; + goto errout; + } + + out: + if (args != NULL) + { + Py_DECREF(args); + args = NULL; + } + + if (kwargs != NULL) + { + Py_DECREF(kwargs); + kwargs = NULL; + } + + if (argtypes != NULL) + { + //SPI_pfree(argtypes); + argtypes = NULL; + } + + return pret; + + errout: + errorCheck(); + pret = Py_None; + Py_INCREF(Py_None); + goto out; + + +} + + + static PyMethodDef UtilsMethods[] = { - {"_log_to_postgres", (PyCFunction) log_to_postgres, METH_VARARGS | METH_KEYWORDS, "Log to postresql client"}, - {"check_interrupts", (PyCFunction) py_check_interrupts, METH_VARARGS | METH_KEYWORDS, "Gives control back to PostgreSQL"}, + {"_log_to_postgres", (PyCFunction) log_to_postgres, + METH_VARARGS | METH_KEYWORDS, "Log to postresql client"}, + {"_execute", (PyCFunction) py_execute, METH_VARARGS | METH_KEYWORDS, + "Execute SQL"}, + {"_fetch", (PyCFunction) py_fetch, METH_NOARGS, + "Fetch a tuple of results from last execute."}, + {"_prepare", (PyCFunction) py_prepare, METH_VARARGS | METH_KEYWORDS, + "Prepare a statement to execute."}, + {"_execute_stmt", (PyCFunction) py_execute_stmt, + METH_VARARGS | METH_KEYWORDS, + "Execute a previously prepared statement."}, + {"check_interrupts", (PyCFunction) py_check_interrupts, + METH_VARARGS | METH_KEYWORDS, "Gives control back to PostgreSQL"}, {NULL, NULL, 0, NULL} };