diff --git a/.codecov.yml b/.codecov.yml
index 7e4eb060..b36ce72b 100644
--- a/.codecov.yml
+++ b/.codecov.yml
@@ -11,4 +11,3 @@ coverage:
target: 80% # the required coverage value
threshold: 1% # the leniency in hitting the target
patch: off
-
diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml
index a8199a10..67f6e2a3 100644
--- a/.github/ISSUE_TEMPLATE/config.yml
+++ b/.github/ISSUE_TEMPLATE/config.yml
@@ -4,4 +4,4 @@ contact_links:
about: Please ask and answer questions here.
- name: AWS/Amazon Security
url: https://aws.amazon.com/security/vulnerability-reporting/
- about: Please report security vulnerabilities here.
\ No newline at end of file
+ about: Please report security vulnerabilities here.
diff --git a/.github/dependabot.yml b/.github/dependabot.yml
index 9088426c..f8881e1f 100644
--- a/.github/dependabot.yml
+++ b/.github/dependabot.yml
@@ -15,4 +15,4 @@ updates:
schedule:
interval: "weekly"
commit-message:
- prefix: "dependabot:"
\ No newline at end of file
+ prefix: "dependabot:"
diff --git a/.github/workflows/add-untriaged.yml b/.github/workflows/add-untriaged.yml
index 9dcc7020..15b9a556 100644
--- a/.github/workflows/add-untriaged.yml
+++ b/.github/workflows/add-untriaged.yml
@@ -1,19 +1,19 @@
-name: Apply 'untriaged' label during issue lifecycle
-
-on:
- issues:
- types: [opened, reopened, transferred]
-
-jobs:
- apply-label:
- runs-on: ubuntu-latest
- steps:
- - uses: actions/github-script@v6
- with:
- script: |
- github.rest.issues.addLabels({
- issue_number: context.issue.number,
- owner: context.repo.owner,
- repo: context.repo.repo,
- labels: ['untriaged']
- })
+name: Apply 'untriaged' label during issue lifecycle
+
+on:
+ issues:
+ types: [opened, reopened, transferred]
+
+jobs:
+ apply-label:
+ runs-on: ubuntu-latest
+ steps:
+ - uses: actions/github-script@v6
+ with:
+ script: |
+ github.rest.issues.addLabels({
+ issue_number: context.issue.number,
+ owner: context.repo.owner,
+ repo: context.repo.repo,
+ labels: ['untriaged']
+ })
diff --git a/.github/workflows/auto-release.yml b/.github/workflows/auto-release.yml
index ad9a25b3..90873741 100644
--- a/.github/workflows/auto-release.yml
+++ b/.github/workflows/auto-release.yml
@@ -30,4 +30,4 @@ jobs:
- uses: ncipollo/release-action@v1
with:
github_token: ${{ steps.github_app_token.outputs.token }}
- bodyFile: release-notes/opensearch-asynchronous-search.release-notes-${{steps.tag.outputs.tag}}.md
\ No newline at end of file
+ bodyFile: release-notes/opensearch-asynchronous-search.release-notes-${{steps.tag.outputs.tag}}.md
diff --git a/.github/workflows/release-workflow.yml b/.github/workflows/release-workflow.yml
index aa6d8b71..7d6a57ca 100644
--- a/.github/workflows/release-workflow.yml
+++ b/.github/workflows/release-workflow.yml
@@ -122,4 +122,3 @@ jobs:
with:
name: asynchronous-search-plugin
path: asynchronous-search-artifacts
-
diff --git a/MAINTAINERS.md b/MAINTAINERS.md
index d95ff7e5..5ebd59a4 100644
--- a/MAINTAINERS.md
+++ b/MAINTAINERS.md
@@ -23,4 +23,3 @@ This document contains a list of maintainers in this repo. See [opensearch-proje
| Maintainer | GitHub ID | Affiliation |
|-------------| ------------------------------------------ | ----------- |
| Anshul Agarwal | [anshul291995](https://github.com/anshul291995) | Amazon |
-
diff --git a/README.md b/README.md
index 529a97d6..13bce38e 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
[![codecov](https://codecov.io/gh/opensearch-project/asynchronous-search/branch/main/graph/badge.svg)](https://codecov.io/gh/opensearch-project/asynchronous-search)
# Asynchronous search
-Asynchronous search makes it possible for users to run such queries without worrying about the query timing out.
+Asynchronous search makes it possible for users to run such queries without worrying about the query timing out.
These queries run in the background, and users can track the progress, and retrieve partial results as they become available.
The asynchronous search plugin supports the below operations
@@ -48,12 +48,12 @@ GET /_plugins/_asynchronous_search/stats
## Setup
1. Check out this package from version control.
-2. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
+2. Launch Intellij IDEA, choose **Import Project**, and select the `settings.gradle` file in the root of this package.
3. To build from the command line, set `JAVA_HOME` to point to a JDK >= 8 before running `./gradlew`.
- Unix System
1. `export JAVA_HOME=jdk-install-dir`: Replace `jdk-install-dir` with the JAVA_HOME directory of your system.
2. `export PATH=$JAVA_HOME/bin:$PATH`
-
+
- Windows System
1. Find **My Computers** from file directory, right click and select **properties**.
2. Select the **Advanced** tab, select **Environment variables**.
@@ -91,7 +91,7 @@ When launching a cluster using one of the above commands, logs are placed in `bu
### Debugging
-Sometimes it is useful to attach a debugger to either the OpenSearch cluster or the integ tests to see what's going on. When running unit tests, hit **Debug** from the IDE's gutter to debug the tests. For the OpenSearch cluster or the integ tests, first, make sure start a debugger listening on port `5005`.
+Sometimes it is useful to attach a debugger to either the OpenSearch cluster or the integ tests to see what's going on. When running unit tests, hit **Debug** from the IDE's gutter to debug the tests. For the OpenSearch cluster or the integ tests, first, make sure start a debugger listening on port `5005`.
To debug the server code, run:
diff --git a/RELEASING.md b/RELEASING.md
index 6903e716..91263ac4 100644
--- a/RELEASING.md
+++ b/RELEASING.md
@@ -1 +1 @@
-This project follows the [OpenSearch release process](https://github.com/opensearch-project/.github/blob/main/RELEASING.md).
\ No newline at end of file
+This project follows the [OpenSearch release process](https://github.com/opensearch-project/.github/blob/main/RELEASING.md).
diff --git a/build.gradle b/build.gradle
index fa90b1fd..38c86daa 100644
--- a/build.gradle
+++ b/build.gradle
@@ -45,6 +45,7 @@ plugins {
id "de.undercouch.download" version "5.3.0"
id 'com.netflix.nebula.ospackage' version "11.10.0"
id 'checkstyle'
+ id "com.diffplug.spotless" version "6.25.0"
}
repositories {
@@ -60,6 +61,7 @@ apply plugin: 'opensearch.opensearchplugin'
apply plugin: 'opensearch.testclusters'
apply plugin: 'opensearch.rest-test'
apply plugin: 'opensearch.pluginzip'
+apply from: 'gradle/formatting.gradle'
checkstyle {
diff --git a/formatter/formatterConfig.xml b/formatter/formatterConfig.xml
new file mode 100644
index 00000000..b0e1eccc
--- /dev/null
+++ b/formatter/formatterConfig.xml
@@ -0,0 +1,362 @@
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/formatter/license-header.txt b/formatter/license-header.txt
new file mode 100644
index 00000000..cf0a0684
--- /dev/null
+++ b/formatter/license-header.txt
@@ -0,0 +1,8 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ */
diff --git a/gradle/formatting.gradle b/gradle/formatting.gradle
new file mode 100644
index 00000000..4b9714dc
--- /dev/null
+++ b/gradle/formatting.gradle
@@ -0,0 +1,48 @@
+/*
+ * SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
+ *
+ * Modifications Copyright OpenSearch Contributors. See
+ * GitHub history for details.
+ */
+
+allprojects {
+ project.apply plugin: "com.diffplug.spotless"
+ spotless {
+ java {
+ // Normally this isn't necessary, but we have Java sources in
+ // non-standard places
+ target '**/*.java'
+
+ removeUnusedImports()
+ eclipse().configFile rootProject.file('formatter/formatterConfig.xml')
+ trimTrailingWhitespace()
+ endWithNewline();
+
+ custom 'Refuse wildcard imports', {
+ // Wildcard imports can't be resolved; fail the build
+ if (it =~ /\s+import .*\*;/) {
+ throw new AssertionError("Do not use wildcard imports. 'spotlessApply' cannot resolve this issue.")
+ }
+ }
+
+ // See DEVELOPER_GUIDE.md for details of when to enable this.
+ if (System.getProperty('spotless.paddedcell') != null) {
+ paddedCell()
+ }
+ }
+ format 'misc', {
+ target '*.md', '*.gradle', '**/*.json', '**/*.yaml', '**/*.yml', '**/*.svg'
+
+ trimTrailingWhitespace()
+ endWithNewline()
+ }
+ format("license", {
+ licenseHeaderFile("${rootProject.file("formatter/license-header.txt")}", "package ");
+ target("src/*/java/**/*.java")
+ })
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/org/opensearch/search/asynchronous/action/AsynchronousSearchStatsAction.java b/src/main/java/org/opensearch/search/asynchronous/action/AsynchronousSearchStatsAction.java
index 8a9bc1d0..38b85868 100644
--- a/src/main/java/org/opensearch/search/asynchronous/action/AsynchronousSearchStatsAction.java
+++ b/src/main/java/org/opensearch/search/asynchronous/action/AsynchronousSearchStatsAction.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.action;
import org.opensearch.search.asynchronous.response.AsynchronousSearchStatsResponse;
@@ -11,7 +14,6 @@
public class AsynchronousSearchStatsAction extends ActionType {
-
public static final AsynchronousSearchStatsAction INSTANCE = new AsynchronousSearchStatsAction();
public static final String NAME = "cluster:admin/opendistro/asynchronous_search/stats";
diff --git a/src/main/java/org/opensearch/search/asynchronous/action/DeleteAsynchronousSearchAction.java b/src/main/java/org/opensearch/search/asynchronous/action/DeleteAsynchronousSearchAction.java
index c788462e..a59b09f0 100644
--- a/src/main/java/org/opensearch/search/asynchronous/action/DeleteAsynchronousSearchAction.java
+++ b/src/main/java/org/opensearch/search/asynchronous/action/DeleteAsynchronousSearchAction.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.action;
import org.opensearch.search.asynchronous.response.AcknowledgedResponse;
diff --git a/src/main/java/org/opensearch/search/asynchronous/action/GetAsynchronousSearchAction.java b/src/main/java/org/opensearch/search/asynchronous/action/GetAsynchronousSearchAction.java
index f654747e..f9fa42c9 100644
--- a/src/main/java/org/opensearch/search/asynchronous/action/GetAsynchronousSearchAction.java
+++ b/src/main/java/org/opensearch/search/asynchronous/action/GetAsynchronousSearchAction.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.action;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
diff --git a/src/main/java/org/opensearch/search/asynchronous/action/SubmitAsynchronousSearchAction.java b/src/main/java/org/opensearch/search/asynchronous/action/SubmitAsynchronousSearchAction.java
index 46d282a6..1f84040f 100644
--- a/src/main/java/org/opensearch/search/asynchronous/action/SubmitAsynchronousSearchAction.java
+++ b/src/main/java/org/opensearch/search/asynchronous/action/SubmitAsynchronousSearchAction.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.action;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContext.java b/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContext.java
index 26342e50..2a268ebd 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContext.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContext.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context;
import org.opensearch.commons.authuser.User;
@@ -15,7 +18,6 @@
import java.util.function.LongSupplier;
-
/**
* Wrapper around information that needs to stay around when an asynchronous search has been submitted.
* This class encapsulates the details of the various elements pertaining to an asynchronous search, including the
@@ -35,8 +37,7 @@ public AsynchronousSearchContext(AsynchronousSearchContextId asynchronousSearchC
this.currentTimeSupplier = currentTimeSupplier;
}
- public @Nullable
- AsynchronousSearchProgressListener getAsynchronousSearchProgressListener() {
+ public @Nullable AsynchronousSearchProgressListener getAsynchronousSearchProgressListener() {
return asynchronousSearchProgressListener;
}
@@ -58,22 +59,25 @@ public AsynchronousSearchContextId getContextId() {
public abstract long getStartTimeMillis();
- public abstract @Nullable
- SearchResponse getSearchResponse();
+ public abstract @Nullable SearchResponse getSearchResponse();
- public abstract @Nullable
- Exception getSearchError();
+ public abstract @Nullable Exception getSearchError();
- public abstract @Nullable
- User getUser();
+ public abstract @Nullable User getUser();
public boolean isExpired() {
return getExpirationTimeMillis() < currentTimeSupplier.getAsLong();
}
public AsynchronousSearchResponse getAsynchronousSearchResponse() {
- return new AsynchronousSearchResponse(getAsynchronousSearchId(), getAsynchronousSearchState(), getStartTimeMillis(),
- getExpirationTimeMillis(), getSearchResponse(), getSearchError());
+ return new AsynchronousSearchResponse(
+ getAsynchronousSearchId(),
+ getAsynchronousSearchState(),
+ getStartTimeMillis(),
+ getExpirationTimeMillis(),
+ getSearchResponse(),
+ getSearchError()
+ );
}
public void setState(AsynchronousSearchState targetState) {
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContextId.java b/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContextId.java
index 3669f5c3..369674cd 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContextId.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/AsynchronousSearchContextId.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context;
import org.opensearch.core.common.io.stream.StreamInput;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java
index aee08a7b..776cefea 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveContext.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.active;
import org.opensearch.commons.authuser.User;
@@ -56,11 +59,17 @@ public class AsynchronousSearchActiveContext extends AsynchronousSearchContext i
@Nullable
private final User user;
- public AsynchronousSearchActiveContext(AsynchronousSearchContextId asynchronousSearchContextId, String nodeId,
- TimeValue keepAlive, boolean keepOnCompletion, ThreadPool threadPool,
- LongSupplier currentTimeSupplier,
- AsynchronousSearchProgressListener asynchronousSearchProgressListener, @Nullable User user,
- Supplier persistSearchFailureSupplier) {
+ public AsynchronousSearchActiveContext(
+ AsynchronousSearchContextId asynchronousSearchContextId,
+ String nodeId,
+ TimeValue keepAlive,
+ boolean keepOnCompletion,
+ ThreadPool threadPool,
+ LongSupplier currentTimeSupplier,
+ AsynchronousSearchProgressListener asynchronousSearchProgressListener,
+ @Nullable User user,
+ Supplier persistSearchFailureSupplier
+ ) {
super(asynchronousSearchContextId, currentTimeSupplier);
this.keepOnCompletion = keepOnCompletion;
this.error = new SetOnce<>();
@@ -73,8 +82,9 @@ public AsynchronousSearchActiveContext(AsynchronousSearchContextId asynchronousS
this.asynchronousSearchId = new SetOnce<>();
this.completed = new AtomicBoolean(false);
this.closed = new AtomicBoolean(false);
- this.asynchronousSearchContextPermits = keepOnCompletion ? new AsynchronousSearchContextPermits(asynchronousSearchContextId,
- threadPool) : new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId);
+ this.asynchronousSearchContextPermits = keepOnCompletion
+ ? new AsynchronousSearchContextPermits(asynchronousSearchContextId, threadPool)
+ : new NoopAsynchronousSearchContextPermits(asynchronousSearchContextId);
this.user = user;
this.persistSearchFailureSupplier = persistSearchFailureSupplier;
}
@@ -87,8 +97,9 @@ public void setTask(SearchTask searchTask) {
this.searchTask.set(searchTask);
this.startTimeMillis = searchTask.getStartTime();
this.expirationTimeMillis = startTimeMillis + keepAlive.getMillis();
- this.asynchronousSearchId.set(AsynchronousSearchIdConverter.buildAsyncId(new AsynchronousSearchId(nodeId, searchTask.getId(),
- getContextId())));
+ this.asynchronousSearchId.set(
+ AsynchronousSearchIdConverter.buildAsyncId(new AsynchronousSearchId(nodeId, searchTask.getId(), getContextId()))
+ );
}
public void processSearchFailure(Exception e) {
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveStore.java b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveStore.java
index 2d6655de..598adaeb 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveStore.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchActiveStore.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.active;
import org.apache.logging.log4j.LogManager;
@@ -25,38 +28,46 @@
import static org.opensearch.common.util.concurrent.ConcurrentCollections.newConcurrentMapLongWithAggressiveConcurrency;
-
public class AsynchronousSearchActiveStore {
private static Logger logger = LogManager.getLogger(AsynchronousSearchActiveStore.class);
private volatile int nodeConcurrentRunningSearches;
public static final int NODE_CONCURRENT_RUNNING_SEARCHES = 20;
public static final Setting NODE_CONCURRENT_RUNNING_SEARCHES_SETTING = Setting.intSetting(
- "plugins.asynchronous_search.node_concurrent_running_searches",
- LegacyOpendistroAsynchronousSearchSettings.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING, 0,
- Setting.Property.Dynamic, Setting.Property.NodeScope);
+ "plugins.asynchronous_search.node_concurrent_running_searches",
+ LegacyOpendistroAsynchronousSearchSettings.NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
+ 0,
+ Setting.Property.Dynamic,
+ Setting.Property.NodeScope
+ );
private final ConcurrentMapLong activeContexts = newConcurrentMapLongWithAggressiveConcurrency();
public AsynchronousSearchActiveStore(ClusterService clusterService) {
Settings settings = clusterService.getSettings();
nodeConcurrentRunningSearches = NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.get(settings);
- clusterService.getClusterSettings().addSettingsUpdateConsumer(NODE_CONCURRENT_RUNNING_SEARCHES_SETTING,
- this::setNodeConcurrentRunningSearches);
+ clusterService.getClusterSettings()
+ .addSettingsUpdateConsumer(NODE_CONCURRENT_RUNNING_SEARCHES_SETTING, this::setNodeConcurrentRunningSearches);
}
private void setNodeConcurrentRunningSearches(int nodeConcurrentRunningSearches) {
this.nodeConcurrentRunningSearches = nodeConcurrentRunningSearches;
}
- public synchronized void putContext(AsynchronousSearchContextId asynchronousSearchContextId,
- AsynchronousSearchActiveContext asynchronousSearchContext,
- Consumer contextRejectionEventConsumer) {
+ public synchronized void putContext(
+ AsynchronousSearchContextId asynchronousSearchContextId,
+ AsynchronousSearchActiveContext asynchronousSearchContext,
+ Consumer contextRejectionEventConsumer
+ ) {
if (activeContexts.size() >= nodeConcurrentRunningSearches) {
contextRejectionEventConsumer.accept(asynchronousSearchContextId);
- throw new OpenSearchRejectedExecutionException("Trying to create too many concurrent searches. Must be less than or equal to: ["
- + nodeConcurrentRunningSearches + "]. This limit can be set by changing the ["
- + NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey() + "] settings.");
+ throw new OpenSearchRejectedExecutionException(
+ "Trying to create too many concurrent searches. Must be less than or equal to: ["
+ + nodeConcurrentRunningSearches
+ + "]. This limit can be set by changing the ["
+ + NODE_CONCURRENT_RUNNING_SEARCHES_SETTING.getKey()
+ + "] settings."
+ );
}
activeContexts.put(asynchronousSearchContextId.getId(), asynchronousSearchContext);
}
@@ -72,7 +83,6 @@ public Optional getContext(AsynchronousSearchCo
return Optional.empty();
}
-
public Map getAllContexts() {
return CollectionUtils.copyMap(activeContexts);
}
@@ -96,20 +106,18 @@ public boolean freeContext(AsynchronousSearchContextId asynchronousSearchContext
}
private static boolean calledFromAsynchronousSearchStateMachine() {
- return Stream.of(Thread.currentThread().getStackTrace()).
- skip(1). //skip getStackTrace
- limit(10). //limit depth of analysis to 10 frames, it should be enough
- anyMatch(f ->
- {
- try {
- boolean isTestMethodInvocation = f.getClassName().contains("AsynchronousSearchActiveStoreTests");
- boolean isStateMachineTriggerMethodInvocation = AsynchronousSearchStateMachine.class
- .isAssignableFrom(Class.forName(f.getClassName())) && f.getMethodName().equals("trigger");
- return isTestMethodInvocation || isStateMachineTriggerMethodInvocation;
- } catch (Exception ignored) {
- return false;
- }
+ return Stream.of(Thread.currentThread().getStackTrace()).skip(1). // skip getStackTrace
+ limit(10). // limit depth of analysis to 10 frames, it should be enough
+ anyMatch(f -> {
+ try {
+ boolean isTestMethodInvocation = f.getClassName().contains("AsynchronousSearchActiveStoreTests");
+ boolean isStateMachineTriggerMethodInvocation = AsynchronousSearchStateMachine.class.isAssignableFrom(
+ Class.forName(f.getClassName())
+ ) && f.getMethodName().equals("trigger");
+ return isTestMethodInvocation || isStateMachineTriggerMethodInvocation;
+ } catch (Exception ignored) {
+ return false;
}
- );
+ });
}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchContextClosedException.java b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchContextClosedException.java
index 2a060a95..203f5cd8 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchContextClosedException.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/active/AsynchronousSearchContextClosedException.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.active;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/permits/AsynchronousSearchContextPermits.java b/src/main/java/org/opensearch/search/asynchronous/context/permits/AsynchronousSearchContextPermits.java
index 66acadc8..786a1549 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/permits/AsynchronousSearchContextPermits.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/permits/AsynchronousSearchContextPermits.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.permits;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
@@ -48,15 +51,18 @@ public AsynchronousSearchContextPermits(AsynchronousSearchContextId asynchronous
this.semaphore = new Semaphore(TOTAL_PERMITS, true);
}
- public AsynchronousSearchContextPermits(AsynchronousSearchContextId asynchronousSearchContextId, ThreadPool threadPool,
- Semaphore semaphore) {
+ public AsynchronousSearchContextPermits(
+ AsynchronousSearchContextId asynchronousSearchContextId,
+ ThreadPool threadPool,
+ Semaphore semaphore
+ ) {
this.asynchronousSearchContextId = asynchronousSearchContextId;
this.threadPool = threadPool;
this.semaphore = semaphore;
}
private Releasable acquirePermits(int permits, TimeValue timeout, final String details) throws AsynchronousSearchContextClosedException,
- TimeoutException {
+ TimeoutException {
RunOnce release = new RunOnce(() -> {});
if (closed) {
logger.debug("Trying to acquire permit for closed context [{}]", asynchronousSearchContextId);
@@ -67,18 +73,28 @@ private Releasable acquirePermits(int permits, TimeValue timeout, final String d
this.lockDetails = details;
release = new RunOnce(() -> {
logger.debug("Releasing permit(s) [{}] with reason [{}]", permits, lockDetails);
- semaphore.release(permits);});
+ semaphore.release(permits);
+ });
if (closed) {
release.run();
logger.debug("Trying to acquire permit for closed context [{}]", asynchronousSearchContextId);
- throw new AsynchronousSearchContextClosedException( asynchronousSearchContextId);
+ throw new AsynchronousSearchContextClosedException(asynchronousSearchContextId);
}
return release::run;
} else {
- throw new TimeoutException("obtaining context lock" + asynchronousSearchContextId + "timed out after " +
- timeout.getMillis() + "ms, previous lock details: [" + lockDetails + "] trying to lock for [" + details + "]");
+ throw new TimeoutException(
+ "obtaining context lock"
+ + asynchronousSearchContextId
+ + "timed out after "
+ + timeout.getMillis()
+ + "ms, previous lock details: ["
+ + lockDetails
+ + "] trying to lock for ["
+ + details
+ + "]"
+ );
}
- } catch (InterruptedException e ) {
+ } catch (InterruptedException e) {
Thread.currentThread().interrupt();
release.run();
throw new RuntimeException("thread interrupted while trying to obtain context lock", e);
@@ -89,8 +105,7 @@ private void asyncAcquirePermit(int permits, final ActionListener on
threadPool.executor(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME).execute(new AbstractRunnable() {
@Override
public void onFailure(final Exception e) {
- logger.debug(() -> new ParameterizedMessage("Failed to acquire permit [{}] for [{}]",
- permits, reason), e);
+ logger.debug(() -> new ParameterizedMessage("Failed to acquire permit [{}] for [{}]", permits, reason), e);
onAcquired.onFailure(e);
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/permits/NoopAsynchronousSearchContextPermits.java b/src/main/java/org/opensearch/search/asynchronous/context/permits/NoopAsynchronousSearchContextPermits.java
index 40c004c4..f7d90a2a 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/permits/NoopAsynchronousSearchContextPermits.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/permits/NoopAsynchronousSearchContextPermits.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.permits;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceContext.java b/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceContext.java
index de4ae17d..0229f1d0 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceContext.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceContext.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.persistence;
import org.opensearch.commons.authuser.User;
@@ -34,10 +37,13 @@ public class AsynchronousSearchPersistenceContext extends AsynchronousSearchCont
private final AsynchronousSearchPersistenceModel asynchronousSearchPersistenceModel;
private final NamedWriteableRegistry namedWriteableRegistry;
- public AsynchronousSearchPersistenceContext(String asynchronousSearchId, AsynchronousSearchContextId asynchronousSearchContextId,
- AsynchronousSearchPersistenceModel asynchronousSearchPersistenceModel,
- LongSupplier currentTimeSupplier,
- NamedWriteableRegistry namedWriteableRegistry) {
+ public AsynchronousSearchPersistenceContext(
+ String asynchronousSearchId,
+ AsynchronousSearchContextId asynchronousSearchContextId,
+ AsynchronousSearchPersistenceModel asynchronousSearchPersistenceModel,
+ LongSupplier currentTimeSupplier,
+ NamedWriteableRegistry namedWriteableRegistry
+ ) {
super(asynchronousSearchContextId, currentTimeSupplier);
Objects.requireNonNull(asynchronousSearchId);
Objects.requireNonNull(asynchronousSearchContextId);
@@ -76,16 +82,26 @@ public SearchResponse getSearchResponse() {
if (asynchronousSearchPersistenceModel.getResponse() == null) {
return null;
} else {
- BytesReference bytesReference =
- BytesReference.fromByteBuffer(ByteBuffer.wrap(Base64.getUrlDecoder().decode(
- asynchronousSearchPersistenceModel.getResponse())));
- try (NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
- namedWriteableRegistry)) {
+ BytesReference bytesReference = BytesReference.fromByteBuffer(
+ ByteBuffer.wrap(Base64.getUrlDecoder().decode(asynchronousSearchPersistenceModel.getResponse()))
+ );
+ try (
+ NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(
+ bytesReference.streamInput(),
+ namedWriteableRegistry
+ )
+ ) {
wrapperStreamInput.setVersion(wrapperStreamInput.readVersion());
return new SearchResponse(wrapperStreamInput);
} catch (IOException e) {
- logger.error(() -> new ParameterizedMessage("Failed to parse search response for asynchronous search [{}] Response : [{}] ",
- asynchronousSearchId, asynchronousSearchPersistenceModel.getResponse()), e);
+ logger.error(
+ () -> new ParameterizedMessage(
+ "Failed to parse search response for asynchronous search [{}] Response : [{}] ",
+ asynchronousSearchId,
+ asynchronousSearchPersistenceModel.getResponse()
+ ),
+ e
+ );
return null;
}
}
@@ -96,16 +112,26 @@ public Exception getSearchError() {
if (asynchronousSearchPersistenceModel.getError() == null) {
return null;
}
- BytesReference bytesReference =
- BytesReference.fromByteBuffer(ByteBuffer.wrap(Base64.getUrlDecoder()
- .decode(asynchronousSearchPersistenceModel.getError())));
- try (NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(bytesReference.streamInput(),
- namedWriteableRegistry)) {
+ BytesReference bytesReference = BytesReference.fromByteBuffer(
+ ByteBuffer.wrap(Base64.getUrlDecoder().decode(asynchronousSearchPersistenceModel.getError()))
+ );
+ try (
+ NamedWriteableAwareStreamInput wrapperStreamInput = new NamedWriteableAwareStreamInput(
+ bytesReference.streamInput(),
+ namedWriteableRegistry
+ )
+ ) {
wrapperStreamInput.setVersion(wrapperStreamInput.readVersion());
return wrapperStreamInput.readException();
} catch (IOException e) {
- logger.error(() -> new ParameterizedMessage("Failed to parse search error for asynchronous search [{}] Error : [{}] ",
- asynchronousSearchId, asynchronousSearchPersistenceModel.getResponse()), e);
+ logger.error(
+ () -> new ParameterizedMessage(
+ "Failed to parse search error for asynchronous search [{}] Error : [{}] ",
+ asynchronousSearchId,
+ asynchronousSearchPersistenceModel.getResponse()
+ ),
+ e
+ );
return null;
}
}
@@ -127,14 +153,11 @@ public int hashCode() {
@Override
public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
AsynchronousSearchPersistenceContext asynchronousSearchPersistenceContext = (AsynchronousSearchPersistenceContext) o;
- return asynchronousSearchPersistenceContext.getAsynchronousSearchId()
- .equals(this.asynchronousSearchId) && asynchronousSearchPersistenceContext.getAsynchronousSearchPersistenceModel()
- .equals(this.asynchronousSearchPersistenceModel);
+ return asynchronousSearchPersistenceContext.getAsynchronousSearchId().equals(this.asynchronousSearchId)
+ && asynchronousSearchPersistenceContext.getAsynchronousSearchPersistenceModel().equals(this.asynchronousSearchPersistenceModel);
}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceModel.java b/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceModel.java
index 1b4f7158..d46e7c7c 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceModel.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/persistence/AsynchronousSearchPersistenceModel.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.persistence;
import org.opensearch.commons.authuser.User;
@@ -34,8 +37,13 @@ public AsynchronousSearchPersistenceModel(long startTimeMillis, long expirationT
this.user = user;
}
- public AsynchronousSearchPersistenceModel(long startTimeMillis, long expirationTimeMillis, SearchResponse response,
- Exception error, User user) throws IOException {
+ public AsynchronousSearchPersistenceModel(
+ long startTimeMillis,
+ long expirationTimeMillis,
+ SearchResponse response,
+ Exception error,
+ User user
+ ) throws IOException {
this.startTimeMillis = startTimeMillis;
this.expirationTimeMillis = expirationTimeMillis;
this.response = serializeResponse(response);
@@ -109,14 +117,12 @@ public boolean equals(Object o) {
return false;
}
AsynchronousSearchPersistenceModel other = (AsynchronousSearchPersistenceModel) o;
- return
- startTimeMillis == other.startTimeMillis && expirationTimeMillis == other.expirationTimeMillis
- && ((response == null && other.response == null) ||
- (response != null && other.response != null && response.equals(other.response)))
- && ((error == null && other.error == null) ||
- (error != null && other.error != null && error.equals(other.error)))
- && ((user == null && other.user == null) ||
- (user != null && other.user != null && user.equals(other.user)));
+ return startTimeMillis == other.startTimeMillis
+ && expirationTimeMillis == other.expirationTimeMillis
+ && ((response == null && other.response == null)
+ || (response != null && other.response != null && response.equals(other.response)))
+ && ((error == null && other.error == null) || (error != null && other.error != null && error.equals(other.error)))
+ && ((user == null && other.user == null) || (user != null && other.user != null && user.equals(other.user)));
}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchContextEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchContextEvent.java
index cd78991f..5e8fa8ac 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchContextEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchContextEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
@@ -26,4 +29,3 @@ public AsynchronousSearchContext asynchronousSearchContext() {
}
}
-
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchState.java b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchState.java
index 17443bc4..f0f4acc4 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchState.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchState.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import org.opensearch.action.search.SearchTask;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachine.java b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachine.java
index e248d22c..dce7b427 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachine.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachine.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
@@ -33,8 +36,11 @@ public class AsynchronousSearchStateMachine implements StateMachine states;
private final AsynchronousSearchContextEventListener asynchronousSearchContextEventListener;
- public AsynchronousSearchStateMachine(final Set states, final AsynchronousSearchState initialState,
- AsynchronousSearchContextEventListener asynchronousSearchContextEventListener) {
+ public AsynchronousSearchStateMachine(
+ final Set states,
+ final AsynchronousSearchState initialState,
+ AsynchronousSearchContextEventListener asynchronousSearchContextEventListener
+ ) {
super();
this.transitionsMap = new HashMap<>();
this.states = states;
@@ -92,31 +98,45 @@ public AsynchronousSearchState trigger(AsynchronousSearchContextEvent event) thr
AsynchronousSearchTransition extends AsynchronousSearchContextEvent> transition = transitionsMap.get(transitionId);
execute(transition.onEvent(), event, currentState);
asynchronousSearchContext.setState(transition.targetState());
- logger.debug("Executed event [{}] for asynchronous search id [{}] ", event.getClass().getName(),
- event.asynchronousSearchContext.getAsynchronousSearchId());
+ logger.debug(
+ "Executed event [{}] for asynchronous search id [{}] ",
+ event.getClass().getName(),
+ event.asynchronousSearchContext.getAsynchronousSearchId()
+ );
BiConsumer eventListener = transition.eventListener();
try {
eventListener.accept(event.asynchronousSearchContext().getContextId(), asynchronousSearchContextEventListener);
} catch (Exception ex) {
- logger.error(() -> new ParameterizedMessage("Failed to execute listener for asynchronous search id : [{}]",
- event.asynchronousSearchContext.getAsynchronousSearchId()), ex);
+ logger.error(
+ () -> new ParameterizedMessage(
+ "Failed to execute listener for asynchronous search id : [{}]",
+ event.asynchronousSearchContext.getAsynchronousSearchId()
+ ),
+ ex
+ );
}
return asynchronousSearchContext.getAsynchronousSearchState();
} else {
- String message = String.format(Locale.ROOT, "Invalid transition for " +
- "asynchronous search context [%s] from source state [%s] on event [%s]",
- asynchronousSearchContext.getAsynchronousSearchId(), currentState, event.getClass().getName());
+ String message = String.format(
+ Locale.ROOT,
+ "Invalid transition for " + "asynchronous search context [%s] from source state [%s] on event [%s]",
+ asynchronousSearchContext.getAsynchronousSearchId(),
+ currentState,
+ event.getClass().getName()
+ );
logger.error(message);
throw new IllegalStateException(message);
}
}
}
-
@SuppressWarnings("unchecked")
- //Suppress the warning since we know the type of the event and transition based on the validation
- private void execute(BiConsumer onEvent, AsynchronousSearchContextEvent event,
- AsynchronousSearchState state) {
+ // Suppress the warning since we know the type of the event and transition based on the validation
+ private void execute(
+ BiConsumer onEvent,
+ AsynchronousSearchContextEvent event,
+ AsynchronousSearchState state
+ ) {
onEvent.accept(state, (T) event);
}
@@ -139,4 +159,3 @@ private String getTransitionId(AsynchronousSearchState sourceState, Class> eve
}
}
-
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachineClosedException.java b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachineClosedException.java
index c5b0423b..8e68fb62 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachineClosedException.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchStateMachineClosedException.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import java.util.Locale;
@@ -12,10 +15,19 @@ public class AsynchronousSearchStateMachineClosedException extends Exception {
private final AsynchronousSearchState currentState;
private final AsynchronousSearchContextEvent contextEvent;
- public AsynchronousSearchStateMachineClosedException(AsynchronousSearchState currentState,
- AsynchronousSearchContextEvent contextEvent) {
- super(String.format(Locale.ROOT, "Invalid transition for CLOSED context [%s] from source state [%s] on event [%s]",
- contextEvent.asynchronousSearchContext.getAsynchronousSearchId(), currentState, contextEvent.getClass().getName()));
+ public AsynchronousSearchStateMachineClosedException(
+ AsynchronousSearchState currentState,
+ AsynchronousSearchContextEvent contextEvent
+ ) {
+ super(
+ String.format(
+ Locale.ROOT,
+ "Invalid transition for CLOSED context [%s] from source state [%s] on event [%s]",
+ contextEvent.asynchronousSearchContext.getAsynchronousSearchId(),
+ currentState,
+ contextEvent.getClass().getName()
+ )
+ );
this.currentState = currentState;
this.contextEvent = contextEvent;
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchTransition.java b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchTransition.java
index f38381a3..e857fb47 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchTransition.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/AsynchronousSearchTransition.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
@@ -11,7 +14,8 @@
import java.util.function.BiConsumer;
public class AsynchronousSearchTransition
- implements Transition {
+ implements
+ Transition {
private final AsynchronousSearchState sourceState;
private final AsynchronousSearchState targetState;
@@ -19,10 +23,13 @@ public class AsynchronousSearchTransition eventListener;
private final Class eventType;
- public AsynchronousSearchTransition(AsynchronousSearchState sourceState, AsynchronousSearchState targetState,
- BiConsumer onEvent,
- BiConsumer eventListener,
- Class eventName) {
+ public AsynchronousSearchTransition(
+ AsynchronousSearchState sourceState,
+ AsynchronousSearchState targetState,
+ BiConsumer onEvent,
+ BiConsumer eventListener,
+ Class eventName
+ ) {
this.sourceState = sourceState;
this.targetState = targetState;
this.onEvent = onEvent;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/StateMachine.java b/src/main/java/org/opensearch/search/asynchronous/context/state/StateMachine.java
index 24a2811d..4caf980e 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/StateMachine.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/StateMachine.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import java.util.Map;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/Transition.java b/src/main/java/org/opensearch/search/asynchronous/context/state/Transition.java
index 0504858e..e7c957d8 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/Transition.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/Transition.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state;
import java.util.function.BiConsumer;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/BeginPersistEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/BeginPersistEvent.java
index 33fee24b..1758e465 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/BeginPersistEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/BeginPersistEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
@@ -24,12 +27,21 @@ public BeginPersistEvent(AsynchronousSearchContext asynchronousSearchContext) {
public AsynchronousSearchPersistenceModel getAsynchronousSearchPersistenceModel() {
try {
- return new AsynchronousSearchPersistenceModel(asynchronousSearchContext.getStartTimeMillis(),
- asynchronousSearchContext.getExpirationTimeMillis(), asynchronousSearchContext.getSearchResponse(),
- asynchronousSearchContext.getSearchError(), asynchronousSearchContext.getUser());
+ return new AsynchronousSearchPersistenceModel(
+ asynchronousSearchContext.getStartTimeMillis(),
+ asynchronousSearchContext.getExpirationTimeMillis(),
+ asynchronousSearchContext.getSearchResponse(),
+ asynchronousSearchContext.getSearchError(),
+ asynchronousSearchContext.getUser()
+ );
} catch (IOException e) {
- logger.error(() -> new ParameterizedMessage("Failed to create asynchronous search persistence model" +
- " for asynchronous search [{}]", asynchronousSearchContext.getAsynchronousSearchId()), e);
+ logger.error(
+ () -> new ParameterizedMessage(
+ "Failed to create asynchronous search persistence model" + " for asynchronous search [{}]",
+ asynchronousSearchContext.getAsynchronousSearchId()
+ ),
+ e
+ );
return null;
}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchDeletedEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchDeletedEvent.java
index acbdf8a8..f5c06c80 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchDeletedEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchDeletedEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.active.AsynchronousSearchActiveContext;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchFailureEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchFailureEvent.java
index 74f68138..b33debc3 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchFailureEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchFailureEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistFailedEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistFailedEvent.java
index b2d84528..78b45cdd 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistFailedEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistFailedEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistedEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistedEvent.java
index 590bffa3..276deed4 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistedEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchResponsePersistedEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
@@ -17,4 +20,3 @@ public SearchResponsePersistedEvent(AsynchronousSearchContext asynchronousSearch
super(asynchronousSearchContext);
}
}
-
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchStartedEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchStartedEvent.java
index eafdeaa0..ef18d78e 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchStartedEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchStartedEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
diff --git a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchSuccessfulEvent.java b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchSuccessfulEvent.java
index d8a92013..5be8ac8e 100644
--- a/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchSuccessfulEvent.java
+++ b/src/main/java/org/opensearch/search/asynchronous/context/state/event/SearchSuccessfulEvent.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.context.state.event;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContext;
diff --git a/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchId.java b/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchId.java
index e083512b..e4aae22c 100644
--- a/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchId.java
+++ b/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchId.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.id;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
@@ -47,7 +50,6 @@ public String toString() {
return "[" + node + "][" + taskId + "][" + asynchronousSearchContextId + "]";
}
-
@Override
public int hashCode() {
return Objects.hash(this.asynchronousSearchContextId, this.node, this.taskId);
@@ -55,13 +57,11 @@ public int hashCode() {
@Override
public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
AsynchronousSearchId asynchronousSearchId = (AsynchronousSearchId) o;
return asynchronousSearchId.asynchronousSearchContextId.equals(this.asynchronousSearchContextId)
- && asynchronousSearchId.node.equals(this.node)
- && asynchronousSearchId.taskId == this.taskId;
+ && asynchronousSearchId.node.equals(this.node)
+ && asynchronousSearchId.taskId == this.taskId;
}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchIdConverter.java b/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchIdConverter.java
index 43a1a1fb..06bf7667 100644
--- a/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchIdConverter.java
+++ b/src/main/java/org/opensearch/search/asynchronous/id/AsynchronousSearchIdConverter.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.id;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchContextEventListener.java b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchContextEventListener.java
index 303a35bd..95fc1b6f 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchContextEventListener.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchContextEventListener.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
import org.opensearch.search.asynchronous.context.AsynchronousSearchContextId;
@@ -15,66 +18,55 @@ public interface AsynchronousSearchContextEventListener {
/**
* @param contextId Executed when a new asynchronous search context was created
*/
- default void onNewContext(AsynchronousSearchContextId contextId) {
- }
+ default void onNewContext(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context completes.
*/
- default void onContextCompleted(AsynchronousSearchContextId contextId) {
- }
+ default void onContextCompleted(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context fails.
*/
- default void onContextFailed(AsynchronousSearchContextId contextId) {
- }
+ default void onContextFailed(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context is persisted.
*/
- default void onContextPersisted(AsynchronousSearchContextId contextId) {
- }
+ default void onContextPersisted(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context fails persisting.
*/
- default void onContextPersistFailed(AsynchronousSearchContextId contextId) {
- }
+ default void onContextPersistFailed(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context is deleted.
*/
- default void onContextDeleted(AsynchronousSearchContextId contextId) {
- }
+ default void onContextDeleted(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a previously created asynchronous search context is running.
*/
- default void onContextRunning(AsynchronousSearchContextId contextId) {
- }
+ default void onContextRunning(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when asynchronous search context creation is rejected
*/
- default void onContextRejected(AsynchronousSearchContextId contextId) {
- }
+ default void onContextRejected(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when a running asynchronous search context is deleted and has bypassed succeeded/failed state
*/
- default void onRunningContextDeleted(AsynchronousSearchContextId contextId) {
- }
+ default void onRunningContextDeleted(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when an asynchronous search context is cancelled
*/
- default void onContextCancelled(AsynchronousSearchContextId contextId) {
- }
+ default void onContextCancelled(AsynchronousSearchContextId contextId) {}
/**
* @param contextId Executed when an asynchronous search context is initialized
*/
- default void onContextInitialized(AsynchronousSearchContextId contextId) {
- }
+ default void onContextInitialized(AsynchronousSearchContextId contextId) {}
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java
index c62b9b86..a5e75f96 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchProgressListener.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
import org.opensearch.search.asynchronous.response.AsynchronousSearchResponse;
@@ -29,7 +32,6 @@
import java.util.function.LongSupplier;
import java.util.function.Supplier;
-
/***
* The implementation of {@link CompositeSearchProgressActionListener} responsible for updating the partial results of a single asynchronous
* search request. All partial results are updated atomically.
@@ -42,11 +44,14 @@ public class AsynchronousSearchProgressListener extends SearchProgressActionList
private final Function failureFunction;
private final ExecutorService executor;
- public AsynchronousSearchProgressListener(long relativeStartMillis, Function successFunction,
- Function failureFunction,
- ExecutorService executor, LongSupplier relativeTimeSupplier,
- Supplier reduceContextBuilder) {
+ public AsynchronousSearchProgressListener(
+ long relativeStartMillis,
+ Function successFunction,
+ Function failureFunction,
+ ExecutorService executor,
+ LongSupplier relativeTimeSupplier,
+ Supplier reduceContextBuilder
+ ) {
this.successFunction = successFunction;
this.failureFunction = failureFunction;
this.executor = executor;
@@ -54,7 +59,6 @@ public AsynchronousSearchProgressListener(long relativeStartMillis, Function();
}
-
/***
* Returns the partial response for the search response.
* @return the partial search response
@@ -65,8 +69,12 @@ public SearchResponse partialResponse() {
}
@Override
- protected void onListShards(List shards, List skippedShards, SearchResponse.Clusters clusters,
- boolean fetchPhase) {
+ protected void onListShards(
+ List shards,
+ List skippedShards,
+ SearchResponse.Clusters clusters,
+ boolean fetchPhase
+ ) {
partialResultsHolder.hasFetchPhase.set(fetchPhase);
partialResultsHolder.totalShards.set(shards.size() + skippedShards.size());
partialResultsHolder.skippedShards.set(skippedShards.size());
@@ -77,8 +85,10 @@ protected void onListShards(List shards, List skippedS
@Override
protected void onPartialReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
- assert reducePhase > partialResultsHolder.reducePhase.get() : "reduce phase " + reducePhase + "less than previous phase"
- + partialResultsHolder.reducePhase.get();
+ assert reducePhase > partialResultsHolder.reducePhase.get() : "reduce phase "
+ + reducePhase
+ + "less than previous phase"
+ + partialResultsHolder.reducePhase.get();
partialResultsHolder.partialInternalAggregations.set(aggs);
partialResultsHolder.reducePhase.set(reducePhase);
partialResultsHolder.totalHits.set(totalHits);
@@ -86,10 +96,12 @@ protected void onPartialReduce(List shards, TotalHits totalHits, In
@Override
protected void onFinalReduce(List shards, TotalHits totalHits, InternalAggregations aggs, int reducePhase) {
- assert reducePhase > partialResultsHolder.reducePhase.get() : "reduce phase " + reducePhase + "less than previous phase"
- + partialResultsHolder.reducePhase.get();
+ assert reducePhase > partialResultsHolder.reducePhase.get() : "reduce phase "
+ + reducePhase
+ + "less than previous phase"
+ + partialResultsHolder.reducePhase.get();
partialResultsHolder.internalAggregations.set(aggs);
- //we don't need to hold its reference beyond this point
+ // we don't need to hold its reference beyond this point
partialResultsHolder.partialInternalAggregations.set(null);
partialResultsHolder.reducePhase.set(reducePhase);
partialResultsHolder.totalHits.set(totalHits);
@@ -127,11 +139,11 @@ private synchronized void onShardResult(int shardIndex) {
}
private synchronized void onSearchFailure(int shardIndex, SearchShardTarget shardTarget, Exception e) {
- //It's hard to build partial search failures since the elasticsearch doesn't consider shard not available exceptions as failures
- //while internally it has exceptions from all shards of a particular shard group, it exposes only the exception on the
- //final shard of the group, the exception for which could be shard not available while a previous failure on a shard of the same
- //group could be outside this category. Since the final exception overrides the exception for the group, it causes inconsistency
- //between the partial search failure and failures post completion.
+ // It's hard to build partial search failures since the elasticsearch doesn't consider shard not available exceptions as failures
+ // while internally it has exceptions from all shards of a particular shard group, it exposes only the exception on the
+ // final shard of the group, the exception for which could be shard not available while a previous failure on a shard of the same
+ // group could be outside this category. Since the final exception overrides the exception for the group, it causes inconsistency
+ // between the partial search failure and failures post completion.
if (partialResultsHolder.successfulShardIds.contains(shardIndex)) {
partialResultsHolder.successfulShardIds.remove(shardIndex);
partialResultsHolder.successfulShards.decrementAndGet();
@@ -196,9 +208,11 @@ static class PartialResultsHolder {
final LongSupplier relativeTimeSupplier;
final Supplier reduceContextBuilder;
-
- PartialResultsHolder(long relativeStartMillis, LongSupplier relativeTimeSupplier,
- Supplier reduceContextBuilder) {
+ PartialResultsHolder(
+ long relativeStartMillis,
+ LongSupplier relativeTimeSupplier,
+ Supplier reduceContextBuilder
+ ) {
this.internalAggregations = new AtomicReference<>();
this.totalShards = new SetOnce<>();
this.successfulShards = new AtomicInteger();
@@ -219,20 +233,37 @@ public SearchResponse partialResponse() {
if (isInitialized) {
SearchHits searchHits = new SearchHits(SearchHits.EMPTY, totalHits.get(), Float.NaN);
InternalAggregations finalAggregation = null;
- //after final reduce phase this should be present
+ // after final reduce phase this should be present
if (internalAggregations.get() != null) {
finalAggregation = internalAggregations.get();
- //before final reduce phase ensure we do a top-level final reduce to get reduced aggregation results
- //else we might be returning back all the partial results aggregated so far
+ // before final reduce phase ensure we do a top-level final reduce to get reduced aggregation results
+ // else we might be returning back all the partial results aggregated so far
} else if (partialInternalAggregations.get() != null) {
- finalAggregation = InternalAggregations.topLevelReduce(Collections.singletonList(partialInternalAggregations.get()),
- reduceContextBuilder.get().forFinalReduction());
+ finalAggregation = InternalAggregations.topLevelReduce(
+ Collections.singletonList(partialInternalAggregations.get()),
+ reduceContextBuilder.get().forFinalReduction()
+ );
}
- InternalSearchResponse internalSearchResponse = new InternalSearchResponse(searchHits,
- finalAggregation, null, null, false, null, reducePhase.get());
+ InternalSearchResponse internalSearchResponse = new InternalSearchResponse(
+ searchHits,
+ finalAggregation,
+ null,
+ null,
+ false,
+ null,
+ reducePhase.get()
+ );
long tookInMillis = relativeTimeSupplier.getAsLong() - relativeStartMillis;
- return new SearchResponse(internalSearchResponse, null, totalShards.get(),
- successfulShards.get(), skippedShards.get(), tookInMillis, ShardSearchFailure.EMPTY_ARRAY, clusters.get());
+ return new SearchResponse(
+ internalSearchResponse,
+ null,
+ totalShards.get(),
+ successfulShards.get(),
+ skippedShards.get(),
+ tookInMillis,
+ ShardSearchFailure.EMPTY_ARRAY,
+ clusters.get()
+ );
} else {
return null;
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchTimeoutWrapper.java b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchTimeoutWrapper.java
index 5efbfaa2..b38a3f00 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchTimeoutWrapper.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/AsynchronousSearchTimeoutWrapper.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
import org.apache.logging.log4j.LogManager;
@@ -34,10 +37,13 @@ public class AsynchronousSearchTimeoutWrapper {
* @param the response
* @return PrioritizedListener
*/
- public static PrioritizedActionListener wrapScheduledTimeout(ThreadPool threadPool, TimeValue timeout,
- String executor,
- ActionListener actionListener,
- Consumer> timeoutConsumer) {
+ public static PrioritizedActionListener wrapScheduledTimeout(
+ ThreadPool threadPool,
+ TimeValue timeout,
+ String executor,
+ ActionListener actionListener,
+ Consumer> timeoutConsumer
+ ) {
return scheduleTimeout(threadPool, timeout, executor, initListener(actionListener, timeoutConsumer));
}
@@ -48,10 +54,14 @@ public static PrioritizedActionListener wrapScheduledTimeou
* @param Response
* @return PrioritizedListener
*/
- public static PrioritizedActionListener initListener(ActionListener actionListener,
- Consumer> timeoutConsumer) {
- CompletionPrioritizedActionListener completionTimeoutListener =
- new CompletionPrioritizedActionListener<>(actionListener, timeoutConsumer);
+ public static PrioritizedActionListener initListener(
+ ActionListener actionListener,
+ Consumer> timeoutConsumer
+ ) {
+ CompletionPrioritizedActionListener completionTimeoutListener = new CompletionPrioritizedActionListener<>(
+ actionListener,
+ timeoutConsumer
+ );
return completionTimeoutListener;
}
@@ -64,10 +74,17 @@ public static PrioritizedActionListener initListener(Action
* @param Response
* @return PrioritizedListener
*/
- public static PrioritizedActionListener scheduleTimeout(ThreadPool threadPool, TimeValue timeout, String executor,
- PrioritizedActionListener completionTimeoutListener) {
- ((CompletionPrioritizedActionListener)completionTimeoutListener).cancellable = threadPool.schedule(
- (Runnable) completionTimeoutListener, timeout, executor);
+ public static PrioritizedActionListener scheduleTimeout(
+ ThreadPool threadPool,
+ TimeValue timeout,
+ String executor,
+ PrioritizedActionListener completionTimeoutListener
+ ) {
+ ((CompletionPrioritizedActionListener) completionTimeoutListener).cancellable = threadPool.schedule(
+ (Runnable) completionTimeoutListener,
+ timeout,
+ executor
+ );
return completionTimeoutListener;
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/CompositeSearchProgressActionListener.java b/src/main/java/org/opensearch/search/asynchronous/listener/CompositeSearchProgressActionListener.java
index b4b8ea4e..8d352134 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/CompositeSearchProgressActionListener.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/CompositeSearchProgressActionListener.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
import org.opensearch.core.action.ActionListener;
@@ -49,7 +52,6 @@ private synchronized boolean addListener(ActionListener listener) {
return false;
}
-
@Override
public void onResponse(T response) {
Iterable> actionListenersToBeInvoked = finalizeListeners();
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/PartialResponseProvider.java b/src/main/java/org/opensearch/search/asynchronous/listener/PartialResponseProvider.java
index 01f7630f..5df4f593 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/PartialResponseProvider.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/PartialResponseProvider.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
import org.opensearch.action.search.SearchResponse;
diff --git a/src/main/java/org/opensearch/search/asynchronous/listener/PrioritizedActionListener.java b/src/main/java/org/opensearch/search/asynchronous/listener/PrioritizedActionListener.java
index 2c6954b1..11077275 100644
--- a/src/main/java/org/opensearch/search/asynchronous/listener/PrioritizedActionListener.java
+++ b/src/main/java/org/opensearch/search/asynchronous/listener/PrioritizedActionListener.java
@@ -1,11 +1,13 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.listener;
-
import org.opensearch.core.action.ActionListener;
public interface PrioritizedActionListener extends ActionListener {
diff --git a/src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java b/src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java
index fa9c6610..977c83c6 100644
--- a/src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java
+++ b/src/main/java/org/opensearch/search/asynchronous/management/AsynchronousSearchManagementService.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.management;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
@@ -72,22 +75,30 @@ public class AsynchronousSearchManagementService extends AbstractLifecycleCompon
private TimeValue activeContextReaperInterval;
private TimeValue persistedResponseCleanUpInterval;
- public static final String PERSISTED_RESPONSE_CLEANUP_ACTION_NAME =
- "indices:data/read/opendistro/asynchronous_search/response_cleanup";
-
- public static final Setting ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING =
- Setting.timeSetting("plugins.asynchronous_search.active.context.reaper_interval",
- LegacyOpendistroAsynchronousSearchSettings.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING,
- TimeValue.timeValueSeconds(5), Setting.Property.NodeScope);
- public static final Setting PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING =
- Setting.timeSetting("plugins.asynchronous_search.expired.persisted_response.cleanup_interval",
- LegacyOpendistroAsynchronousSearchSettings.PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING, TimeValue.timeValueSeconds(5),
- Setting.Property.NodeScope);
+ public static final String PERSISTED_RESPONSE_CLEANUP_ACTION_NAME = "indices:data/read/opendistro/asynchronous_search/response_cleanup";
+
+ public static final Setting ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING = Setting.timeSetting(
+ "plugins.asynchronous_search.active.context.reaper_interval",
+ LegacyOpendistroAsynchronousSearchSettings.ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING,
+ TimeValue.timeValueSeconds(5),
+ Setting.Property.NodeScope
+ );
+ public static final Setting PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING = Setting.timeSetting(
+ "plugins.asynchronous_search.expired.persisted_response.cleanup_interval",
+ LegacyOpendistroAsynchronousSearchSettings.PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING,
+ TimeValue.timeValueSeconds(5),
+ Setting.Property.NodeScope
+ );
@Inject
- public AsynchronousSearchManagementService(Settings settings, ClusterService clusterService, ThreadPool threadPool,
- AsynchronousSearchService asynchronousSearchService, TransportService transportService,
- AsynchronousSearchPersistenceService asynchronousSearchPersistenceService) {
+ public AsynchronousSearchManagementService(
+ Settings settings,
+ ClusterService clusterService,
+ ThreadPool threadPool,
+ AsynchronousSearchService asynchronousSearchService,
+ TransportService transportService,
+ AsynchronousSearchPersistenceService asynchronousSearchPersistenceService
+ ) {
this.clusterService = clusterService;
this.threadPool = threadPool;
this.clusterService.addListener(this);
@@ -97,8 +108,14 @@ public AsynchronousSearchManagementService(Settings settings, ClusterService clu
this.activeContextReaperInterval = ACTIVE_CONTEXT_REAPER_INTERVAL_SETTING.get(settings);
this.persistedResponseCleanUpInterval = PERSISTED_RESPONSE_CLEAN_UP_INTERVAL_SETTING.get(settings);
- transportService.registerRequestHandler(PERSISTED_RESPONSE_CLEANUP_ACTION_NAME, ThreadPool.Names.SAME, false, false,
- AsynchronousSearchCleanUpRequest::new, new PersistedResponseCleanUpTransportHandler());
+ transportService.registerRequestHandler(
+ PERSISTED_RESPONSE_CLEANUP_ACTION_NAME,
+ ThreadPool.Names.SAME,
+ false,
+ false,
+ AsynchronousSearchCleanUpRequest::new,
+ new PersistedResponseCleanUpTransportHandler()
+ );
}
class PersistedResponseCleanUpTransportHandler implements TransportRequestHandler {
@@ -109,20 +126,22 @@ public void messageReceived(AsynchronousSearchCleanUpRequest request, TransportC
try {
channel.sendResponse(e);
} catch (IOException ex) {
- logger.warn(() -> new ParameterizedMessage(
- "Failed to send cleanup error response for request [{}]", request), ex);
+ logger.warn(() -> new ParameterizedMessage("Failed to send cleanup error response for request [{}]", request), ex);
}
}));
}
}
private void asyncCleanUpOperation(AsynchronousSearchCleanUpRequest request, Task task, ActionListener listener) {
- transportService.getThreadPool().executor(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME)
- .execute(() -> performPersistedResponseCleanUpAction(request, listener));
+ transportService.getThreadPool()
+ .executor(AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME)
+ .execute(() -> performPersistedResponseCleanUpAction(request, listener));
}
- private void performPersistedResponseCleanUpAction(AsynchronousSearchCleanUpRequest request,
- ActionListener listener) {
+ private void performPersistedResponseCleanUpAction(
+ AsynchronousSearchCleanUpRequest request,
+ ActionListener listener
+ ) {
asynchronousSearchPersistenceService.deleteExpiredResponses(listener, request.absoluteTimeInMillis);
}
@@ -150,8 +169,11 @@ private void triggerCleanUp(ClusterState clusterState, String reason) {
@Override
protected void doStart() {
- activeContextReaperScheduledFuture = threadPool.scheduleWithFixedDelay(new ActiveContextReaper(), activeContextReaperInterval,
- AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME);
+ activeContextReaperScheduledFuture = threadPool.scheduleWithFixedDelay(
+ new ActiveContextReaper(),
+ activeContextReaperInterval,
+ AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME
+ );
}
@Override
@@ -174,14 +196,24 @@ public void run() {
Set toFree = asynchronousSearchService.getContextsToReap();
// don't block on response
toFree.forEach(
- context -> asynchronousSearchService.freeContext(context.getAsynchronousSearchId(), context.getContextId(),
- null, ActionListener.wrap(
- (response) -> logger.debug("Successfully freed up context [{}] running duration [{}]",
- context.getAsynchronousSearchId(), context.getExpirationTimeMillis() - context.getStartTimeMillis()),
- (exception) -> logger.debug(() -> new ParameterizedMessage(
- "Failed to cleanup asynchronous search context [{}] running duration [{}] due to ",
- context.getAsynchronousSearchId(),context.getExpirationTimeMillis()
- - context.getStartTimeMillis()), exception)
+ context -> asynchronousSearchService.freeContext(
+ context.getAsynchronousSearchId(),
+ context.getContextId(),
+ null,
+ ActionListener.wrap(
+ (response) -> logger.debug(
+ "Successfully freed up context [{}] running duration [{}]",
+ context.getAsynchronousSearchId(),
+ context.getExpirationTimeMillis() - context.getStartTimeMillis()
+ ),
+ (exception) -> logger.debug(
+ () -> new ParameterizedMessage(
+ "Failed to cleanup asynchronous search context [{}] running duration [{}] due to ",
+ context.getAsynchronousSearchId(),
+ context.getExpirationTimeMillis() - context.getStartTimeMillis()
+ ),
+ exception
+ )
)
)
);
@@ -195,47 +227,53 @@ public final void performCleanUp() {
final ThreadContext threadContext = threadPool.getThreadContext();
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
final Map dataNodes = clusterService.state().nodes().getDataNodes();
- List nodes = Stream.of(dataNodes.values().toArray(new DiscoveryNode[0]))
- .collect(Collectors.toList());
+ List nodes = Stream.of(dataNodes.values().toArray(new DiscoveryNode[0])).collect(Collectors.toList());
if (nodes == null || nodes.isEmpty()) {
logger.debug("Found empty data nodes with asynchronous search enabled attribute [{}] for response clean up", dataNodes);
return;
}
int pos = Randomness.get().nextInt(nodes.size());
DiscoveryNode randomNode = nodes.get(pos);
- transportService.sendRequest(randomNode, PERSISTED_RESPONSE_CLEANUP_ACTION_NAME,
- new AsynchronousSearchCleanUpRequest(threadPool.absoluteTimeInMillis()),
- new TransportResponseHandler() {
-
- @Override
- public AcknowledgedResponse read(StreamInput in) throws IOException {
- return new AcknowledgedResponse(in);
- }
-
- @Override
- public void handleResponse(AcknowledgedResponse response) {
- logger.debug("Successfully executed clean up action on node [{}] with response [{}]", randomNode,
- response.isAcknowledged());
- }
-
- @Override
- public void handleException(TransportException e) {
- logger.error(() -> new ParameterizedMessage("Exception executing action [{}]",
- PERSISTED_RESPONSE_CLEANUP_ACTION_NAME), e);
- }
-
- @Override
- public String executor() {
- return AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME;
- }
- });
+ transportService.sendRequest(
+ randomNode,
+ PERSISTED_RESPONSE_CLEANUP_ACTION_NAME,
+ new AsynchronousSearchCleanUpRequest(threadPool.absoluteTimeInMillis()),
+ new TransportResponseHandler() {
+
+ @Override
+ public AcknowledgedResponse read(StreamInput in) throws IOException {
+ return new AcknowledgedResponse(in);
+ }
+
+ @Override
+ public void handleResponse(AcknowledgedResponse response) {
+ logger.debug(
+ "Successfully executed clean up action on node [{}] with response [{}]",
+ randomNode,
+ response.isAcknowledged()
+ );
+ }
+
+ @Override
+ public void handleException(TransportException e) {
+ logger.error(
+ () -> new ParameterizedMessage("Exception executing action [{}]", PERSISTED_RESPONSE_CLEANUP_ACTION_NAME),
+ e
+ );
+ }
+
+ @Override
+ public String executor() {
+ return AsynchronousSearchPlugin.OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME;
+ }
+ }
+ );
} catch (Exception ex) {
logger.error("Failed to schedule asynchronous search cleanup", ex);
}
}
-
private class ResponseCleanUpRunnable extends AbstractRunnable {
private final String reason;
@@ -253,16 +291,14 @@ public void onFailure(Exception e) {
logger.warn(new ParameterizedMessage("sync search clean up job failed [{}]", reason), e);
}
-
@Override
public void onRejection(Exception e) {
- final boolean shutDown = e instanceof OpenSearchRejectedExecutionException && ((OpenSearchRejectedExecutionException) e)
- .isExecutorShutdown();
+ final boolean shutDown = e instanceof OpenSearchRejectedExecutionException
+ && ((OpenSearchRejectedExecutionException) e).isExecutorShutdown();
logger.log(shutDown ? Level.DEBUG : Level.WARN, "asynchronous search clean up job rejected [{}]", reason, e);
}
}
-
private class PersistedResponseCleanUpAndRescheduleRunnable extends ResponseCleanUpRunnable {
PersistedResponseCleanUpAndRescheduleRunnable() {
super("scheduled");
@@ -286,7 +322,6 @@ public void onAfter() {
}
}
-
static class AsynchronousSearchCleanUpRequest extends ActionRequest {
private final long absoluteTimeInMillis;
@@ -318,7 +353,6 @@ public long getAbsoluteTimeInMillis() {
return absoluteTimeInMillis;
}
-
@Override
public int hashCode() {
return Objects.hash(absoluteTimeInMillis);
@@ -326,10 +360,8 @@ public int hashCode() {
@Override
public boolean equals(Object o) {
- if (this == o)
- return true;
- if (o == null || getClass() != o.getClass())
- return false;
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
AsynchronousSearchCleanUpRequest asynchronousSearchCleanUpRequest = (AsynchronousSearchCleanUpRequest) o;
return absoluteTimeInMillis == asynchronousSearchCleanUpRequest.absoluteTimeInMillis;
}
diff --git a/src/main/java/org/opensearch/search/asynchronous/plugin/AsynchronousSearchPlugin.java b/src/main/java/org/opensearch/search/asynchronous/plugin/AsynchronousSearchPlugin.java
index 925c2622..bc931d19 100644
--- a/src/main/java/org/opensearch/search/asynchronous/plugin/AsynchronousSearchPlugin.java
+++ b/src/main/java/org/opensearch/search/asynchronous/plugin/AsynchronousSearchPlugin.java
@@ -1,8 +1,11 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
+ *
+ * The OpenSearch Contributors require contributions made to
+ * this file be licensed under the Apache-2.0 license or a
+ * compatible open source license.
*/
-
package org.opensearch.search.asynchronous.plugin;
import org.opensearch.core.xcontent.NamedXContentRegistry;
@@ -61,7 +64,6 @@
import java.util.List;
import java.util.function.Supplier;
-
public class AsynchronousSearchPlugin extends Plugin implements ActionPlugin, SystemIndexPlugin {
public static final String OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME = "opensearch_asynchronous_search_generic";
@@ -74,8 +76,12 @@ public class AsynchronousSearchPlugin extends Plugin implements ActionPlugin, Sy
@Override
public Collection getSystemIndexDescriptors(Settings settings) {
- return Collections.singletonList(new SystemIndexDescriptor(AsynchronousSearchPersistenceService.ASYNC_SEARCH_RESPONSE_INDEX,
- "Stores the response for asynchronous search"));
+ return Collections.singletonList(
+ new SystemIndexDescriptor(
+ AsynchronousSearchPersistenceService.ASYNC_SEARCH_RESPONSE_INDEX,
+ "Stores the response for asynchronous search"
+ )
+ );
}
@Override
@@ -83,70 +89,95 @@ public Collection> getGuiceServiceClasses()
return Collections.singletonList(AsynchronousSearchManagementService.class);
}
-
- //TODO Revisit these once we performance test the feature
+ // TODO Revisit these once we performance test the feature
@Override
public List> getExecutorBuilders(Settings settings) {
final int availableProcessors = OpenSearchExecutors.allocatedProcessors(settings);
List> executorBuilders = new ArrayList<>();
- executorBuilders.add(new ScalingExecutorBuilder(OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME, 1,
- Math.min(2 * availableProcessors, Math.max(128, 512)), TimeValue.timeValueMinutes(30)));
+ executorBuilders.add(
+ new ScalingExecutorBuilder(
+ OPEN_DISTRO_ASYNC_SEARCH_GENERIC_THREAD_POOL_NAME,
+ 1,
+ Math.min(2 * availableProcessors, Math.max(128, 512)),
+ TimeValue.timeValueMinutes(30)
+ )
+ );
return executorBuilders;
}
@Override
- public Collection