Skip to content

Commit

Permalink
Feat: rdf4j repository materialiser (#292)
Browse files Browse the repository at this point in the history
  • Loading branch information
pj-cegeka authored and Yalz committed Aug 17, 2023
1 parent b2e1b9b commit f367a5d
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 125 deletions.
2 changes: 1 addition & 1 deletion docs/_core/ldi-outputs/repository-materialiser.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,4 @@ parent: LDI Outputs
# Repository Materialiser

The repository materialiser is used to materialise an LDES stream into a triplestore.
Any triplestore with a SPARQL endpoint can be used.
Any triplestore that supports the RDF4J remote repository API can be used.
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package ldes.client.performance;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer;
import ldes.client.performance.csvwriter.CsvFile;
import ldes.client.treenodesupplier.TreeNodeProcessor;
import org.junit.jupiter.api.*;
Expand All @@ -11,6 +8,10 @@
import java.time.temporal.ChronoUnit;
import java.util.List;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import com.github.tomakehurst.wiremock.extension.responsetemplating.ResponseTemplateTransformer;

import static org.apache.commons.io.FilenameUtils.separatorsToSystem;

/**
Expand Down
47 changes: 44 additions & 3 deletions ldi-core/repository-materialiser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,53 @@
<version>1.4.0-SNAPSHOT</version>
</parent>

<properties>
<!-- Java properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!-- RDF4j -->
<rdf4j.version>4.3.3</rdf4j.version>
<!-- Code formatter -->
<relative-formatter-path>../..</relative-formatter-path>
</properties>

<artifactId>repository-materialiser</artifactId>

<dependencyManagement>
<dependencies>
<!-- https://mvnrepository.com/artifact/org.eclipse.rdf4j/rdf4j-bom -->
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-bom</artifactId>
<version>${rdf4j.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.jena</groupId>
<artifactId>jena-rdfconnection</artifactId>
<version>${jena.version}</version>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-client</artifactId>
<type>pom</type>
</dependency>

<!-- Test Dependencies -->
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-sail-memory</artifactId>
<version>${rdf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-repository-sail</artifactId>
<version>${rdf4j.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,52 +1,59 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.rdf.model.RDFNode;
import org.apache.jena.rdf.model.Resource;
import org.apache.jena.rdfconnection.RDFConnection;
import org.apache.jena.rdfconnection.RDFConnectionRemote;
import org.apache.jena.rdfconnection.RDFConnectionRemoteBuilder;

import be.vlaanderen.informatievlaanderen.ldes.ldi.exceptions.ModelParseIOException;
import org.eclipse.rdf4j.common.transaction.IsolationLevels;
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.model.base.AbstractIRI;
import org.eclipse.rdf4j.repository.Repository;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.manager.RepositoryManager;
import org.eclipse.rdf4j.rio.RDFFormat;
import org.eclipse.rdf4j.rio.Rio;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class Materialiser {

private final String sparqlEndpoint;
private final String repositoryId;
private final String namedGraph;
protected RDFConnectionRemoteBuilder builder;
protected RepositoryManager repositoryManager;

public Materialiser(String hostUrl, String repositoryId, String namedGraph) {
this(constructRDF4JSparqlEndpoint(hostUrl, repositoryId), namedGraph);
}

public Materialiser(String sparqlEndpoint, String namedGraph) {
this.sparqlEndpoint = sparqlEndpoint;
this.repositoryId = repositoryId;
this.namedGraph = namedGraph;
initRepositoryManager(new RemoteRepositoryManager(hostUrl));
}

protected void setConnectionBuilder(RDFConnectionRemoteBuilder builder) {
this.builder = builder;
protected void initRepositoryManager(RepositoryManager manager) {
this.repositoryManager = manager;
}

public void initConnection() {
if (builder == null) {
setConnectionBuilder(
RDFConnectionRemote.service(sparqlEndpoint));
}
}
public void process(String content) {
final Repository repository = repositoryManager.getRepository(repositoryId);

try (RepositoryConnection dbConnection = repository.getConnection()) {
dbConnection.setIsolationLevel(IsolationLevels.NONE);
dbConnection.begin();

public void process(Model content) {
try (RDFConnection connection = builder.build()) {
var updateModel = readInputString(content);

Set<Resource> entityIds = getSubjectsFromModel(content);
deleteEntitiesFromRepo(entityIds, connection);
Set<Resource> entityIds = getSubjectsFromModel(updateModel);
deleteEntitiesFromRepo(entityIds, dbConnection);

if (namedGraph != null && !namedGraph.isEmpty()) {
connection.load(namedGraph, content);
var namedGraphIRI = dbConnection.getValueFactory().createIRI(namedGraph);
dbConnection.add(updateModel, namedGraphIRI);
} else {
connection.load(content);
dbConnection.add(updateModel);
}
connection.commit();
dbConnection.commit();
}
}

Expand All @@ -57,12 +64,12 @@ public void process(Model content) {
* A graph
* @return A set of subject URIs.
*/
private Set<Resource> getSubjectsFromModel(Model model) {
protected static Set<Resource> getSubjectsFromModel(Model model) {
Set<Resource> entityIds = new HashSet<>();

model.listStatements().forEach(statement -> {
if (statement.getSubject().isURIResource()) {
entityIds.add(statement.getSubject());
model.subjects().forEach((Resource subject) -> {
if (subject instanceof AbstractIRI) {
entityIds.add(subject);
}
});

Expand All @@ -77,7 +84,7 @@ private Set<Resource> getSubjectsFromModel(Model model) {
* @param connection
* The DB connection.
*/
private void deleteEntitiesFromRepo(Set<Resource> entityIds, RDFConnection connection) {
protected static void deleteEntitiesFromRepo(Set<Resource> entityIds, RepositoryConnection connection) {
Deque<Resource> subjectStack = new ArrayDeque<>();
entityIds.forEach(subjectStack::push);

Expand All @@ -87,22 +94,26 @@ private void deleteEntitiesFromRepo(Set<Resource> entityIds, RDFConnection conne
* inside blank nodes, we need to keep track of them as they are encountered by
* adding them to the stack.
*/

while (!subjectStack.isEmpty()) {
Resource subject = subjectStack.pop();

connection.fetch(subject.toString()).listStatements().forEach(statement -> {
RDFNode object = statement.getObject();
if (object.isAnon()) {
connection.getStatements(subject, null, null).forEach((Statement statement) -> {
Value object = statement.getObject();
if (object.isBNode()) {
subjectStack.push((Resource) object);
}
});

connection.delete(subject.toString());
connection.remove(subject, null, null);
}
}

protected static String constructRDF4JSparqlEndpoint(String hostUrl, String repositoryId) {
return hostUrl + "/repositories/" + repositoryId + "/statements";
/**
 * Parses the given string into an RDF4J {@link Model}.
 *
 * The string is encoded as UTF-8 and handed to Rio as N-Quads with an empty
 * base URI.
 *
 * @param content
 *            The serialised statements to parse.
 * @return The model produced by the parser.
 * @throws ModelParseIOException
 *             when reading the content raises an {@link IOException}; carries
 *             the original content and the message of that exception.
 */
private Model readInputString(String content) {
	final byte[] rawContent = content.getBytes(StandardCharsets.UTF_8);
	try {
		final InputStream contentStream = new ByteArrayInputStream(rawContent);
		return Rio.parse(contentStream, "", RDFFormat.NQUADS);
	} catch (IOException ioException) {
		throw new ModelParseIOException(content, ioException.getMessage());
	}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.exceptions;

/**
 * Unchecked exception raised when content could not be parsed into a model.
 *
 * It keeps the content that failed to parse together with a textual
 * description of the cause, and reports both through {@link #getMessage()}.
 */
public class ModelParseIOException extends RuntimeException {

	/** The content that could not be parsed. */
	private final String unparsedContent;
	/** Textual description of why parsing failed. */
	private final String reason;

	/**
	 * @param model
	 *            The content that could not be parsed.
	 * @param cause
	 *            A textual description of the failure.
	 */
	public ModelParseIOException(String model, String cause) {
		this.unparsedContent = model;
		this.reason = cause;
	}

	/**
	 * @return A fixed header, followed by the cause description and the
	 *         content that failed to parse, each on its own line(s).
	 */
	@Override
	public String getMessage() {
		final StringBuilder message = new StringBuilder("Could not parse content to model. cause:\n");
		message.append(reason);
		message.append("\nContent:\n");
		message.append(unparsedContent);
		return message.toString();
	}
}
Loading

0 comments on commit f367a5d

Please sign in to comment.