Skip to content

Commit

Permalink
IGNITE-23441 Sql. Cancellation of script execution (#4706)
Browse files Browse the repository at this point in the history
  • Loading branch information
xtern authored Nov 27, 2024
1 parent 974c601 commit e6b817b
Show file tree
Hide file tree
Showing 32 changed files with 509 additions and 104 deletions.
21 changes: 21 additions & 0 deletions modules/api/src/main/java/org/apache/ignite/sql/IgniteSql.java
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
void executeScript(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @throws SqlException If failed.
*/
void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
Expand All @@ -389,4 +399,15 @@ <T> CompletableFuture<AsyncResultSet<T>> executeAsync(
*/
CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments);

/**
* Executes a multi-statement SQL query.
*
* @param cancellationToken Cancellation token or {@code null}.
* @param query SQL query template.
* @param arguments Arguments for the template (optional).
* @return Operation future.
* @throws SqlException If failed.
*/
CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments);

}
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
return cur.closeAsync();
public CompletableFuture<Void> closeAsync(boolean cancelled) {
return cur.closeAsync(cancelled);
}

/** {@inheritDoc} */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,9 @@ public static CompletableFuture<Void> process(
HybridTimestamp clientTs = HybridTimestamp.nullableHybridTimestamp(in.unpackLong());
transactions.updateObservableTimestamp(clientTs);

// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Pass cancellation token to the query processor.
return IgniteSqlImpl.executeScriptCore(
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, arguments, props.toSqlProps()
sql, transactions.observableTimestampTracker(), () -> true, () -> {}, script, null, arguments, props.toSqlProps()
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,13 @@ public void executeScript(String query, @Nullable Object... arguments) {
}
}

/** {@inheritDoc} */
@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

/** {@inheritDoc} */
@Override
public CompletableFuture<AsyncResultSet<SqlRow>> executeAsync(
Expand Down Expand Up @@ -335,6 +342,14 @@ public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object
return ch.serviceAsync(ClientOp.SQL_EXEC_SCRIPT, payloadWriter, null);
}

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
// TODO https://issues.apache.org/jira/browse/IGNITE-23646 Support cancellation token.
throw new UnsupportedOperationException();
}

private static void packProperties(
PayloadOutputChannel w,
@Nullable Map<String, Object> statementProps) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class FakeCursor implements AsyncSqlCursor<InternalSqlRow> {
}

@Override
public CompletableFuture<Void> closeAsync() {
public CompletableFuture<Void> closeAsync(boolean cancelled) {
return nullCompletedFuture();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,17 @@ public interface AsyncCursor<T> {
*
* @return A future which will be completed when the resources will be actually released.
*/
CompletableFuture<Void> closeAsync();
default CompletableFuture<Void> closeAsync() {
return closeAsync(false);
}

/**
* Releases resources acquired by the cursor.
*
* @param cancelled Whether the cursor is closed due to query cancellation.
* @return A future which will be completed when the resources will be actually released.
*/
CompletableFuture<Void> closeAsync(boolean cancelled);

/**
* Batch of the items.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public CompletableFuture<BatchedResult<T>> requestNextAsync(int rows) {

/** {@inheritDoc} */
@Override
public CompletableFuture<Void> closeAsync() {
public CompletableFuture<Void> closeAsync(boolean ignore) {
if (!cancelled) {
synchronized (lock) {
if (!cancelled) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,22 @@ public void executeScript(String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(query, arguments));
}

@Override
public void executeScript(@Nullable CancellationToken cancellationToken, String query, @Nullable Object... arguments) {
attachmentLock.consumeAttached(ignite -> ignite.sql().executeScript(cancellationToken, query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(String query, @Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(query, arguments));
}

@Override
public CompletableFuture<Void> executeScriptAsync(@Nullable CancellationToken cancellationToken, String query,
@Nullable Object... arguments) {
return attachmentLock.attachedAsync(ignite -> ignite.sql().executeScriptAsync(cancellationToken, query, arguments));
}

@Override
public <T> T unwrap(Class<T> classToUnwrap) {
return attachmentLock.attached(ignite -> Wrappers.unwrap(ignite.sql(), classToUnwrap));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@
import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCause;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrowsWithCode;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotNull;
Expand All @@ -36,6 +39,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -44,7 +48,10 @@
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.ColumnMetadataImpl;
import org.apache.ignite.internal.sql.ColumnMetadataImpl.ColumnOriginImpl;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
import org.apache.ignite.internal.tx.TxManager;
import org.apache.ignite.lang.CancelHandle;
import org.apache.ignite.lang.CancellationToken;
import org.apache.ignite.lang.CursorClosedException;
import org.apache.ignite.lang.ErrorGroups.Common;
import org.apache.ignite.lang.ErrorGroups.Sql;
Expand All @@ -65,10 +72,12 @@
import org.apache.ignite.table.Table;
import org.apache.ignite.tx.Transaction;
import org.apache.ignite.tx.TransactionOptions;
import org.awaitility.Awaitility;
import org.hamcrest.Matcher;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.AssertionFailureBuilder;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
Expand Down Expand Up @@ -929,6 +938,51 @@ public void testQueryTimeout() {
});
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23693")
public void cancelScript() {
IgniteSql sql = igniteSql();

sql("CREATE TABLE test (id INT PRIMARY KEY);");

// DML is used because the cursor will be closed as soon as the first page is ready.
String script =
"INSERT INTO test SELECT x FROM system_range(0, 10000000000);"
+ "SELECT 1;";

CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();

CompletableFuture<Void> scriptFut = IgniteTestUtils.runAsync(() -> executeScript(sql, token, script));

// Wait until FIRST script statement is started to execute.
Awaitility.await().untilAsserted(() -> assertThat(queryProcessor().runningQueries(), greaterThan(1)));

assertThat(scriptFut.isDone(), is(false));

String expectedErrMsg = "The query was cancelled while executing.";

cancelHandle.cancel();

assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
expectedErrMsg,
() -> IgniteTestUtils.await(scriptFut)
);

assertThat(queryProcessor().runningQueries(), is(0));
assertThat(txManager().pending(), is(0));

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
assertThrowsSqlException(
Sql.EXECUTION_CANCELLED_ERR,
expectedErrMsg,
() -> executeScript(sql, token, "SELECT 1; SELECT 2;")
);
assertThat(queryProcessor().runningQueries(), is(0));
assertThat(txManager().pending(), is(0));
}

@Test
public void testQueryTimeoutIsPropagatedFromTheServer() throws Exception {
Statement stmt = igniteSql().statementBuilder()
Expand Down Expand Up @@ -1041,6 +1095,8 @@ protected ResultProcessor execute(IgniteSql sql, String query, Object... args) {

protected abstract void executeScript(IgniteSql sql, String query, Object... args);

protected abstract void executeScript(IgniteSql sql, CancellationToken token, String query, Object... args);

protected abstract void rollback(Transaction outerTx);

protected abstract void commit(Transaction outerTx);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.ignite.internal.sql.api;

import static org.apache.ignite.internal.sql.engine.util.SqlTestUtils.expectQueryCancelled;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Expand Down Expand Up @@ -55,7 +56,6 @@
/**
* Tests for asynchronous SQL API.
*/
@SuppressWarnings("ThrowableNotThrown")
public class ItSqlAsynchronousApiTest extends ItSqlApiBaseTest {
@Test
public void pageSequence() {
Expand Down Expand Up @@ -120,6 +120,13 @@ public void cancelQueryString() throws InterruptedException {

return sql.executeAsync(transaction, token, query);
});

// Checks the exception that is thrown if a query is canceled before a cursor is obtained.
CancelHandle cancelHandle = CancelHandle.create();
CancellationToken token = cancelHandle.token();
cancelHandle.cancel();

expectQueryCancelled(() -> await(sql.executeAsync(null, token, "SELECT 1")));
}

@Test
Expand Down Expand Up @@ -221,6 +228,11 @@ protected void executeScript(IgniteSql sql, String query, Object... args) {
await(sql.executeScriptAsync(query, args));
}

@Override
protected void executeScript(IgniteSql sql, CancellationToken cancellationToken, String query, Object... args) {
await(sql.executeScriptAsync(cancellationToken, query, args));
}

@Override
protected void rollback(Transaction tx) {
await(tx.rollbackAsync());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for asynchronous client SQL API.
Expand All @@ -41,18 +42,27 @@ public void stopClient() {
client.close();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelQueryString() throws InterruptedException {
super.cancelQueryString();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelStatement() throws InterruptedException {
super.cancelStatement();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelScript() {
super.cancelScript();
}

@Override
protected IgniteSql igniteSql() {
return client.sql();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
* Tests for synchronous client SQL API.
Expand All @@ -41,18 +42,27 @@ public void stopClient() {
client.close();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelQueryString() throws InterruptedException {
super.cancelQueryString();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelStatement() throws InterruptedException {
super.cancelStatement();
}

@Test
@Disabled("https://issues.apache.org/jira/browse/IGNITE-23646")
@Override
public void cancelScript() {
super.cancelScript();
}

@Override
protected IgniteSql igniteSql() {
return client.sql();
Expand Down
Loading

0 comments on commit e6b817b

Please sign in to comment.