Skip to content

Commit

Permalink
Merge pull request #486 from jimmccusker/sparql_batch_update
Browse files Browse the repository at this point in the history
Batch commits for SPARQLUpdateStore
  • Loading branch information
joernhees committed Aug 11, 2015
2 parents afa6521 + 371263f commit b2f9138
Showing 1 changed file with 60 additions and 21 deletions.
81 changes: 60 additions & 21 deletions rdflib/plugins/stores/sparqlstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@
ORDERBY = 'ORDER BY'

import re
import collections
import urllib2

# import warnings
try:
from SPARQLWrapper import SPARQLWrapper, XML, POST, GET, URLENCODED, POSTDIRECTLY
Expand Down Expand Up @@ -519,12 +522,34 @@ def __init__(self,
queryEndpoint=None, update_endpoint=None,
bNodeAsURI=False, sparql11=True,
context_aware=True,
postAsEncoded=True):
postAsEncoded=True, autocommit=True):

SPARQLStore.__init__(self,
queryEndpoint, bNodeAsURI, sparql11, context_aware, updateEndpoint=update_endpoint)

self.postAsEncoded = postAsEncoded
self.autocommit = autocommit
self._edits = None

def query(self, *args, **kwargs):
    """Run a query through ``SPARQLStore.query``.

    When ``autocommit`` is off, ``commit()`` is called first so that any
    queued edits are sent before the query is delegated. All positional
    and keyword arguments are forwarded unchanged, and the result of
    ``SPARQLStore.query`` is returned as-is.
    """
    if self.autocommit:
        # Nothing can be pending: every edit was committed when it was made.
        return SPARQLStore.query(self, *args, **kwargs)
    self.commit()
    return SPARQLStore.query(self, *args, **kwargs)

def triples(self, *args, **kwargs):
    """Delegate to ``SPARQLStore.triples``.

    When ``autocommit`` is off, ``commit()`` is called first so that any
    queued edits are sent before the lookup is delegated. All arguments
    are forwarded unchanged, and the result of ``SPARQLStore.triples`` is
    returned as-is.
    """
    if self.autocommit:
        # Nothing can be pending: every edit was committed when it was made.
        return SPARQLStore.triples(self, *args, **kwargs)
    self.commit()
    return SPARQLStore.triples(self, *args, **kwargs)

def contexts(self, *args, **kwargs):
    """Delegate to ``SPARQLStore.contexts``.

    When ``autocommit`` is off, ``commit()`` is called first so that any
    queued edits are sent before the call is delegated. All arguments are
    forwarded unchanged, and the result of ``SPARQLStore.contexts`` is
    returned as-is.
    """
    if self.autocommit:
        # Nothing can be pending: every edit was committed when it was made.
        return SPARQLStore.contexts(self, *args, **kwargs)
    self.commit()
    return SPARQLStore.contexts(self, *args, **kwargs)

def __len__(self, *args, **kwargs):
    """Delegate to ``SPARQLStore.__len__``.

    When ``autocommit`` is off, ``commit()`` is called first so that any
    queued edits are sent before the size is requested. All arguments are
    forwarded unchanged, and the result of ``SPARQLStore.__len__`` is
    returned as-is.
    """
    if self.autocommit:
        # Nothing can be pending: every edit was committed when it was made.
        return SPARQLStore.__len__(self, *args, **kwargs)
    self.commit()
    return SPARQLStore.__len__(self, *args, **kwargs)

def open(self, configuration, create=False):
"""
Expand All @@ -547,6 +572,11 @@ def open(self, configuration, create=False):
if not self.updateEndpoint:
self.updateEndpoint = self.endpoint

def _transaction(self):
    """Return the list of queued update strings, creating it on first use.

    ``self._edits`` is ``None`` while nothing is queued, so the list is
    created lazily here. The list that is returned is the same object that
    is stored on ``self._edits``; callers append or extend it in place.
    """
    # Identity test, not ``== None``: equality would dispatch to the
    # ``__eq__`` of whatever object is stored in ``_edits``.
    if self._edits is None:
        self._edits = []
    return self._edits

def __set_update_endpoint(self, update_endpoint):
    """Store *update_endpoint* on the instance as ``self.updateEndpoint``."""
    self.updateEndpoint = update_endpoint

Expand All @@ -561,10 +591,18 @@ def __get_update_endpoint(self):

# Transactional interfaces
def commit(self):
    """Send every queued edit to the endpoint in a single update request.

    add(), addN(), and remove() queue their update strings so that many
    small edits can be sent together. The read methods (query(),
    triples(), contexts() and __len__()) call commit() first when
    autocommit is off, so alternating writes and reads degenerates to
    one request per edit.

    Returns whatever ``_do_update`` returns, or ``None`` when nothing is
    queued (in which case no request is made).
    """
    if not self._edits:
        # Covers both "never queued" (None) and an empty list.
        return None
    # A SPARQL Update request may hold several operations separated by ';'.
    result = self._do_update('\n;\n'.join(self._edits))
    # Cleared only after the request returned: if ``_do_update`` raises,
    # the edits stay queued.
    self._edits = None
    return result

def rollback(self):
    """Discard all queued edits without sending them to the endpoint.

    Only the local queue is cleared; nothing is sent to the endpoint, so
    edits that were already committed are not undone.
    """
    self._edits = None

def add(self, spo, context=None, quoted=False):
""" Add a triple to the store of triples. """
Expand All @@ -588,28 +626,25 @@ def add(self, spo, context=None, quoted=False):
context.identifier.n3(), triple)
else:
q = "INSERT DATA { %s }" % triple
self._do_update(q)
self._transaction().append(q)
if self.autocommit:
self.commit()

def addN(self, quads):
    """Add an iterable of quads to the store, one INSERT DATA per graph.

    *quads* is an iterable of ``(subject, predicate, object, context)``
    tuples. It is traversed exactly once, so a generator is acceptable.
    Triples are grouped by context and one ``INSERT DATA { GRAPH ... }``
    string per context is appended to the pending edits. The edits are
    sent immediately when ``autocommit`` is on, otherwise on the next
    ``commit()``.

    Raises ``Exception`` when ``self.endpoint`` is not set, or when any
    subject, predicate or object is a ``BNode``. The whole batch is
    validated before anything is queued, so a rejected batch leaves the
    pending edits untouched.
    """
    if not self.endpoint:
        raise Exception("UpdateEndpoint is not set - call 'open'")

    # context -> list of serialized triples, filled in a single pass.
    grouped = collections.defaultdict(list)
    for subject, predicate, obj, context in quads:
        if (isinstance(subject, BNode) or
                isinstance(predicate, BNode) or
                isinstance(obj, BNode)):
            raise Exception("SPARQLStore does not support Bnodes! "
                            "See http://www.w3.org/TR/sparql11-query/#BGPsparqlBNodes")
        grouped[context].append(
            "%s %s %s ." % (subject.n3(), predicate.n3(), obj.n3()))

    edits = [
        "INSERT DATA { GRAPH <%s> { %s } }\n" % (
            context.identifier, '\n'.join(triples))
        for context, triples in grouped.items()
    ]
    # Queue once; sending directly as well would insert every quad twice.
    self._transaction().extend(edits)
    if self.autocommit:
        self.commit()

def remove(self, spo, context):
""" Remove a triple from the store """
Expand All @@ -631,7 +666,9 @@ def remove(self, spo, context):
context.identifier.n3(), triple)
else:
q = "DELETE { %s } WHERE { %s } " % (triple, triple)
self._do_update(q)
self._transaction().append(q)
if self.autocommit:
self.commit()

def _do_update(self, update):
self.resetQuery()
Expand Down Expand Up @@ -703,7 +740,9 @@ def update(self, query,

query = self.where_pattern.sub("WHERE { " + values, query)

self._do_update(query)
self._transaction().append(query)
if self.autocommit:
self.commit()

def _insert_named_graph(self, query, query_graph):
"""
Expand Down

0 comments on commit b2f9138

Please sign in to comment.