Skip to content

Commit

Permalink
apacheGH-2924: Lateral - fixed injection for tables, enhanced QueryEx…
Browse files Browse the repository at this point in the history
…ec API for easier testing.
  • Loading branch information
Aklakan committed Jan 9, 2025
1 parent 5114670 commit bf5facb
Show file tree
Hide file tree
Showing 22 changed files with 241 additions and 35 deletions.
4 changes: 4 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ public default ResultSet materialise() {
}

public void close();

default RowSet toRowSet() {
return RowSet.adapt(this);
}
}
19 changes: 12 additions & 7 deletions jena-arq/src/main/java/org/apache/jena/sparql/algebra/Algebra.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,27 @@ public static Binding merge(Binding bindingLeft, Binding bindingRight) {

// If compatible, merge. Iterate over variables in right but not in left.
BindingBuilder b = Binding.builder(bindingLeft);
for ( Iterator<Var> vIter = bindingRight.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
Node n = bindingRight.get(v);
bindingRight.forEach((v, n) -> {
if ( !bindingLeft.contains(v) )
b.add(v, n);
}
});
return b.build();
}

public static boolean compatible(Binding bindingLeft, Binding bindingRight) {
// Test to see if compatible: Iterate over variables in left
for ( Iterator<Var> vIter = bindingLeft.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
return compatible(bindingLeft, bindingRight, bindingLeft.vars());
}

/** Test to see if bindings are compatible for all variables of the provided iterator. */
public static boolean compatible(Binding bindingLeft, Binding bindingRight, Iterator<Var> vars) {
while (vars.hasNext() ) {
Var v = vars.next();
Node nLeft = bindingLeft.get(v);
Node nRight = bindingRight.get(v);
if ( nLeft == null )
continue;

Node nRight = bindingRight.get(v);
if ( nRight != null && !nRight.equals(nLeft) )
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,61 @@

package org.apache.jena.sparql.algebra;

import java.util.Iterator;
import java.util.List ;

import org.apache.jena.atlas.iterator.Iter;
import org.apache.jena.graph.Node ;
import org.apache.jena.sparql.algebra.table.Table1 ;
import org.apache.jena.sparql.algebra.table.TableEmpty ;
import org.apache.jena.sparql.algebra.table.TableN ;
import org.apache.jena.sparql.algebra.table.TableUnit ;
import org.apache.jena.sparql.core.Var ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.exec.RowSet;

public class TableFactory
{
public static Table createUnit()
{ return new TableUnit() ; }

public static Table createEmpty()
{ return new TableEmpty() ; }

public static Table create()
{ return new TableN() ; }

public static Table create(List<Var> vars)
{ return new TableN(vars) ; }

public static Table create(QueryIterator queryIterator)
{
{
if ( queryIterator.isJoinIdentity() ) {
queryIterator.close();
return createUnit() ;
}

return new TableN(queryIterator) ;
}

public static Table create(Var var, Node value)
{ return new Table1(var, value) ; }

public static Table create(List<Var> vars, Iterable<Binding> bindings)
{ return create(vars, bindings.iterator()); }

public static Table create(List<Var> vars, Iterator<Binding> bindings) {
Table result = create(vars);
bindings.forEachRemaining(result::addBinding);
return result;
}

/** Creates a table from the materialized bindings of the row set. */
public static Table create(RowSet rs)
{
List<Var> vars = rs.getResultVars();
Iterator<Binding> it = Iter.map(rs, Binding::detach);
return create(vars, it);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ public default boolean contains(String varName) {

@Override
public boolean equals(Object other);

/**
* Returns a binding which is guaranteed to be independent of
* any resources such as an ongoing query execution or a disk-based dataset.
* May return itself if it is already detached.
*/
public Binding detach();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ protected void forEach1(BiConsumer<Var, Node> action) { }

@Override
protected Node get1(Var var) { return null; }

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding0(newParent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ protected Node get1(Var v) {
return value;
return null;
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding1(newParent, var, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ protected Node get1(Var v)
return value2;
return null;
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding2(newParent, var1, value1, var2, value2);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ protected Node get1(Var var) {

return null;
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding3(newParent, var1, value1, var2, value2, var3, value3);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,9 @@ protected Node get1(Var var) {

return null;
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new Binding4(newParent, var1, value1, var2, value2, var3, value3, var4, value4);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -202,4 +202,19 @@ public static int hashCode(Binding bind) {
}
return hash;
}

@Override
public Binding detach() {
Binding newParent = parent == null ? null : parent.detach();
Binding result = newParent == parent
? detachWithOriginalParent()
: detachWithNewParent(newParent);
return result;
}

protected Binding detachWithOriginalParent() {
return this;
}

protected abstract Binding detachWithNewParent(Binding newParent);
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ protected int size1() {
protected boolean isEmpty1() {
return map.isEmpty();
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return new BindingOverMap(newParent, map);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,17 @@ public BindingProject(Collection<Var> vars, Binding bind) {
protected boolean accept(Var var) {
return projectionVars.contains(var) ;
}

@Override
public Binding detach() {
Binding b = binding.detach();
return b == binding
? this
: new BindingProject(projectionVars, b);
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
throw new UnsupportedOperationException("Should never be called.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.apache.jena.graph.Node ;
import org.apache.jena.sparql.core.Var ;

/** Common framework for projection;
/** Common framework for projection;
* the projection policy is provided by
* abstract method {@link #accept(Var)}
* abstract method {@link #accept(Var)}
*/
public abstract class BindingProjectBase extends BindingBase {
private List<Var> actualVars = null ;
private final Binding binding ;
protected final Binding binding ;

public BindingProjectBase(Binding bind) {
super(null) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,17 @@ public BindingProjectNamed(Binding bind) {
protected boolean accept(Var var) {
return var.isNamedVar() ;
}

@Override
public Binding detach() {
Binding b = binding.detach();
return b == binding
? this
: new BindingProjectNamed(b);
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
throw new UnsupportedOperationException("Should never be called.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,9 @@ private BindingRoot() {
public void format1(StringBuilder sBuff) {
sBuff.append("[Root]");
}

@Override
protected Binding detachWithNewParent(Binding newParent) {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.jena.sparql.engine.iterator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -27,6 +29,7 @@
import org.apache.jena.atlas.lib.SetUtils;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.sparql.algebra.Algebra;
import org.apache.jena.sparql.algebra.Op;
import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.algebra.TransformCopy;
Expand Down Expand Up @@ -292,15 +295,31 @@ public Op transform(OpTable opTable) {
// By the assignment restriction, the binding only needs to be added to each row of the table.
Table table = opTable.getTable();
// Table vars.
List<Var> vars = new ArrayList<>(table.getVars());
binding.vars().forEachRemaining(vars::add);
List<Var> tableVars = table.getVars();
List<Var> vars = new ArrayList<>(tableVars);

// Track variables that appear both in the table and the binding.
List<Var> commonVars = new ArrayList<>();

// Index variables in a set if there are more than a few of them.
Collection<Var> tableVarsIndex = tableVars.size() > 4 ? new HashSet<>(tableVars) : tableVars;
binding.vars().forEachRemaining(v -> {
if (tableVarsIndex.contains(v)) {
commonVars.add(v);
} else {
vars.add(v);
}
});

TableN table2 = new TableN(vars);
BindingBuilder builder = BindingFactory.builder();
table.iterator(null).forEachRemaining(row->{
builder.reset();
builder.addAll(row);
builder.addAll(binding);
table2.addBinding(builder.build());
if (Algebra.compatible(row, binding, commonVars.iterator())) {
builder.reset();
builder.addAll(row);
binding.forEach(builder::set);
table2.addBinding(builder.build());
}
});
return OpTable.create(table2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public static QueryExecBuilder graph(Graph graph) {
return QueryExecDatasetBuilder.create().graph(graph);
}

/** Create a {@link QueryExecBuilder} initialized with an empty dataset. */
public static QueryExecBuilder emptyDataset() {
DatasetGraph empty = DatasetGraphFactory.create();
return dataset(empty);
}

/** Create a {@link QueryExecBuilder} for a remote endpoint. */
public static QueryExecBuilder service(String serviceURL) {
return QueryExecHTTPBuilder.create().endpoint(serviceURL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.jena.query.ARQ;
import org.apache.jena.query.Query;
import org.apache.jena.query.Syntax;
import org.apache.jena.riot.rowset.RowSetOnClose;
import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.algebra.TableFactory;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.util.Context;
Expand Down Expand Up @@ -81,9 +84,16 @@ public default QueryExecBuilder substitution(String var, Node value) {

// build-and-use short cuts

/** Build and execute as a SELECT query. */
/**
* Build and execute as a SELECT query.
* The caller must eventually close the returned RowSet
* in order to free any associated resources.
* Use {@link #table()} to obtain an independent in-memory copy of the row set.
*/
public default RowSet select() {
return build().select();
QueryExec qExec = build();
RowSet core = qExec.select();
return new RowSetOnClose(core, qExec::close);
}

/** Build and execute as a CONSTRUCT query. */
Expand All @@ -106,4 +116,18 @@ public default boolean ask() {
return qExec.ask();
}
}

/**
* Build and execute as a SELECT query.
* Creates and returns an independent in-memory table by materializing the underlying row set.
* Subsequently, {@link Table#toRowSet()} can be used to obtain a fresh row set view over the table.
*/
public default Table table() {
Table result;
try (QueryExec qExec = build()) {
RowSet rowSet = qExec.select();
result = TableFactory.create(rowSet);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ public default Stream<Binding> stream() {
public static RowSet create(QueryIterator qIter, List<Var> vars) {
return RowSetStream.create(vars, qIter);
}

default ResultSet toResultSet() {
return ResultSet.adapt(this);
}
}
Loading

0 comments on commit bf5facb

Please sign in to comment.