Skip to content

Commit

Permalink
fix: gh 501 (#536)
Browse files Browse the repository at this point in the history
  • Loading branch information
jobulcke committed Mar 12, 2024
1 parent 7dafac6 commit d614a28
Show file tree
Hide file tree
Showing 20 changed files with 203 additions and 147 deletions.
4 changes: 0 additions & 4 deletions ldi-core/repository-materialiser/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,6 @@
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-rdfxml</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-trig</artifactId>
</exclusion>
<exclusion>
<groupId>org.eclipse.rdf4j</groupId>
<artifactId>rdf4j-rio-trix</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
import be.vlaanderen.informatievlaanderen.ldes.ldi.valueobjects.MaterialiserConnection;
import org.apache.jena.rdf.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.Value;
import org.eclipse.rdf4j.repository.manager.RemoteRepositoryManager;
import org.eclipse.rdf4j.repository.manager.RepositoryManager;

import java.util.*;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

public class Materialiser {
private final MaterialiserConnection materialiserConnection;
Expand All @@ -29,52 +29,60 @@ protected MaterialiserConnection getMaterialiserConnection() {
}

public void process(List<Model> jenaModels) {
try {
jenaModels.stream()
.map(JenaToRDF4JConverter::convert)
.forEach(updateModel -> {
Set<Resource> entityIds = ModelSubjectsExtractor.extractSubjects(updateModel);
deleteEntitiesFromRepo(entityIds);
materialiserConnection.add(updateModel);
});
materialiserConnection.commit();
} catch (Exception e) {
materialiserConnection.rollback();
throw new MaterialisationFailedException(e, jenaModels);
synchronized (materialiserConnection) {
try {
jenaModels.stream().map(JenaToRDF4JConverter::convert).forEach(updateModel -> {
Set<Resource> entityIds = ModelSubjectsExtractor.extractSubjects(updateModel);
deleteEntities(entityIds);
materialiserConnection.add(updateModel);
});
materialiserConnection.commit();
} catch (Exception e) {
materialiserConnection.rollback();
throw new MaterialisationFailedException(e);
}
}
}

public CompletableFuture<Void> processAsync(List<Model> jenaModels) {
return CompletableFuture.runAsync(() -> process(jenaModels));
}

public void shutdown() {
materialiserConnection.shutdown();
}

protected void deleteEntities(Set<Resource> subjects) {
subjects.forEach(this::deleteEntity);
}

/**
* Delete an entity, including its blank nodes, from a repository.
*
* @param entityIds The subjects of the entities to delete.
*/
protected void deleteEntitiesFromRepo(Set<Resource> entityIds) {
Deque<Resource> subjectStack = new ArrayDeque<>();
entityIds.forEach(subjectStack::push);

/*
* Entities can contain blank node references. All statements with those blank
* node identifiers need to be removed as well. As blank nodes can be nested
* inside blank nodes, we need to keep track of them as they are encountered by
* adding them to the stack.
*/
while (!subjectStack.isEmpty()) {
Resource subject = subjectStack.pop();
protected void deleteEntity(Resource subject) {
final String query = """
DELETE {
?s ?p ?o .
?blankNode ?blankNodePredicate ?blankNodeObject .
}
WHERE {
?s ?p ?o .
FILTER(?s = <%s>)
materialiserConnection.getStatements(subject, null, null).forEach((Statement statement) -> {
Value object = statement.getObject();
if (object.isBNode()) {
subjectStack.push((Resource) object);
}
});
OPTIONAL {
{
?o ?blankNodePredicate ?blankNodeObject .
FILTER(isBlank(?o))
BIND(?o AS ?blankNode)
}
UNION
{
?o ?nestedPredicate ?nestedObject .
FILTER(isBlank(?nestedObject))
?nestedObject ?blankNodePredicate ?blankNodeObject .
BIND(?nestedObject AS ?blankNode)
}
}
}
""".formatted(subject.stringValue());

materialiserConnection.remove(subject, null, null);
}
materialiserConnection.executeUpdate(query);
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,12 @@
package be.vlaanderen.informatievlaanderen.ldes.ldi.exceptions;

import org.apache.jena.rdf.model.Model;
import org.apache.jena.riot.Lang;
import org.apache.jena.riot.RDFWriter;

import java.util.List;
import java.util.stream.Collectors;

public class MaterialisationFailedException extends RuntimeException {
private final transient List<Model> uncommittedModels;
public MaterialisationFailedException(Exception e, List<Model> uncommittedModels) {
public MaterialisationFailedException(Exception e) {
super(e);
this.uncommittedModels = uncommittedModels;
}

@Override
public String getMessage() {
final String uncommittedMembers = uncommittedModels.stream()
.map(model -> RDFWriter.source(model).lang(Lang.TURTLE).asString())
.collect(Collectors.joining("\n"));
return "The following members could not be materialised to the triples store%n%n%s".formatted(uncommittedMembers);
return "Materialisation to triples store failed: %s".formatted(getCause().getMessage());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public void rollback() {
connection.close();
}

public void executeUpdate(String query) {
holder.getConnection().prepareUpdate(query).execute();
}

public void shutdown() {
holder.shutdown();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import org.eclipse.rdf4j.model.Model;
import org.eclipse.rdf4j.model.Resource;
import org.eclipse.rdf4j.model.Statement;
import org.eclipse.rdf4j.model.impl.SimpleValueFactory;
import org.eclipse.rdf4j.repository.config.RepositoryConfig;
import org.eclipse.rdf4j.repository.manager.LocalRepositoryManager;
import org.eclipse.rdf4j.repository.manager.RepositoryManager;
Expand All @@ -25,6 +26,9 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

Expand All @@ -35,11 +39,6 @@ class MaterialiserIT {
@TempDir
File dataDir;

private static final String[] TEST_FILES = new String[]{
"src/test/resources/people_data_01.nq",
"src/test/resources/people_data_02.nq"};
private static final String CHANGED_FILE = "src/test/resources/people_data_03.nq";

@BeforeEach
public void setUp() {
materialiser = new Materialiser(LOCAL_SERVER_URL, LOCAL_REPOSITORY_ID, "");
Expand All @@ -57,50 +56,38 @@ public void tearDown() {
materialiser.shutdown();
}

@Test
void when_DataPresent_Then_GetEntityIds() throws Exception {
var updateModel = Rio.parse(new FileInputStream(TEST_FILES[0]), "", RDFFormat.NQUADS);

Set<Resource> entityIds = ModelSubjectsExtractor.extractSubjects(updateModel);

assertThat(entityIds)
.as("Expected all subjects from test data")
.hasSize(2);
}

@Test
void when_DeleteEntities_Then_EntitiesRemovedFromStore() throws Exception {
populateAndCheckRepository(List.of(TEST_FILES));
Model updateModel = Rio.parse(new FileInputStream(TEST_FILES[0]), "", RDFFormat.NQUADS);
Model updateModel2 = Rio.parse(new FileInputStream(TEST_FILES[1]), "", RDFFormat.NQUADS);
Set<Resource> entityIds = ModelSubjectsExtractor.extractSubjects(updateModel);
final List<String> testFiles = IntStream.range(1, 4).mapToObj("src/test/resources/people/%d.nq"::formatted).toList();
populateAndCheckRepository(testFiles);
final Model model = Rio.parse(new FileInputStream("src/test/resources/people/1.nq"), "", RDFFormat.NQUADS);
Set<Resource> entityIds = Stream.of("http://somewhere/BeckySmith/", "http://somewhere/SarahJones/")
.map(iri -> SimpleValueFactory.getInstance().createIRI(iri))
.collect(Collectors.toSet());

materialiser.deleteEntitiesFromRepo(entityIds);
materialiser.deleteEntities(entityIds);

List<Statement> statements = materialiser.getMaterialiserConnection()
.getStatements(null, null, null).stream().toList();

assertThat(statements)
.containsExactlyInAnyOrderElementsOf(updateModel2)
.doesNotContainAnyElementsOf(updateModel);
assertThat(statements).hasSize(4);
}

@Test
void when_UpdateEntities_Then_OldTriplesRemoved() throws Exception {
populateAndCheckRepository(List.of(TEST_FILES));
Model updateModel = Rio.parse(new FileInputStream(TEST_FILES[0]), "", RDFFormat.NQUADS);
Model changedModel = Rio.parse(new FileInputStream(CHANGED_FILE), "", RDFFormat.NQUADS);
final List<String> testFiles = IntStream.range(1, 6).mapToObj("src/test/resources/people/%d.nq"::formatted).toList();
populateAndCheckRepository(testFiles);

List<org.apache.jena.rdf.model.Model> models = RDFParser.source(CHANGED_FILE).toModel().listStatements().toList().stream()
.map(statement -> ModelFactory.createDefaultModel().add(statement))
.toList();
List<org.apache.jena.rdf.model.Model> models = List.of(RDFParser.source("people/5-updated.nq").toModel());
materialiser.process(models);

List<Statement> statements = materialiser.getMaterialiserConnection()
.getStatements(null, null, null).stream().toList();

assertThat(testModelInStatements(changedModel, statements)).isTrue();
assertThat(testModelInStatements(updateModel, statements)).isFalse();
assertThat(statements)
.hasSize(21)
.anyMatch(statement -> statement.getObject().stringValue().equals("CHANGED"))
.noneMatch(statement -> statement.getObject().stringValue().equals("Taylor"));
}

void populateAndCheckRepository(List<String> files) throws IOException {
Expand All @@ -116,14 +103,4 @@ void populateAndCheckRepository(List<String> files) throws IOException {
assertThat(models).allMatch(statements::containsAll);
}

private boolean testModelInStatements(Model model, List<Statement> statements) {
AtomicBoolean result = new AtomicBoolean(true);
model.getStatements(null, null, null).forEach(statement -> {
if (!statements.contains(statement)) {
result.set(false);
}
});
return result.get();
}

}
Loading

0 comments on commit d614a28

Please sign in to comment.