From aa293e96bdcdac93914e52dc207e1d7b51603861 Mon Sep 17 00:00:00 2001 From: Ruirui Zhang Date: Thu, 11 Jul 2024 13:55:10 -0700 Subject: [PATCH] Add Delete QueryGroup API Logic Signed-off-by: Ruirui Zhang --- plugins/workload-management/build.gradle | 18 ++ .../wlm/action/DeleteQueryGroupAction.java | 37 ++++ .../wlm/action/DeleteQueryGroupRequest.java | 74 +++++++ .../wlm/action/DeleteQueryGroupResponse.java | 82 ++++++++ .../TransportDeleteQueryGroupAction.java | 58 +++++ .../wlm/action/WorkloadManagementPlugin.java | 62 ++++++ .../WorkloadManagementPluginModule.java | 32 +++ .../plugin/wlm/action/package-info.java | 12 ++ .../rest/RestDeleteQueryGroupAction.java | 69 ++++++ .../plugin/wlm/action/rest/package-info.java | 12 ++ .../wlm/action/service/Persistable.java | 24 +++ .../service/QueryGroupPersistenceService.java | 166 +++++++++++++++ .../wlm/action/service/package-info.java | 12 ++ .../action/DeleteQueryGroupRequestTests.java | 38 ++++ .../action/DeleteQueryGroupResponseTests.java | 136 ++++++++++++ .../wlm/action/QueryGroupTestUtils.java | 100 +++++++++ .../QueryGroupPersistenceServiceTests.java | 53 +++++ .../opensearch/cluster/metadata/Metadata.java | 17 +- .../cluster/metadata/QueryGroup.java | 8 +- .../common/settings/ClusterSettings.java | 8 +- .../QueryGroupServiceSettings.java | 198 ++++++++++++++++++ .../search/query_group/package-info.java | 12 ++ 22 files changed, 1221 insertions(+), 7 deletions(-) create mode 100644 plugins/workload-management/build.gradle create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequest.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponse.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportDeleteQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestDeleteQueryGroupAction.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java create mode 100644 plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequestTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponseTests.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java create mode 100644 plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java create mode 100644 server/src/main/java/org/opensearch/search/query_group/package-info.java diff --git a/plugins/workload-management/build.gradle b/plugins/workload-management/build.gradle new file mode 100644 index 0000000000000..89e13c079795e --- /dev/null +++ b/plugins/workload-management/build.gradle @@ -0,0 +1,18 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + * + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +opensearchplugin { + description 'OpenSearch Workload Management Plugin.' + classname 'org.opensearch.plugin.wlm.action.WorkloadManagementPlugin' +} + +dependencies { +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupAction.java new file mode 100644 index 0000000000000..0534a9b756d49 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupAction.java @@ -0,0 +1,37 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionType; + +/** + * Transport action for delete QueryGroup + * + * @opensearch.api + */ +public class DeleteQueryGroupAction extends ActionType { + + /** + /** + * An instance of DeleteQueryGroupAction + */ + public static final DeleteQueryGroupAction INSTANCE = new DeleteQueryGroupAction(); + + /** + * Name for DeleteQueryGroupAction + */ + public static final String NAME = "cluster:admin/opensearch/wlm/query_group/_delete"; + + /** + * Default constructor + */ + private DeleteQueryGroupAction() { + super(NAME, DeleteQueryGroupResponse::new); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequest.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequest.java new file mode 100644 index 0000000000000..8ea8dabdd747a --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequest.java @@ -0,0 +1,74 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.action.ActionRequestValidationException; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; + +import java.io.IOException; + +/** + * A request for delete QueryGroup + * + * @opensearch.internal + */ +public class DeleteQueryGroupRequest extends ActionRequest implements Writeable.Reader { + String name; + + /** + * Default constructor for DeleteQueryGroupRequest + * @param name - name for the QueryGroup to get + */ + public DeleteQueryGroupRequest(String name) { + this.name = name; + } + + /** + * Constructor for DeleteQueryGroupRequest + * @param in - A {@link StreamInput} object + */ + public DeleteQueryGroupRequest(StreamInput in) throws IOException { + super(in); + name = in.readOptionalString(); + } + + @Override + public DeleteQueryGroupRequest read(StreamInput in) throws IOException { + return new DeleteQueryGroupRequest(in); + } + + @Override + public ActionRequestValidationException validate() { + return null; + } + + /** + * Name getter + */ + public String getName() { + return name; + } + + /** + * Name setter + * @param name - name to be set + */ + public void setName(String name) { + this.name = name; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeOptionalString(name); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponse.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponse.java new file mode 100644 index 0000000000000..62853d01c22da --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponse.java @@ -0,0 +1,82 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.ToXContentObject; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.util.List; + +/** + * Response for the delete API for QueryGroup + * + * @opensearch.internal + */ +public class DeleteQueryGroupResponse extends ActionResponse implements ToXContent, ToXContentObject { + private final List queryGroups; + private RestStatus restStatus; + + /** + * Constructor for DeleteQueryGroupResponse + * @param queryGroups - The QueryGroup list to be fetched + * @param restStatus - The rest status for this response + */ + public DeleteQueryGroupResponse(final List queryGroups, RestStatus restStatus) { + this.queryGroups = queryGroups; + this.restStatus = restStatus; + } + + /** + * Constructor for DeleteQueryGroupResponse + * @param in - A {@link StreamInput} object + */ + public DeleteQueryGroupResponse(StreamInput in) throws IOException { + this.queryGroups = in.readList(QueryGroup::new); + this.restStatus = RestStatus.readFrom(in); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeList(queryGroups); + RestStatus.writeTo(out, restStatus); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.startArray("deleted"); + for (QueryGroup group : queryGroups) { + group.toXContent(builder, params); + } + builder.endArray(); + builder.endObject(); + return builder; + } + + /** + * queryGroups getter + */ + public List getQueryGroups() { + return queryGroups; + } + + /** + * restStatus getter + */ + public RestStatus getRestStatus() { + return restStatus; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportDeleteQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportDeleteQueryGroupAction.java new file mode 100644 index 0000000000000..6251f45ae220d --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/TransportDeleteQueryGroupAction.java @@ -0,0 +1,58 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.Inject; +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.TransportService; + +/** + * Transport action for delete QueryGroup + * + * @opensearch.internal + */ +public class TransportDeleteQueryGroupAction extends HandledTransportAction { + + private final ThreadPool threadPool; + private final Persistable queryGroupPersistenceService; + + /** + * Constructor for TransportDeleteQueryGroupAction + * + * @param actionName - action name + * @param transportService - a {@link TransportService} object + * @param actionFilters - a {@link ActionFilters} object + * @param threadPool - a {@link ThreadPool} object + * @param queryGroupPersistenceService - a {@link Persistable} object + */ + @Inject + public TransportDeleteQueryGroupAction( + String actionName, + TransportService transportService, + ActionFilters actionFilters, + ThreadPool threadPool, + Persistable queryGroupPersistenceService + ) { + super(DeleteQueryGroupAction.NAME, transportService, actionFilters, DeleteQueryGroupRequest::new); + this.threadPool = threadPool; + this.queryGroupPersistenceService = queryGroupPersistenceService; + } + + @Override + protected void doExecute(Task task, DeleteQueryGroupRequest request, ActionListener listener) { + String name = request.getName(); + threadPool.executor(ThreadPool.Names.GENERIC).execute(() -> queryGroupPersistenceService.delete(name, listener)); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java new file mode 100644 index 0000000000000..43a3c0279214b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPlugin.java @@ -0,0 +1,62 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.node.DiscoveryNodes; +import org.opensearch.common.inject.Module; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.IndexScopedSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.settings.SettingsFilter; +import org.opensearch.core.action.ActionResponse; +import org.opensearch.plugin.wlm.action.rest.RestDeleteQueryGroupAction; +import org.opensearch.plugins.ActionPlugin; +import org.opensearch.plugins.Plugin; +import org.opensearch.rest.RestController; +import org.opensearch.rest.RestHandler; + +import java.util.Collection; +import java.util.List; +import java.util.function.Supplier; + +/** + * Plugin class for WorkloadManagement + */ +public class WorkloadManagementPlugin extends Plugin implements ActionPlugin { + + /** + * Default constructor + */ + public WorkloadManagementPlugin() {} + + @Override + public List> getActions() { + return List.of(new ActionPlugin.ActionHandler<>(DeleteQueryGroupAction.INSTANCE, TransportDeleteQueryGroupAction.class)); + } + + @Override + public List getRestHandlers( + Settings settings, + RestController restController, + ClusterSettings clusterSettings, + IndexScopedSettings indexScopedSettings, + SettingsFilter settingsFilter, + IndexNameExpressionResolver indexNameExpressionResolver, + Supplier nodesInCluster + ) { + return List.of(new RestDeleteQueryGroupAction()); + } + + @Override + public Collection createGuiceModules() { + return List.of(new WorkloadManagementPluginModule()); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java new file mode 100644 index 0000000000000..65f92a59a576b --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/WorkloadManagementPluginModule.java @@ -0,0 +1,32 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.inject.AbstractModule; +import org.opensearch.common.inject.TypeLiteral; +import org.opensearch.plugin.wlm.action.service.Persistable; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; + +/** + * Guice Module to manage WorkloadManagement related objects + */ +public class WorkloadManagementPluginModule extends AbstractModule { + + /** + * Constructor for WorkloadManagementPluginModule + */ + public WorkloadManagementPluginModule() {} + + @Override + protected void configure() { + bind(new TypeLiteral>() { + }).to(QueryGroupPersistenceService.class).asEagerSingleton(); + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java new file mode 100644 index 0000000000000..8f7d2647546f5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Base Package of CRUD API of QueryGroup + */ +package org.opensearch.plugin.wlm.action; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestDeleteQueryGroupAction.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestDeleteQueryGroupAction.java new file mode 100644 index 0000000000000..579b0b05b2dd2 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/RestDeleteQueryGroupAction.java @@ -0,0 +1,69 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.rest; + +import org.opensearch.client.node.NodeClient; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.plugin.wlm.action.DeleteQueryGroupAction; +import org.opensearch.plugin.wlm.action.DeleteQueryGroupRequest; +import org.opensearch.plugin.wlm.action.DeleteQueryGroupResponse; +import org.opensearch.rest.BaseRestHandler; +import org.opensearch.rest.BytesRestResponse; +import org.opensearch.rest.RestChannel; +import org.opensearch.rest.RestRequest; +import org.opensearch.rest.RestResponse; +import org.opensearch.rest.action.RestResponseListener; + +import java.io.IOException; +import java.util.List; + +import static org.opensearch.rest.RestRequest.Method.DELETE; + +/** + * Rest action to delete a QueryGroup + * + * @opensearch.api + */ +public class RestDeleteQueryGroupAction extends BaseRestHandler { + + /** + * Constructor for RestDeleteQueryGroupAction + */ + public RestDeleteQueryGroupAction() {} + + @Override + public String getName() { + return "delete_query_group"; + } + + /** + * The list of {@link Route}s that this RestHandler is responsible for handling. + */ + @Override + public List routes() { + return List.of(new Route(DELETE, "_query_group/{name}"), new Route(DELETE, "_query_group/")); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + String name = request.param("name"); + DeleteQueryGroupRequest deleteQueryGroupRequest = new DeleteQueryGroupRequest(name); + return channel -> client.execute(DeleteQueryGroupAction.INSTANCE, deleteQueryGroupRequest, deleteQueryGroupResponse(channel)); + } + + private RestResponseListener deleteQueryGroupResponse(final RestChannel channel) { + return new RestResponseListener<>(channel) { + @Override + public RestResponse buildResponse(final DeleteQueryGroupResponse response) throws Exception { + return new BytesRestResponse(RestStatus.OK, response.toXContent(channel.newBuilder(), ToXContent.EMPTY_PARAMS)); + } + }; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java new file mode 100644 index 0000000000000..783826608c517 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/rest/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the rest classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.action.rest; diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java new file mode 100644 index 0000000000000..794245ac7ade5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/Persistable.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.core.action.ActionListener; +import org.opensearch.plugin.wlm.action.DeleteQueryGroupResponse; + +/** + * This interface defines the key APIs for implementing QueruGroup persistence + */ +public interface Persistable { + /** + * delete the QueryGroup in a durable storage + * @param name - QueryGroup name to be deleted + * @param listener - ActionListener of DeleteQueryGroupResponse + */ + void delete(String name, ActionListener listener); +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java new file mode 100644 index 0000000000000..c37b71d366d63 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceService.java @@ -0,0 +1,166 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.ClusterStateUpdateTask; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterManagerTaskThrottler.ThrottlingKey; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.Priority; +import org.opensearch.common.inject.Inject; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.action.ActionListener; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.plugin.wlm.action.DeleteQueryGroupResponse; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.search.query_group.QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT; + +/** + * This class defines the functions for QueryGroup persistence + */ +public class QueryGroupPersistenceService implements Persistable { + private static final Logger logger = LogManager.getLogger(QueryGroupPersistenceService.class); + private final ClusterService clusterService; + private static final String SOURCE = "query-group-persistence-service"; + private static final String CREATE_QUERY_GROUP_THROTTLING_KEY = "create-query-group"; + private static final String UPDATE_QUERY_GROUP_THROTTLING_KEY = "update-query-group"; + private static final String DELETE_QUERY_GROUP_THROTTLING_KEY = "delete-query-group"; + private final AtomicInteger inflightCreateQueryGroupRequestCount; + private final Map inflightResourceLimitValues; + private volatile int maxQueryGroupCount; + final ThrottlingKey createQueryGroupThrottlingKey; + final ThrottlingKey updateQueryGroupThrottlingKey; + final ThrottlingKey deleteQueryGroupThrottlingKey; + + /** + * Constructor for QueryGroupPersistenceService + * + * @param clusterService {@link ClusterService} - The cluster service to be used by QueryGroupPersistenceService + * @param settings {@link Settings} - The settings to be used by QueryGroupPersistenceService + * @param clusterSettings {@link ClusterSettings} - The cluster settings to be used by QueryGroupPersistenceService + */ + @Inject + public QueryGroupPersistenceService( + final ClusterService clusterService, + final Settings settings, + final ClusterSettings clusterSettings + ) { + this.clusterService = clusterService; + this.createQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(CREATE_QUERY_GROUP_THROTTLING_KEY, true); + this.deleteQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(DELETE_QUERY_GROUP_THROTTLING_KEY, true); + this.updateQueryGroupThrottlingKey = clusterService.registerClusterManagerTask(UPDATE_QUERY_GROUP_THROTTLING_KEY, true); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + inflightCreateQueryGroupRequestCount = new AtomicInteger(); + inflightResourceLimitValues = new HashMap<>(); + } + + /** + * Set maxQueryGroupCount to be newMaxQueryGroupCount + * @param newMaxQueryGroupCount - the max number of QueryGroup allowed + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + @Override + public void delete(String name, ActionListener listener) { + deleteInClusterStateMetadata(name, listener); + } + + /** + * Modify cluster state to delete the QueryGroup + * @param name - the name for QueryGroup to be deleted + */ + void deleteInClusterStateMetadata(String name, ActionListener listener) { + clusterService.submitStateUpdateTask(SOURCE, new ClusterStateUpdateTask(Priority.URGENT) { + @Override + public ClusterState execute(ClusterState currentState) throws Exception { + return deleteQueryGroupInClusterState(name, currentState); + } + + @Override + public ThrottlingKey getClusterManagerThrottlingKey() { + return deleteQueryGroupThrottlingKey; + } + + @Override + public void onFailure(String source, Exception e) { + logger.warn("Failed to delete QueryGroup due to error: {}, for source: {}", e.getMessage(), source); + listener.onFailure(e); + } + + @Override + public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) { + final Map oldGroupsMap = oldState.metadata().queryGroups(); + final Map newGroupsMap = newState.metadata().queryGroups(); + List deletedGroups = new ArrayList<>(); + for (String groupId : oldGroupsMap.keySet()) { + if (!newGroupsMap.containsKey(groupId)) { + deletedGroups.add(oldGroupsMap.get(groupId)); + } + } + DeleteQueryGroupResponse response = new DeleteQueryGroupResponse(deletedGroups, RestStatus.OK); + listener.onResponse(response); + } + }); + } + + /** + * Modify cluster state to delete the QueryGroup + * @param name - the name for QueryGroup to be deleted + * @param currentClusterState - current cluster state + */ + ClusterState deleteQueryGroupInClusterState(final String name, final ClusterState currentClusterState) { + final Metadata metadata = currentClusterState.metadata(); + final Map previousGroups = metadata.queryGroups(); + + if (name == null || name.isEmpty()) { // delete all + return ClusterState.builder(currentClusterState) + .metadata(Metadata.builder(metadata).queryGroups(new HashMap<>()).build()) + .build(); + } + for (QueryGroup queryGroup : previousGroups.values()) { + if (queryGroup.getName().equals(name)) { + return ClusterState.builder(currentClusterState).metadata(Metadata.builder(metadata).remove(queryGroup).build()).build(); + } + } + logger.error("The QueryGroup with provided name {} doesn't exist", name); + throw new RuntimeException("No QueryGroup exists with the provided name: " + name); + } + + /** + * inflightCreateQueryGroupRequestCount getter + */ + public AtomicInteger getInflightCreateQueryGroupRequestCount() { + return inflightCreateQueryGroupRequestCount; + } + + /** + * inflightResourceLimitValues getter + */ + public Map getInflightResourceLimitValues() { + return inflightResourceLimitValues; + } +} diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java new file mode 100644 index 0000000000000..e70ac3afb81b5 --- /dev/null +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/action/service/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * Package for the service classes for QueryGroup CRUD operations + */ +package org.opensearch.plugin.wlm.action.service; diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequestTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequestTests.java new file mode 100644 index 0000000000000..8437dfc738488 --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupRequestTests.java @@ -0,0 +1,38 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; + +public class DeleteQueryGroupRequestTests extends OpenSearchTestCase { + + public void testSerialization() throws IOException { + DeleteQueryGroupRequest request = new DeleteQueryGroupRequest(QueryGroupTestUtils.NAME_ONE); + assertEquals(QueryGroupTestUtils.NAME_ONE, request.getName()); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + DeleteQueryGroupRequest otherRequest = new DeleteQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + } + + public void testSerializationWithNull() throws IOException { + DeleteQueryGroupRequest request = new DeleteQueryGroupRequest((String) null); + assertNull(request.getName()); + BytesStreamOutput out = new BytesStreamOutput(); + request.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + DeleteQueryGroupRequest otherRequest = new DeleteQueryGroupRequest(streamInput); + assertEquals(request.getName(), otherRequest.getName()); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponseTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponseTests.java new file mode 100644 index 0000000000000..99f709bd1fbfe --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/DeleteQueryGroupResponseTests.java @@ -0,0 +1,136 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.common.xcontent.json.JsonXContent; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.rest.RestStatus; +import org.opensearch.core.xcontent.ToXContent; +import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.compareQueryGroups; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupList; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupOne; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupTwo; +import static org.mockito.Mockito.mock; + +public class DeleteQueryGroupResponseTests extends OpenSearchTestCase { + + public void testSerializationSingleQueryGroup() throws IOException { + List list = new ArrayList<>(); + list.add(queryGroupOne); + DeleteQueryGroupResponse response = new DeleteQueryGroupResponse(list, RestStatus.OK); + assertEquals(response.getQueryGroups(), list); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + compareQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + } + + public void testSerializationMultipleQueryGroup() throws IOException { + DeleteQueryGroupResponse response = new DeleteQueryGroupResponse(queryGroupList(), RestStatus.OK); + assertEquals(response.getQueryGroups(), queryGroupList()); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + assertEquals(2, otherResponse.getQueryGroups().size()); + compareQueryGroups(response.getQueryGroups(), otherResponse.getQueryGroups()); + } + + public void testSerializationNull() throws IOException { + List list = new ArrayList<>(); + DeleteQueryGroupResponse response = new DeleteQueryGroupResponse(list, RestStatus.OK); + assertEquals(response.getQueryGroups(), list); + + BytesStreamOutput out = new BytesStreamOutput(); + response.writeTo(out); + StreamInput streamInput = out.bytes().streamInput(); + + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(streamInput); + assertEquals(response.getRestStatus(), otherResponse.getRestStatus()); + assertEquals(0, otherResponse.getQueryGroups().size()); + } + + public void testToXContentDeleteSingleQueryGroup() throws IOException { + List queryGroupList = new ArrayList<>(); + queryGroupList.add(queryGroupOne); + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(queryGroupList, RestStatus.OK); + String actual = otherResponse.toXContent(builder, mock(ToXContent.Params.class)).toString(); + String expected = "{\n" + + " \"deleted\" : [\n" + + " {\n" + + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + + " \"name\" : \"query_group_one\",\n" + + " \"resiliency_mode\" : \"monitor\",\n" + + " \"updatedAt\" : 4513232413,\n" + + " \"resourceLimits\" : {\n" + + " \"memory\" : 0.3\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + assertEquals(expected, actual); + } + + public void testToXContentDeleteMultipleQueryGroup() throws IOException { + List queryGroupList = new ArrayList<>(); + queryGroupList.add(queryGroupOne); + queryGroupList.add(queryGroupTwo); + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(queryGroupList, RestStatus.OK); + String actual = otherResponse.toXContent(builder, mock(ToXContent.Params.class)).toString(); + String expected = "{\n" + + " \"deleted\" : [\n" + + " {\n" + + " \"_id\" : \"AgfUO5Ja9yfsYlONlYi3TQ==\",\n" + + " \"name\" : \"query_group_one\",\n" + + " \"resiliency_mode\" : \"monitor\",\n" + + " \"updatedAt\" : 4513232413,\n" + + " \"resourceLimits\" : {\n" + + " \"memory\" : 0.3\n" + + " }\n" + + " },\n" + + " {\n" + + " \"_id\" : \"G5iIqHy4g7eK1qIAAAAIH53=1\",\n" + + " \"name\" : \"query_group_two\",\n" + + " \"resiliency_mode\" : \"monitor\",\n" + + " \"updatedAt\" : 4513232415,\n" + + " \"resourceLimits\" : {\n" + + " \"memory\" : 0.6\n" + + " }\n" + + " }\n" + + " ]\n" + + "}"; + assertEquals(expected, actual); + } + + public void testToXContentZeroSingleQueryGroup() throws IOException { + XContentBuilder builder = JsonXContent.contentBuilder().prettyPrint(); + DeleteQueryGroupResponse otherResponse = new DeleteQueryGroupResponse(new ArrayList<>(), RestStatus.OK); + String actual = otherResponse.toXContent(builder, mock(ToXContent.Params.class)).toString(); + String expected = "{\n" + " \"deleted\" : [ ]\n" + "}"; + assertEquals(expected, actual); + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java new file mode 100644 index 0000000000000..20f180460a4eb --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/QueryGroupTestUtils.java @@ -0,0 +1,100 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action; + +import org.opensearch.cluster.ClusterName; +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Settings; +import org.opensearch.plugin.wlm.action.service.QueryGroupPersistenceService; +import org.opensearch.threadpool.ThreadPool; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.concurrent.atomic.DoubleAdder; + +import static org.opensearch.cluster.metadata.QueryGroup.builder; +import static org.opensearch.search.ResourceType.fromName; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.mock; + +public class QueryGroupTestUtils { + public static final String NAME_ONE = "query_group_one"; + public static final String NAME_TWO = "query_group_two"; + public static final String _ID_ONE = "AgfUO5Ja9yfsYlONlYi3TQ=="; + public static final String _ID_TWO = "G5iIqHy4g7eK1qIAAAAIH53=1"; + public static final String NAME_NONE_EXISTED = "query_group_none_existed"; + public static final String MEMORY_STRING = "memory"; + public static final String MONITOR_STRING = "monitor"; + public static final long TIMESTAMP_ONE = 4513232413L; + public static final long TIMESTAMP_TWO = 4513232415L; + public static final QueryGroup queryGroupOne = builder().name(NAME_ONE) + ._id(_ID_ONE) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.3)) + .updatedAt(TIMESTAMP_ONE) + .build(); + + public static final QueryGroup queryGroupTwo = builder().name(NAME_TWO) + ._id(_ID_TWO) + .mode(MONITOR_STRING) + .resourceLimits(Map.of(fromName(MEMORY_STRING), 0.6)) + .updatedAt(TIMESTAMP_TWO) + .build(); + + public static List queryGroupList() { + List list = new ArrayList<>(); + list.add(queryGroupOne); + list.add(queryGroupTwo); + return list; + } + + public static ClusterState clusterState() { + final Metadata metadata = Metadata.builder().queryGroups(Map.of(_ID_ONE, queryGroupOne, _ID_TWO, queryGroupTwo)).build(); + return ClusterState.builder(new ClusterName("_name")).metadata(metadata).build(); + } + + public static Settings settings() { + return Settings.builder().build(); + } + + public static ClusterSettings clusterSettings() { + return new ClusterSettings(settings(), ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); + } + + public static QueryGroupPersistenceService queryGroupPersistenceService() { + ClusterService clusterService = new ClusterService(settings(), clusterSettings(), mock(ThreadPool.class)); + return new QueryGroupPersistenceService(clusterService, settings(), clusterSettings()); + } + + public static void compareQueryGroups(List listOne, List listTwo) { + assertEquals(listOne.size(), listTwo.size()); + listOne.sort(Comparator.comparing(QueryGroup::getName)); + listTwo.sort(Comparator.comparing(QueryGroup::getName)); + for (int i = 0; i < listOne.size(); i++) { + assertTrue(listOne.get(i).equals(listTwo.get(i))); + } + } + + public static void assertInflightValuesAreZero(QueryGroupPersistenceService queryGroupPersistenceService) { + assertEquals(0, queryGroupPersistenceService.getInflightCreateQueryGroupRequestCount().get()); + Map inflightResourceMap = queryGroupPersistenceService.getInflightResourceLimitValues(); + if (inflightResourceMap != null) { + for (String resourceName : inflightResourceMap.keySet()) { + assertEquals(0, inflightResourceMap.get(resourceName).intValue()); + } + } + } +} diff --git a/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java new file mode 100644 index 0000000000000..0b6b9a579492e --- /dev/null +++ b/plugins/workload-management/src/test/java/org/opensearch/plugin/wlm/action/service/QueryGroupPersistenceServiceTests.java @@ -0,0 +1,53 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.plugin.wlm.action.service; + +import org.opensearch.cluster.ClusterState; +import org.opensearch.cluster.metadata.QueryGroup; +import org.opensearch.test.OpenSearchTestCase; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.NAME_NONE_EXISTED; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.NAME_TWO; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils._ID_TWO; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.assertInflightValuesAreZero; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.clusterState; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.compareQueryGroups; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupOne; +import static org.opensearch.plugin.wlm.action.QueryGroupTestUtils.queryGroupPersistenceService; + +public class QueryGroupPersistenceServiceTests extends OpenSearchTestCase { + public void testDeleteSingleQueryGroup() { + ClusterState newClusterState = queryGroupPersistenceService().deleteQueryGroupInClusterState(NAME_TWO, clusterState()); + Map afterDeletionGroups = newClusterState.getMetadata().queryGroups(); + assertFalse(afterDeletionGroups.containsKey(_ID_TWO)); + assertEquals(1, afterDeletionGroups.size()); + List oldQueryGroups = new ArrayList<>(); + oldQueryGroups.add(queryGroupOne); + compareQueryGroups(new ArrayList<>(afterDeletionGroups.values()), oldQueryGroups); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testDeleteAllQueryGroups() { + ClusterState newClusterState = queryGroupPersistenceService().deleteQueryGroupInClusterState(null, clusterState()); + Map afterDeletionGroups = newClusterState.getMetadata().queryGroups(); + assertEquals(0, afterDeletionGroups.size()); + assertInflightValuesAreZero(queryGroupPersistenceService()); + } + + public void testDeleteNonExistedQueryGroup() { + assertThrows( + RuntimeException.class, + () -> queryGroupPersistenceService().deleteQueryGroupInClusterState(NAME_NONE_EXISTED, clusterState()) + ); + } +} diff --git a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java index 2a54f6444ffda..481bf89de9f43 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/Metadata.java @@ -844,6 +844,12 @@ public Map views() { return Optional.ofNullable((ViewMetadata) this.custom(ViewMetadata.TYPE)).map(ViewMetadata::views).orElse(Collections.emptyMap()); } + public Map queryGroups() { + return Optional.ofNullable((QueryGroupMetadata) this.custom(QueryGroupMetadata.TYPE)) + .map(QueryGroupMetadata::queryGroups) + .orElse(Collections.emptyMap()); + } + public DecommissionAttributeMetadata decommissionAttributeMetadata() { return custom(DecommissionAttributeMetadata.TYPE); } @@ -1380,6 +1386,13 @@ public Builder put(final QueryGroup queryGroup) { return queryGroups(existing); } + public Builder remove(final QueryGroup queryGroup) { + Objects.requireNonNull(queryGroup, "queryGroup should not be null"); + Map existing = new HashMap<>(getQueryGroups()); + existing.remove(queryGroup.get_id()); + return queryGroups(existing); + } + private Map getQueryGroups() { return Optional.ofNullable(this.customs.get(QueryGroupMetadata.TYPE)) .map(o -> (QueryGroupMetadata) o) @@ -1783,8 +1796,8 @@ static void validateDataStreams(SortedMap indicesLooku for (DataStream ds : dsMetadata.dataStreams().values()) { String prefix = DataStream.BACKING_INDEX_PREFIX + ds.getName() + "-"; Set conflicts = indicesLookup.subMap(prefix, DataStream.BACKING_INDEX_PREFIX + ds.getName() + ".") // '.' is the - // char after - // '-' + // char after + // '-' .keySet() .stream() .filter(s -> NUMBER_PATTERN.matcher(s.substring(prefix.length())).matches()) diff --git a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java index beaab198073df..96bb098cbb6e6 100644 --- a/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java +++ b/server/src/main/java/org/opensearch/cluster/metadata/QueryGroup.java @@ -30,7 +30,7 @@ * { * "_id": "fafjafjkaf9ag8a9ga9g7ag0aagaga", * "resourceLimits": { - * "jvm": 0.4 + * "memory": 0.4 * }, * "resiliency_mode": "enforced", * "name": "analytics", @@ -112,8 +112,8 @@ private void validateResourceLimits(Map resourceLimits) { Objects.requireNonNull(resource.getKey(), "resourceName can't be null"); Objects.requireNonNull(threshold, "resource limit threshold for" + resource.getKey().getName() + " : can't be null"); - if (Double.compare(threshold, 1.0) > 0) { - throw new IllegalArgumentException("resource value should be less than 1.0"); + if (Double.compare(threshold, 0.0) <= 0 || Double.compare(threshold, 1.0) > 0) { + throw new IllegalArgumentException("resource value should be greater than 0 and less or equal to 1.0"); } } } @@ -201,6 +201,7 @@ public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; QueryGroup that = (QueryGroup) o; return Objects.equals(name, that.name) + && Objects.equals(resiliencyMode, that.resiliencyMode) && Objects.equals(resourceLimits, that.resourceLimits) && Objects.equals(_id, that._id) && updatedAtInMillis == that.updatedAtInMillis; @@ -268,7 +269,6 @@ public static ResiliencyMode fromName(String s) { } throw new IllegalArgumentException("Invalid value for QueryGroupMode: " + s); } - } /** diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..9ee69c1428d83 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -158,6 +158,7 @@ import org.opensearch.search.backpressure.settings.SearchShardTaskSettings; import org.opensearch.search.backpressure.settings.SearchTaskSettings; import org.opensearch.search.fetch.subphase.highlight.FastVectorHighlighter; +import org.opensearch.search.query_group.QueryGroupServiceSettings; import org.opensearch.snapshots.InternalSnapshotsInfoService; import org.opensearch.snapshots.SnapshotsService; import org.opensearch.tasks.TaskCancellationMonitoringSettings; @@ -758,7 +759,12 @@ public void apply(Settings value, Settings current, Settings previous) { SearchService.CLUSTER_ALLOW_DERIVED_FIELD_SETTING, // Composite index settings - CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING + CompositeIndexSettings.STAR_TREE_INDEX_ENABLED_SETTING, + + // QueryGroup settings + QueryGroupServiceSettings.MAX_QUERY_GROUP_COUNT, + QueryGroupServiceSettings.NODE_LEVEL_REJECTION_THRESHOLD, + QueryGroupServiceSettings.NODE_LEVEL_CANCELLATION_THRESHOLD ) ) ); diff --git a/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java new file mode 100644 index 0000000000000..425916ce3e924 --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/QueryGroupServiceSettings.java @@ -0,0 +1,198 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.search.query_group; + +import org.opensearch.common.settings.ClusterSettings; +import org.opensearch.common.settings.Setting; +import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; + +/** + * Main class to declare the QueryGroup feature related settings + */ +public class QueryGroupServiceSettings { + private static final Long DEFAULT_RUN_INTERVAL_MILLIS = 1000l; + private static final Double DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD = 0.8; + private static final Double DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD = 0.9; + /** + * default max queryGroup count on any node at any given point in time + */ + public static final int DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE = 100; + + public static final String QUERY_GROUP_COUNT_SETTING_NAME = "node.query_group.max_count"; + public static final double NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE = 0.95; + public static final double NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE = 0.90; + + private TimeValue runIntervalMillis; + private Double nodeLevelMemoryCancellationThreshold; + private Double nodeLevelMemoryRejectionThreshold; + private volatile int maxQueryGroupCount; + /** + * max QueryGroup count setting + */ + public static final Setting MAX_QUERY_GROUP_COUNT = Setting.intSetting( + QUERY_GROUP_COUNT_SETTING_NAME, + DEFAULT_MAX_QUERY_GROUP_COUNT_VALUE, + 0, + (newVal) -> { + if (newVal > 100 || newVal < 1) throw new IllegalArgumentException( + QUERY_GROUP_COUNT_SETTING_NAME + " should be in range [1-100]" + ); + }, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for default QueryGroup count + */ + public static final String SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME = "query_group.service.run_interval_millis"; + /** + * Setting to control the run interval of QSB service + */ + private static final Setting QUERY_GROUP_RUN_INTERVAL_SETTING = Setting.longSetting( + SERVICE_RUN_INTERVAL_MILLIS_SETTING_NAME, + DEFAULT_RUN_INTERVAL_MILLIS, + 1, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * Setting name for node level rejection threshold for QSB + */ + public static final String NODE_REJECTION_THRESHOLD_SETTING_NAME = "query_group.node.rejection_threshold"; + /** + * Setting to control the rejection threshold + */ + public static final Setting NODE_LEVEL_REJECTION_THRESHOLD = Setting.doubleSetting( + NODE_REJECTION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_REJECTION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + /** + * Setting name for node level cancellation threshold + */ + public static final String NODE_CANCELLATION_THRESHOLD_SETTING_NAME = "query_group.node.cancellation_threshold"; + /** + * Setting name for node level cancellation threshold + */ + public static final Setting NODE_LEVEL_CANCELLATION_THRESHOLD = Setting.doubleSetting( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME, + DEFAULT_NODE_LEVEL_CANCELLATION_THRESHOLD, + Setting.Property.Dynamic, + Setting.Property.NodeScope + ); + + /** + * QueryGroup service settings constructor + * @param settings - QueryGroup service settings + * @param clusterSettings - QueryGroup cluster settings + */ + public QueryGroupServiceSettings(Settings settings, ClusterSettings clusterSettings) { + runIntervalMillis = new TimeValue(QUERY_GROUP_RUN_INTERVAL_SETTING.get(settings)); + nodeLevelMemoryCancellationThreshold = NODE_LEVEL_CANCELLATION_THRESHOLD.get(settings); + nodeLevelMemoryRejectionThreshold = NODE_LEVEL_REJECTION_THRESHOLD.get(settings); + maxQueryGroupCount = MAX_QUERY_GROUP_COUNT.get(settings); + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + clusterSettings.addSettingsUpdateConsumer(MAX_QUERY_GROUP_COUNT, this::setMaxQueryGroupCount); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_CANCELLATION_THRESHOLD, this::setNodeLevelMemoryCancellationThreshold); + clusterSettings.addSettingsUpdateConsumer(NODE_LEVEL_REJECTION_THRESHOLD, this::setNodeLevelMemoryRejectionThreshold); + } + + /** + * Method to get runInterval for QSB + * @return runInterval in milliseconds for QSB Service + */ + public TimeValue getRunIntervalMillis() { + return runIntervalMillis; + } + + /** + * Method to set the new QueryGroup count + * @param newMaxQueryGroupCount is the new maxQueryGroupCount per node + */ + public void setMaxQueryGroupCount(int newMaxQueryGroupCount) { + if (newMaxQueryGroupCount < 0) { + throw new IllegalArgumentException("node.node.query_group.max_count can't be negative"); + } + this.maxQueryGroupCount = newMaxQueryGroupCount; + } + + /** + * Method to get the node level cancellation threshold + * @return current node level cancellation threshold + */ + public Double getNodeLevelMemoryCancellationThreshold() { + return nodeLevelMemoryCancellationThreshold; + } + + /** + * Method to set the node level cancellation threshold + * @param nodeLevelMemoryCancellationThreshold sets the new node level cancellation threshold + * @throws IllegalArgumentException if the value is > 0.95 and cancellation < rejection threshold + */ + public void setNodeLevelMemoryCancellationThreshold(Double nodeLevelMemoryCancellationThreshold) { + if (Double.compare(nodeLevelMemoryCancellationThreshold, NODE_LEVEL_CANCELLATION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be greater than 0.95 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + this.nodeLevelMemoryCancellationThreshold = nodeLevelMemoryCancellationThreshold; + } + + /** + * Method to get the node level rejection threshold + * @return the current node level rejection threshold + */ + public Double getNodeLevelMemoryRejectionThreshold() { + return nodeLevelMemoryRejectionThreshold; + } + + /** + * Method to set the node level rejection threshold + * @param nodeLevelMemoryRejectionThreshold sets the new rejection threshold + * @throws IllegalArgumentException if rejection > 0.90 and rejection < cancellation threshold + */ + public void setNodeLevelMemoryRejectionThreshold(Double nodeLevelMemoryRejectionThreshold) { + if (Double.compare(nodeLevelMemoryRejectionThreshold, NODE_LEVEL_REJECTION_THRESHOLD_MAX_VALUE) > 0) { + throw new IllegalArgumentException( + NODE_REJECTION_THRESHOLD_SETTING_NAME + " value not be greater than 0.90 as it pose a threat of node drop" + ); + } + + ensureRejectionThresholdIsLessThanCancellation(nodeLevelMemoryRejectionThreshold, nodeLevelMemoryCancellationThreshold); + + this.nodeLevelMemoryRejectionThreshold = nodeLevelMemoryRejectionThreshold; + } + + private void ensureRejectionThresholdIsLessThanCancellation( + Double nodeLevelMemoryRejectionThreshold, + Double nodeLevelMemoryCancellationThreshold + ) { + if (Double.compare(nodeLevelMemoryCancellationThreshold, nodeLevelMemoryRejectionThreshold) < 0) { + throw new IllegalArgumentException( + NODE_CANCELLATION_THRESHOLD_SETTING_NAME + " value should not be less than " + NODE_REJECTION_THRESHOLD_SETTING_NAME + ); + } + } + + /** + * Method to get the current QueryGroup count + * @return the current max QueryGroup count + */ + public int getMaxQueryGroupCount() { + return maxQueryGroupCount; + } +} diff --git a/server/src/main/java/org/opensearch/search/query_group/package-info.java b/server/src/main/java/org/opensearch/search/query_group/package-info.java new file mode 100644 index 0000000000000..00b68b0d3306c --- /dev/null +++ b/server/src/main/java/org/opensearch/search/query_group/package-info.java @@ -0,0 +1,12 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/** + * QueryGroup related artifacts + */ +package org.opensearch.search.query_group;