Skip to content

Commit

Permalink
apacheGH-2924: Lateral - fixed injection for tables, enhanced QueryEx…
Browse files Browse the repository at this point in the history
…ec API for easier testing.
  • Loading branch information
Aklakan committed Jan 8, 2025
1 parent 742e51e commit 2f7c4cf
Show file tree
Hide file tree
Showing 20 changed files with 201 additions and 34 deletions.
4 changes: 4 additions & 0 deletions jena-arq/src/main/java/org/apache/jena/query/ResultSet.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,8 @@ public default ResultSet materialise() {
}

public void close();

default RowSet toRowSet() {
return RowSet.adapt(this);
}
}
20 changes: 13 additions & 7 deletions jena-arq/src/main/java/org/apache/jena/sparql/algebra/Algebra.java
Original file line number Diff line number Diff line change
Expand Up @@ -161,22 +161,28 @@ public static Binding merge(Binding bindingLeft, Binding bindingRight) {

// If compatible, merge. Iterate over variables in right but not in left.
BindingBuilder b = Binding.builder(bindingLeft);
for ( Iterator<Var> vIter = bindingRight.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
Node n = bindingRight.get(v);
bindingRight.forEach((v, n) -> {
if ( !bindingLeft.contains(v) )
b.add(v, n);
}
});
return b.build();
}

public static boolean compatible(Binding bindingLeft, Binding bindingRight) {
// Test to see if compatible: Iterate over variables in left
for ( Iterator<Var> vIter = bindingLeft.vars() ; vIter.hasNext() ; ) {
Var v = vIter.next();
return compatible(bindingLeft, bindingRight, bindingLeft.vars());
}

/** Test to see if bindings are compatible for all variables of the provided iterator. */
public static boolean compatible(Binding bindingLeft, Binding bindingRight, Iterator<Var> vars) {
while (vars.hasNext() ) {
Var v = vars.next();
Node nLeft = bindingLeft.get(v);
Node nRight = bindingRight.get(v);
if ( nLeft == null ) {
continue;
}

Node nRight = bindingRight.get(v);
if ( nRight != null && !nRight.equals(nLeft) )
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,40 +18,65 @@

package org.apache.jena.sparql.algebra;

import java.util.Iterator;
import java.util.List ;

import org.apache.jena.graph.Node ;
import org.apache.jena.query.ResultSet;
import org.apache.jena.sparql.algebra.table.Table1 ;
import org.apache.jena.sparql.algebra.table.TableEmpty ;
import org.apache.jena.sparql.algebra.table.TableN ;
import org.apache.jena.sparql.algebra.table.TableUnit ;
import org.apache.jena.sparql.core.Var ;
import org.apache.jena.sparql.engine.QueryIterator ;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.exec.RowSet;

public class TableFactory
{
public static Table createUnit()
{ return new TableUnit() ; }

public static Table createEmpty()
{ return new TableEmpty() ; }

public static Table create()
{ return new TableN() ; }

public static Table create(List<Var> vars)
{ return new TableN(vars) ; }

public static Table create(QueryIterator queryIterator)
{
{
if ( queryIterator.isJoinIdentity() ) {
queryIterator.close();
return createUnit() ;
}

return new TableN(queryIterator) ;
}

public static Table create(Var var, Node value)
{ return new Table1(var, value) ; }

public static Table create(List<Var> vars, Iterable<Binding> bindings)
{ return create(vars, bindings.iterator()); }

public static Table create(List<Var> vars, Iterator<Binding> bindings) {
Table result = create(vars);
bindings.forEachRemaining(b -> result.addBinding(b.materialize()));
return result;
}

public static Table create(RowSet rs)
{
List<Var> vars = rs.getResultVars();
return create(vars, rs);
}

public static Table create(ResultSet rs)
{
RowSet rowSet = RowSet.adapt(rs);
return create(rowSet);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,4 +90,11 @@ public default boolean contains(String varName) {

@Override
public boolean equals(Object other);

/**
* Returns a materialized binding which is guaranteed to be independent of
* any resources such as an ongoing query execution.
* Returns the same binding if it is already materialized.
*/
public Binding materialize();
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,9 @@ protected void forEach1(BiConsumer<Var, Node> action) { }

@Override
protected Node get1(Var var) { return null; }

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,9 @@ protected Node get1(Var v) {
return value;
return null;
}

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ protected Node get1(Var v)
return value2;
return null;
}

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -132,4 +132,9 @@ protected Node get1(Var var) {

return null;
}

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,9 @@ protected Node get1(Var var) {

return null;
}

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,4 +61,9 @@ protected int size1() {
protected boolean isEmpty1() {
return map.isEmpty();
}

@Override
public Binding materialize() {
return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,12 @@ public BindingProject(Collection<Var> vars, Binding bind) {
protected boolean accept(Var var) {
return projectionVars.contains(var) ;
}

@Override
public Binding materialize() {
Binding materialized = binding.materialize();
return materialized == binding
? this
: new BindingProject(projectionVars, materialized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,13 @@
import org.apache.jena.graph.Node ;
import org.apache.jena.sparql.core.Var ;

/** Common framework for projection;
/** Common framework for projection;
* the projection policy is provided by
* abstract method {@link #accept(Var)}
* abstract method {@link #accept(Var)}
*/
public abstract class BindingProjectBase extends BindingBase {
private List<Var> actualVars = null ;
private final Binding binding ;
protected final Binding binding ;

public BindingProjectBase(Binding bind) {
super(null) ;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,4 +33,12 @@ public BindingProjectNamed(Binding bind) {
protected boolean accept(Var var) {
return var.isNamedVar() ;
}

@Override
public Binding materialize() {
Binding materialized = binding.materialize();
return materialized == binding
? this
: new BindingProjectNamed(materialized);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.jena.sparql.engine.iterator;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
Expand All @@ -27,6 +29,7 @@
import org.apache.jena.atlas.lib.SetUtils;
import org.apache.jena.graph.Node;
import org.apache.jena.graph.Triple;
import org.apache.jena.sparql.algebra.Algebra;
import org.apache.jena.sparql.algebra.Op;
import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.algebra.TransformCopy;
Expand Down Expand Up @@ -292,15 +295,31 @@ public Op transform(OpTable opTable) {
// By the assignment restriction, the binding only needs to be added to each row of the table.
Table table = opTable.getTable();
// Table vars.
List<Var> vars = new ArrayList<>(table.getVars());
binding.vars().forEachRemaining(vars::add);
List<Var> tableVars = table.getVars();
List<Var> vars = new ArrayList<>(tableVars);

// Track variables that appear both in the table and the binding.
List<Var> commonVars = new ArrayList<>();

// Index variables in a set if there are more than a few of them.
Collection<Var> tableVarsIndex = tableVars.size() > 4 ? new HashSet<>(tableVars) : tableVars;
binding.vars().forEachRemaining(v -> {
if (tableVarsIndex.contains(v)) {
commonVars.add(v);
} else {
vars.add(v);
}
});

TableN table2 = new TableN(vars);
BindingBuilder builder = BindingFactory.builder();
table.iterator(null).forEachRemaining(row->{
builder.reset();
builder.addAll(row);
builder.addAll(binding);
table2.addBinding(builder.build());
if (Algebra.compatible(row, binding, commonVars.iterator())) {
builder.reset();
builder.addAll(row);
binding.forEach(builder::set);
table2.addBinding(builder.build());
}
});
return OpTable.create(table2);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ public static QueryExecBuilder graph(Graph graph) {
return QueryExecDatasetBuilder.create().graph(graph);
}

/** Create a {@link QueryExecBuilder} initialized with an empty dataset. */
public static QueryExecBuilder emptyDataset() {
DatasetGraph empty = DatasetGraphFactory.create();
return dataset(empty);
}

/** Create a {@link QueryExecBuilder} for a remote endpoint. */
public static QueryExecBuilder service(String serviceURL) {
return QueryExecHTTPBuilder.create().endpoint(serviceURL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import org.apache.jena.query.ARQ;
import org.apache.jena.query.Query;
import org.apache.jena.query.Syntax;
import org.apache.jena.riot.rowset.RowSetOnClose;
import org.apache.jena.sparql.algebra.Table;
import org.apache.jena.sparql.algebra.TableFactory;
import org.apache.jena.sparql.core.Var;
import org.apache.jena.sparql.engine.binding.Binding;
import org.apache.jena.sparql.util.Context;
Expand Down Expand Up @@ -81,9 +84,16 @@ public default QueryExecBuilder substitution(String var, Node value) {

// build-and-use short cuts

/** Build and execute as a SELECT query. */
/**
* Build and execute as a SELECT query.
* The caller must eventually close the returned RowSet
* in order to free any associated resources.
* Use {@link #table()} to obtain an independent in-memory copy of the row set.
*/
public default RowSet select() {
return build().select();
QueryExec qExec = build();
RowSet core = qExec.select();
return new RowSetOnClose(core, qExec::close);
}

/** Build and execute as a CONSTRUCT query. */
Expand All @@ -106,4 +116,18 @@ public default boolean ask() {
return qExec.ask();
}
}

/**
* Build and execute as a SELECT query.
* Creates and returns an independent in-memory table by materializing the underlying row set.
* Subsequently, {@link Table#toRowSet()} can be used to obtain a fresh row set view over the table.
*/
public default Table table() {
Table result;
try (QueryExec qExec = build()) {
RowSet rowSet = qExec.select();
result = TableFactory.create(rowSet);
}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -93,4 +93,8 @@ public default Stream<Binding> stream() {
public static RowSet create(QueryIterator qIter, List<Var> vars) {
return RowSetStream.create(vars, qIter);
}

default ResultSet toResultSet() {
return ResultSet.adapt(this);
}
}
Loading

0 comments on commit 2f7c4cf

Please sign in to comment.