Skip to content

Commit

Permalink
add logic for stats api
Browse files Browse the repository at this point in the history
Signed-off-by: Ruirui Zhang <[email protected]>
  • Loading branch information
ruai0511 committed Sep 5, 2024
1 parent 375dda3 commit 88c9cff
Show file tree
Hide file tree
Showing 18 changed files with 504 additions and 46 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add `rangeQuery` and `regexpQuery` for `constant_keyword` field type ([#14711](https://github.com/opensearch-project/OpenSearch/pull/14711))
- Add took time to request nodes stats ([#15054](https://github.com/opensearch-project/OpenSearch/pull/15054))
- [Workload Management] Add Get QueryGroup API Logic ([14709](https://github.com/opensearch-project/OpenSearch/pull/14709))
- [Workload Management] Add QueryGroup Stats API Logic ([15777](https://github.com/opensearch-project/OpenSearch/pull/15777))
- [Workload Management] Add Settings for Workload Management feature ([#15028](https://github.com/opensearch-project/OpenSearch/pull/15028))
- [Workload Management] Add Update QueryGroup API Logic ([#14775](https://github.com/opensearch-project/OpenSearch/pull/14775))
- [Workload Management] QueryGroup resource tracking framework changes ([#13897](https://github.com/opensearch-project/OpenSearch/pull/13897))
Expand Down
5 changes: 5 additions & 0 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@
import org.opensearch.action.admin.cluster.storedscripts.TransportPutStoredScriptAction;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksAction;
import org.opensearch.action.admin.cluster.tasks.TransportPendingClusterTasksAction;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.TransportQueryGroupStatsAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.TransportIndicesAliasesAction;
Expand Down Expand Up @@ -367,6 +369,7 @@
import org.opensearch.rest.action.admin.cluster.RestPendingClusterTasksAction;
import org.opensearch.rest.action.admin.cluster.RestPutRepositoryAction;
import org.opensearch.rest.action.admin.cluster.RestPutStoredScriptAction;
import org.opensearch.rest.action.admin.cluster.RestQueryGroupStatsAction;
import org.opensearch.rest.action.admin.cluster.RestReloadSecureSettingsAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteClusterInfoAction;
import org.opensearch.rest.action.admin.cluster.RestRemoteStoreStatsAction;
Expand Down Expand Up @@ -611,6 +614,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void reg
actions.register(NodesInfoAction.INSTANCE, TransportNodesInfoAction.class);
actions.register(RemoteInfoAction.INSTANCE, TransportRemoteInfoAction.class);
actions.register(NodesStatsAction.INSTANCE, TransportNodesStatsAction.class);
actions.register(QueryGroupStatsAction.INSTANCE, TransportQueryGroupStatsAction.class);
actions.register(RemoteStoreStatsAction.INSTANCE, TransportRemoteStoreStatsAction.class);
actions.register(NodesUsageAction.INSTANCE, TransportNodesUsageAction.class);
actions.register(NodesHotThreadsAction.INSTANCE, TransportNodesHotThreadsAction.class);
Expand Down Expand Up @@ -812,6 +816,7 @@ public void initRestHandlers(Supplier<DiscoveryNodes> nodesInCluster) {
registerHandler.accept(new RestClearVotingConfigExclusionsAction());
registerHandler.accept(new RestMainAction());
registerHandler.accept(new RestNodesInfoAction(settingsFilter));
registerHandler.accept(new RestQueryGroupStatsAction());
registerHandler.accept(new RestRemoteClusterInfoAction());
registerHandler.accept(new RestNodesStatsAction());
registerHandler.accept(new RestNodesUsageAction());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.ActionType;

/**
* Transport action for obtaining QueryGroup Stats.
*
* @opensearch.experimental
*/
public class QueryGroupStatsAction extends ActionType<QueryGroupStatsResponse> {
public static final QueryGroupStatsAction INSTANCE = new QueryGroupStatsAction();
public static final String NAME = "cluster:monitor/query_group_stats";

private QueryGroupStatsAction() {
super(NAME, QueryGroupStatsResponse::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.support.nodes.BaseNodesRequest;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;

import java.io.IOException;

@ExperimentalApi
public class QueryGroupStatsRequest extends BaseNodesRequest<QueryGroupStatsRequest> {

protected QueryGroupStatsRequest(StreamInput in) throws IOException {
super(in);
}

public QueryGroupStatsRequest() {
super(false, (String[]) null);
}

/**
* Get QueryGroup stats from nodes based on the nodes ids specified. If none are passed, stats
* for all nodes will be returned.
*/
public QueryGroupStatsRequest(String... nodesIds) {
super(nodesIds);
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.nodes.BaseNodesResponse;
import org.opensearch.cluster.ClusterName;
import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.common.xcontent.XContentFactory;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.xcontent.ToXContentFragment;
import org.opensearch.core.xcontent.XContentBuilder;
import org.opensearch.wlm.stats.QueryGroupStats;

import java.io.IOException;
import java.util.List;

@ExperimentalApi
public class QueryGroupStatsResponse extends BaseNodesResponse<QueryGroupStats> implements ToXContentFragment {

QueryGroupStatsResponse(StreamInput in) throws IOException {
super(in);
}

QueryGroupStatsResponse(ClusterName clusterName, List<QueryGroupStats> nodes, List<FailedNodeException> failures) {
super(clusterName, nodes, failures);
}

@Override
protected List<QueryGroupStats> readNodesFrom(StreamInput in) throws IOException {
return in.readList(QueryGroupStats::new);
}

@Override
protected void writeNodesTo(StreamOutput out, List<QueryGroupStats> nodes) throws IOException {
out.writeList(nodes);
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
for (QueryGroupStats queryGroupStats : getNodes()) {
builder.startObject(queryGroupStats.getNode().getId());
queryGroupStats.toXContent(builder, params);
builder.endObject();
}
return builder;
}

@Override
public String toString() {
try {
XContentBuilder builder = XContentFactory.jsonBuilder().prettyPrint();
builder.startObject();
toXContent(builder, EMPTY_PARAMS);
builder.endObject();
return builder.toString();
} catch (IOException e) {
return "{ \"error\" : \"" + e.getMessage() + "\"}";
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.action.admin.cluster.wlm;

import org.opensearch.action.FailedNodeException;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.nodes.TransportNodesAction;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportService;
import org.opensearch.wlm.QueryGroupService;
import org.opensearch.wlm.stats.QueryGroupStats;

import java.io.IOException;
import java.util.List;

public class TransportQueryGroupStatsAction extends TransportNodesAction<
QueryGroupStatsRequest,
QueryGroupStatsResponse,
TransportQueryGroupStatsAction.NodeQueryGroupStatsRequest,
QueryGroupStats> {

QueryGroupService queryGroupService;

@Inject
public TransportQueryGroupStatsAction(
ThreadPool threadPool,
ClusterService clusterService,
TransportService transportService,
QueryGroupService queryGroupService,
ActionFilters actionFilters
) {
super(
QueryGroupStatsAction.NAME,
threadPool,
clusterService,
transportService,
actionFilters,
QueryGroupStatsRequest::new,
NodeQueryGroupStatsRequest::new,
ThreadPool.Names.MANAGEMENT,
QueryGroupStats.class
);
this.queryGroupService = queryGroupService;
}

@Override
protected QueryGroupStatsResponse newResponse(
QueryGroupStatsRequest request,
List<QueryGroupStats> queryGroupStats,
List<FailedNodeException> failures
) {
return new QueryGroupStatsResponse(clusterService.getClusterName(), queryGroupStats, failures);
}

@Override
protected NodeQueryGroupStatsRequest newNodeRequest(QueryGroupStatsRequest request) {
return new NodeQueryGroupStatsRequest(request);
}

@Override
protected QueryGroupStats newNodeResponse(StreamInput in) throws IOException {
return new QueryGroupStats(in);
}

@Override
protected QueryGroupStats nodeOperation(NodeQueryGroupStatsRequest nodeQueryGroupStatsRequest) {
return queryGroupService.nodeStats();
}

/**
* Inner QueryGroupStatsRequest
*
* @opensearch.experimental
*/
public static class NodeQueryGroupStatsRequest extends TransportRequest {

protected QueryGroupStatsRequest request;

public NodeQueryGroupStatsRequest(StreamInput in) throws IOException {
super(in);
request = new QueryGroupStatsRequest(in);
}

NodeQueryGroupStatsRequest(QueryGroupStatsRequest request) {
this.request = request;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
request.writeTo(out);
}
}
}
10 changes: 10 additions & 0 deletions server/src/main/java/org/opensearch/client/ClusterAdminClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse;
import org.opensearch.action.admin.indices.dangling.delete.DeleteDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.import_index.ImportDanglingIndexRequest;
import org.opensearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
Expand Down Expand Up @@ -320,6 +322,14 @@ public interface ClusterAdminClient extends OpenSearchClient {
*/
NodesStatsRequestBuilder prepareNodesStats(String... nodesIds);

/**
* QueryGroup stats of the cluster.
*
* @param request The QueryGroupStatsRequest
* @param listener A listener to be notified with a result
*/
void queryGroupStats(QueryGroupStatsRequest request, ActionListener<QueryGroupStatsResponse> listener);

void remoteStoreStats(RemoteStoreStatsRequest request, ActionListener<RemoteStoreStatsResponse> listener);

RemoteStoreStatsRequestBuilder prepareRemoteStoreStats(String index, String shardId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequest;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksRequestBuilder;
import org.opensearch.action.admin.cluster.tasks.PendingClusterTasksResponse;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsAction;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsRequest;
import org.opensearch.action.admin.cluster.wlm.QueryGroupStatsResponse;
import org.opensearch.action.admin.indices.alias.IndicesAliasesAction;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.opensearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
Expand Down Expand Up @@ -918,6 +921,11 @@ public NodesStatsRequestBuilder prepareNodesStats(String... nodesIds) {
return new NodesStatsRequestBuilder(this, NodesStatsAction.INSTANCE).setNodesIds(nodesIds);
}

@Override
public void queryGroupStats(final QueryGroupStatsRequest request, final ActionListener<QueryGroupStatsResponse> listener) {
execute(QueryGroupStatsAction.INSTANCE, request, listener);
}

@Override
public void remoteStoreStats(final RemoteStoreStatsRequest request, final ActionListener<RemoteStoreStatsResponse> listener) {
execute(RemoteStoreStatsAction.INSTANCE, request, listener);
Expand Down
Loading

0 comments on commit 88c9cff

Please sign in to comment.