Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write minimal columns #154

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ directories.stamp:

$(OBJS): directories.stamp

install: python_code
install: python_code

rpm: setup.py
@echo "About to make RPM"
$(PYTHON) ./setup.py bdist --format=rpm

sql/$(EXTENSION)--$(EXTVERSION).sql: sql/$(EXTENSION).sql directories.stamp
cp $< $@
Expand Down
2 changes: 1 addition & 1 deletion multicorn.control
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
comment = 'Multicorn Python bindings for Postgres 9.2.* Foreign Data Wrapper'
default_version = '1.3.2'
module_pathname = '$libdir/multicorn'
relocatable = true
relocatable = True
10 changes: 9 additions & 1 deletion python/multicorn/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ def insert(self, values):

def update(self, oldvalues, newvalues):
"""
Update a tuple containing ''oldvalues'' to the ''newvalues''.
Update a identified by ''rowid'' to the ''newvalues''.

Args:
oldvalues (dict): a dictionary mapping from column
Expand All @@ -366,6 +366,14 @@ def update(self, oldvalues, newvalues):
"""
raise NotImplementedError("This FDW does not support the writable API")

def update(self, rowid, update_columns, newvalues):
"""
Update only the columns identified in ''update_columns''
Identify the row by the rowid parameter.
If this object is not overloaded by the derriving class then call the old API (without the update_columns parameter)
"""
return self.update(rowid, newvalues)

def delete(self, oldvalues):
"""
Delete a tuple identified by ``oldvalues``
Expand Down
3 changes: 3 additions & 0 deletions rpm/post_install.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
default_version=$(grep default_version $(pg_config --sharedir)/extension/multicorn.control | sed -e "s/default_version[[:space:]]*=[[:space:]]*'\([^']*\)'/\1/")

ln $(pg_config --sharedir)/extension/multicorn.sql $(pg_config --sharedir)/extension/multicorn--${default_version}.sql
3 changes: 3 additions & 0 deletions rpm/pre_uninstall.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
default_version=$(grep default_version $(pg_config --sharedir)/extension/multicorn.control | sed -e "s/default_version[[:space:]]*=[[:space:]]*'\([^']*\)'/\1/")

rm $(pg_config --sharedir)/extension/multicorn--${default_version}.sql
50 changes: 42 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@
import sys
from setuptools import setup, find_packages, Extension


import os
from sys import platform
from setuptools.command.install import install
from distutils.command.build import build

# hum... borrowed from psycopg2
def get_pg_config(kind, pg_config="pg_config"):
p = subprocess.Popen([pg_config, '--%s' % kind], stdout=subprocess.PIPE)
Expand All @@ -26,12 +32,40 @@ def get_pg_config(kind, pg_config="pg_config"):
elif sys.version_info[1] < 6:
sys.exit("Sorry, you need at least python 2.6 for Multicorn")

class MulticornBuild(build):
def run(self):
# Original build
build.run(self)
r = subprocess.check_output(['/usr/bin/make', 'multicorn.so'])
r = r.strip().decode('utf8')
if not r:
raise Warning(p[2].readline())
# After original build

execfile('multicorn.control')


setup(
name='multicorn',
version='__VERSION__',
author='Kozea',
license='Postgresql',
package_dir={'': 'python'},
packages=['multicorn', 'multicorn.fsfdw'],
ext_modules = [multicorn_utils_module]
)
name='multicorn',
# version='__VERSION__',
version=default_version,
author='Kozea',
license='Postgresql',
description='Multicorn Python bindings for Postgres 9.5+ FDW',
long_description='Multicorn is a PostgreSQL 9.5+ extension meant to make Foreign Data Wrapper development easy, by allowing the programmer to use the Python programming language.',

options={'bdist_rpm': {'post_install': 'rpm/post_install.sh',
'pre_uninstall': 'rpm/pre_uninstall.sh',
'requires': 'postgresql95-server',
}},
package_dir={'': 'python'},
packages=['multicorn', 'multicorn.fsfdw'],
ext_modules=[multicorn_utils_module],
data_files=[
('%s/extension' % get_pg_config('sharedir'), ['multicorn.control', 'sql/multicorn.sql', 'doc/multicorn.md']),
(get_pg_config('libdir'), ['multicorn.so'])
],
cmdclass={
'build': MulticornBuild,
}
)
6 changes: 1 addition & 5 deletions src/errors.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
#include "multicorn.h"
#include "bytesobject.h"
#include "access/xact.h"

void reportException(PyObject *pErrType,
PyObject *pErrValue,
PyObject *pErrTraceback);

#include "errors.h"

void
errorCheck()
Expand Down
12 changes: 12 additions & 0 deletions src/errors.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
#include "multicorn.h"
#include "bytesobject.h"
#include "access/xact.h"


#ifndef PG_ERRORS_H
#define PG_ERRORS_H

void errorCheck(void);
void reportException(PyObject *pErrType, PyObject *pErrValue, PyObject *pErrTraceback);

#endif
136 changes: 73 additions & 63 deletions src/multicorn.c
Original file line number Diff line number Diff line change
Expand Up @@ -25,84 +25,52 @@
#include "utils/rel.h"
#include "parser/parsetree.h"

#include "multicorn.h"
#include "errors.h"
#include "query.h"
#include "python.h"

PG_MODULE_MAGIC;


extern Datum multicorn_handler(PG_FUNCTION_ARGS);
extern Datum multicorn_validator(PG_FUNCTION_ARGS);


PG_FUNCTION_INFO_V1(multicorn_handler);
PG_FUNCTION_INFO_V1(multicorn_validator);


void _PG_init(void);
void _PG_fini(void);

/*
* FDW functions declarations
/* Static FWD definitions
*/
static void multicornGetForeignRelSize(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);
static void multicornGetForeignPaths(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid);

static void multicornGetForeignRelSize(PlannerInfo *root,
RelOptInfo *baserel,
Oid foreigntableid);
static void multicornGetForeignPaths(PlannerInfo *root,
RelOptInfo *baserel,
Oid foreigntableid);
static ForeignScan *multicornGetForeignPlan(PlannerInfo *root,
RelOptInfo *baserel,
Oid foreigntableid,
ForeignPath *best_path,
List *tlist,
List *scan_clauses
#if PG_VERSION_NUM >= 90500
, Plan *outer_plan
static ForeignScan * multicornGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses, Plan *outer_plan);
#else
static ForeignScan * multicornGetForeignPlan(PlannerInfo *root, RelOptInfo *baserel, Oid foreigntableid, ForeignPath *best_path, List *tlist, List *scan_clauses);
#endif
);

static void multicornExplainForeignScan(ForeignScanState *node, ExplainState *es);
static void multicornBeginForeignScan(ForeignScanState *node, int eflags);
static TupleTableSlot *multicornIterateForeignScan(ForeignScanState *node);
static TupleTableSlot * multicornIterateForeignScan(ForeignScanState *node);
static void multicornReScanForeignScan(ForeignScanState *node);
static void multicornEndForeignScan(ForeignScanState *node);

#if PG_VERSION_NUM >= 90300
static void multicornAddForeignUpdateTargets(Query *parsetree,
RangeTblEntry *target_rte,
Relation target_relation);

static List *multicornPlanForeignModify(PlannerInfo *root,
ModifyTable *plan,
Index resultRelation,
int subplan_index);
static void multicornBeginForeignModify(ModifyTableState *mtstate,
ResultRelInfo *resultRelInfo,
List *fdw_private,
int subplan_index,
int eflags);
static TupleTableSlot *multicornExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot,
TupleTableSlot *planslot);
static TupleTableSlot *multicornExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, TupleTableSlot *planSlot);
static TupleTableSlot *multicornExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo,
TupleTableSlot *slot, TupleTableSlot *planSlot);
static void multicornAddForeignUpdateTargets(Query *parsetree, RangeTblEntry *target_rte, Relation target_relation);
static List * multicornPlanForeignModify(PlannerInfo *root, ModifyTable *plan, Index resultRelation, int subplan_index);
static void multicornBeginForeignModify(ModifyTableState *mtstate, ResultRelInfo *resultRelInfo, List *fdw_private, int subplan_index, int eflags);
static TupleTableSlot * multicornExecForeignInsert(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot);
static TupleTableSlot * multicornExecForeignDelete(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot);
static TupleTableSlot * multicornExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo, TupleTableSlot *slot, TupleTableSlot *planSlot);
static void multicornEndForeignModify(EState *estate, ResultRelInfo *resultRelInfo);

static void multicorn_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
SubTransactionId parentSubid, void *arg);
static void multicorn_subxact_callback(SubXactEvent event, SubTransactionId mySubid, SubTransactionId parentSubid, void *arg);
#endif

static void multicorn_xact_callback(XactEvent event, void *arg);

#if PG_VERSION_NUM >= 90500
static List *multicornImportForeignSchema(ImportForeignSchemaStmt * stmt,
Oid serverOid);
static List * multicornImportForeignSchema(ImportForeignSchemaStmt * stmt, Oid serverOid);
#endif

static void multicorn_xact_callback(XactEvent event, void *arg);

/* Helpers functions */
void *serializePlanState(MulticornPlanState * planstate);
MulticornExecState *initializeExecState(void *internal_plan_state);

PG_FUNCTION_INFO_V1(multicorn_handler);
PG_FUNCTION_INFO_V1(multicorn_validator);


/* Hash table mapping oid to fdw instances */
HTAB *InstancesHash;
Expand Down Expand Up @@ -267,8 +235,10 @@ multicornGetForeignRelSize(PlannerInfo *root,
}
else
{
Query* parsetree = root->parse;
/* Pull "var" clauses to build an appropriate target list */
foreach(lc, extractColumns(baserel->reltargetlist, baserel->baserestrictinfo))
// foreach(lc, extractColumns(baserel->reltargetlist, baserel->baserestrictinfo))
foreach(lc, extractColumns(parsetree->targetList, baserel->baserestrictinfo))
{
Var *var = (Var *) lfirst(lc);
Value *colname;
Expand Down Expand Up @@ -621,7 +591,40 @@ multicornPlanForeignModify(PlannerInfo *root,
Index resultRelation,
int subplan_index)
{
return NULL;


CmdType operation = plan->operation;
RangeTblEntry *rte = planner_rt_fetch(resultRelation, root);
List *update_columns_list = NIL;
Relation rel;

rel = heap_open(rte->relid, NoLock);

if (operation == CMD_UPDATE)
{
ListCell *lc;

TupleDesc desc = RelationGetDescr(rel);

int col=-1;
while ((col = bms_next_member(rte->updatedCols, col)) >= 0)
{
/* bit numbers are offset by FirstLowInvalidHeapAttributeNumber */
AttrNumber attno = col + FirstLowInvalidHeapAttributeNumber;

if (attno <= InvalidAttrNumber) /* shouldn't happen */
elog(ERROR, "system-column update is not supported");

Form_pg_attribute att = desc->attrs[attno-1];

update_columns_list = lappend(update_columns_list, makeString(NameStr(att->attname)));

}
}

heap_close(rel, NoLock);

return list_make1(update_columns_list);
}


Expand Down Expand Up @@ -675,6 +678,11 @@ multicornBeginForeignModify(ModifyTableState *mtstate,
}
modstate->rowidAttno = ExecFindJunkAttributeInTlist(subplan->targetlist, modstate->rowidAttrName);
resultRelInfo->ri_FdwState = modstate;

if (fdw_private) {
// deserialise fdw_private and put update list into the mtstate variable
modstate->update_columns_list = (List *) list_nth(fdw_private, 0);
}
}

/*
Expand Down Expand Up @@ -753,14 +761,15 @@ multicornExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo,
PyObject *fdw_instance = modstate->fdw_instance,
*p_row_id,
*p_new_value,
*p_value = tupleTableSlotToPyObject(slot, modstate->cinfos);
*p_value = tupleTableSlotToPyObject(slot, modstate->cinfos),
*p_update_columns = valuesToPySet(modstate->update_columns_list);
bool is_null;
ConversionInfo *cinfo = modstate->rowidCinfo;
Datum value = ExecGetJunkAttribute(planSlot, modstate->rowidAttno, &is_null);

p_row_id = datumToPython(value, cinfo->atttypoid, cinfo);
p_new_value = PyObject_CallMethod(fdw_instance, "update", "(O,O)", p_row_id,
p_value);
//Change call to the 4 parameters version of the python call to enable passing of p_update_columns. If that call is not utilised in the wrapper (ie old API) then the old 3 parameter is called dropping p_update_columns ensuring same functionality
p_new_value = PyObject_CallMethod(fdw_instance, "update", "(O,O,O)", p_row_id, p_update_columns, p_value);
errorCheck();
if (p_new_value != NULL && p_new_value != Py_None)
{
Expand All @@ -769,6 +778,7 @@ multicornExecForeignUpdate(EState *estate, ResultRelInfo *resultRelInfo,
ExecStoreVirtualTuple(slot);
}
Py_XDECREF(p_new_value);
Py_DECREF(p_update_columns);
Py_DECREF(p_row_id);
errorCheck();
return slot;
Expand Down
Loading