From 1e3e28ec1ffef546413862e7ac2840fdfc3b8700 Mon Sep 17 00:00:00 2001 From: Jonas Arnhold Date: Wed, 15 May 2024 15:43:46 +0200 Subject: [PATCH 1/3] Make SQL executions async --- .../sql/conquery/SqlExecutionManager.java | 39 +++++++++++++------ 1 file changed, 28 insertions(+), 11 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java index fde5116efb..0ce3760a24 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java @@ -1,12 +1,17 @@ package com.bakdata.conquery.sql.conquery; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CompletableFuture; + import com.bakdata.conquery.io.storage.MetaStorage; import com.bakdata.conquery.models.datasets.Dataset; import com.bakdata.conquery.models.execution.ExecutionState; import com.bakdata.conquery.models.execution.InternalExecution; import com.bakdata.conquery.models.execution.ManagedExecution; import com.bakdata.conquery.models.forms.managed.ManagedInternalForm; +import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.query.ExecutionManager; import com.bakdata.conquery.models.query.ManagedQuery; import com.bakdata.conquery.models.worker.Namespace; @@ -22,29 +27,27 @@ public class SqlExecutionManager extends ExecutionManager { private final SqlExecutionService executionService; private final SqlConverter converter; + private final Map> executionResults; public SqlExecutionManager(final SqlContext context, SqlExecutionService sqlExecutionService, MetaStorage storage) { super(storage); - executionService = sqlExecutionService; - converter = new SqlConverter(context.getSqlDialect(), context.getConfig()); + this.executionService = sqlExecutionService; + this.converter = new 
SqlConverter(context.getSqlDialect(), context.getConfig()); + this.executionResults = new HashMap<>(); } @Override protected void doExecute(Namespace namespace, InternalExecution execution) { - // todo(tm): Non-blocking execution if (execution instanceof ManagedQuery managedQuery) { - SqlQuery sqlQuery = converter.convert(managedQuery.getQuery()); - SqlExecutionResult result = executionService.execute(sqlQuery); - addResult(managedQuery, result); - managedQuery.setLastResultCount(((long) result.getRowCount())); - managedQuery.finish(ExecutionState.DONE); + CompletableFuture sqlQueryExecution = executeAsync(managedQuery); + executionResults.put(managedQuery.getId(), sqlQueryExecution); return; } if (execution instanceof ManagedInternalForm managedForm) { - managedForm.getSubQueries().values().forEach(subQuery -> doExecute(namespace, subQuery)); - managedForm.finish(ExecutionState.DONE); + CompletableFuture.allOf(managedForm.getSubQueries().values().stream().map(this::executeAsync).toArray(CompletableFuture[]::new)) + .thenRun(() -> managedForm.finish(ExecutionState.DONE)); return; } @@ -53,7 +56,21 @@ protected void doExecute(Namespace namespace, InternalExecution execution) { @Override public void cancelQuery(Dataset dataset, ManagedExecution query) { - // unsupported for now + CompletableFuture sqlQueryExecution = executionResults.get(query.getId()); + if (!sqlQueryExecution.isCancelled()) { + sqlQueryExecution.cancel(true); + } + query.cancel(); + } + + private CompletableFuture executeAsync(ManagedQuery managedQuery) { + SqlQuery sqlQuery = converter.convert(managedQuery.getQuery()); + return CompletableFuture.supplyAsync(() -> executionService.execute(sqlQuery)) + .thenAccept(result -> { + addResult(managedQuery, result); + managedQuery.setLastResultCount(((long) result.getRowCount())); + managedQuery.finish(ExecutionState.DONE); + }); } } From 2d36448c19ca19e7d049ffed561ce006e112cf8b Mon Sep 17 00:00:00 2001 From: Jonas Arnhold Date: Thu, 16 May 2024 13:40:03 
+0200 Subject: [PATCH 2/3] Track running SQL executions and guard cancelQuery against finished executions --- .../forms/managed/ManagedInternalForm.java | 2 ++ .../sql/conquery/SqlExecutionManager.java | 17 +++++++++++++---- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java index 9bd4581ae7..80df26046d 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java @@ -135,6 +135,8 @@ protected void setAdditionalFieldsForStatusWithColumnDescription(Subject subject @Override public void cancel() { + // TODO This will cause an error when SqlExecutionManager calls cancelQuery() for a form. I wonder if we can just cancel all subqueries and remove this + // hard cast? Also, the DistributedExecutionManager#cancelQuery() will send the CancelQuery message anyway.
log.debug("Sending cancel message to all workers."); ((DistributedNamespace) getNamespace()).getWorkerHandler().sendToAll(new CancelQuery(getId())); } diff --git a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java index 0ce3760a24..a0c1616396 100644 --- a/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java +++ b/backend/src/main/java/com/bakdata/conquery/sql/conquery/SqlExecutionManager.java @@ -27,13 +27,13 @@ public class SqlExecutionManager extends ExecutionManager { private final SqlExecutionService executionService; private final SqlConverter converter; - private final Map> executionResults; + private final Map> runningExecutions; public SqlExecutionManager(final SqlContext context, SqlExecutionService sqlExecutionService, MetaStorage storage) { super(storage); this.executionService = sqlExecutionService; this.converter = new SqlConverter(context.getSqlDialect(), context.getConfig()); - this.executionResults = new HashMap<>(); + this.runningExecutions = new HashMap<>(); } @Override @@ -41,7 +41,7 @@ protected void doExecute(Namespace namespace, InternalExecution execution) { if (execution instanceof ManagedQuery managedQuery) { CompletableFuture sqlQueryExecution = executeAsync(managedQuery); - executionResults.put(managedQuery.getId(), sqlQueryExecution); + runningExecutions.put(managedQuery.getId(), sqlQueryExecution); return; } @@ -56,10 +56,18 @@ protected void doExecute(Namespace namespace, InternalExecution execution) { @Override public void cancelQuery(Dataset dataset, ManagedExecution query) { - CompletableFuture sqlQueryExecution = executionResults.get(query.getId()); + + CompletableFuture sqlQueryExecution = runningExecutions.remove(query.getId()); + + // already finished/canceled + if (sqlQueryExecution == null) { + return; + } + if (!sqlQueryExecution.isCancelled()) { sqlQueryExecution.cancel(true); } + 
query.cancel(); } @@ -70,6 +78,7 @@ private CompletableFuture executeAsync(ManagedQuery managedQuery) { addResult(managedQuery, result); managedQuery.setLastResultCount(((long) result.getRowCount())); managedQuery.finish(ExecutionState.DONE); + runningExecutions.remove(managedQuery.getId()); }); } From 701f8bda772cdbc2ef721f4b7898186f54d388d5 Mon Sep 17 00:00:00 2001 From: Jonas Arnhold Date: Fri, 17 May 2024 12:29:45 +0200 Subject: [PATCH 3/3] Stop sending CancelQuery message twice for forms --- .../conquery/models/forms/managed/ManagedInternalForm.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java index 80df26046d..237a3d4c2e 100644 --- a/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java +++ b/backend/src/main/java/com/bakdata/conquery/models/forms/managed/ManagedInternalForm.java @@ -21,7 +21,6 @@ import com.bakdata.conquery.models.identifiable.IdMap; import com.bakdata.conquery.models.identifiable.ids.specific.ManagedExecutionId; import com.bakdata.conquery.models.messages.namespaces.WorkerMessage; -import com.bakdata.conquery.models.messages.namespaces.specific.CancelQuery; import com.bakdata.conquery.models.messages.namespaces.specific.ExecuteForm; import com.bakdata.conquery.models.query.ColumnDescriptor; import com.bakdata.conquery.models.query.ManagedQuery; @@ -30,7 +29,6 @@ import com.bakdata.conquery.models.query.resultinfo.ResultInfo; import com.bakdata.conquery.models.query.results.EntityResult; import com.bakdata.conquery.models.query.results.FormShardResult; -import com.bakdata.conquery.models.worker.DistributedNamespace; import com.fasterxml.jackson.annotation.JacksonInject; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.OptBoolean; @@ -135,10 +133,7 @@ protected 
void setAdditionalFieldsForStatusWithColumnDescription(Subject subject @Override public void cancel() { - // TODO This will cause an error when SqlExecutionManager calls cancelQuery() for a form. I wonder if we can just cancel all subqueries and remove this - // hard cast? Also, the DistributedExecutionManager#cancelQuery() will send the CancelQuery message anyway. - log.debug("Sending cancel message to all workers."); - ((DistributedNamespace) getNamespace()).getWorkerHandler().sendToAll(new CancelQuery(getId())); + subQueries.values().forEach(ManagedQuery::cancel); } @Override