Skip to content

Commit

Permalink
Adding tests, making pit action names consistent
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed Jul 14, 2022
1 parent 97786e1 commit e693b63
Show file tree
Hide file tree
Showing 22 changed files with 180 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -885,7 +885,8 @@ public void testApiNamingConventions() throws Exception {
"nodes.hot_threads",
"nodes.usage",
"nodes.reload_secure_settings",
"search_shards", };
"search_shards",
"get_all_pits" };
List<String> booleanReturnMethods = Arrays.asList("security.enable_user", "security.disable_user", "security.change_password");
Set<String> deprecatedMethods = new HashSet<>();
deprecatedMethods.add("indices.force_merge");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"create_pit":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Creates point in time context."
"description":"Creates point in time search context."
},
"stability":"experimental",
"url":{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"get_all_pits":{
"documentation":{
"url":"https://opensearch.org/docs/latest/opensearch/rest-api/point_in_time/",
"description":"Get all active point in time searches."
},
"stability":"experimental",
"url":{
"paths":[
{
"path":"/_search/point_in_time/all",
"methods":[
"GET"
]
}
]
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,18 @@
keep_alive: 23h

- set: {id: pit_id}
- set: {creation_time: create_time}
- match: { _shards.total: 1}
- match: { _shards.successful: 1}
- match: { _shards.failed: 0}

- do:
get_all_pits: {}

- match: {pits.0.pitId: $pit_id}
- match: {pits.0.creationTime: $create_time}
- match: {pits.0.keepAlive: 82800000}

- do:
search:
rest_total_hits_as_int: true
Expand Down Expand Up @@ -87,7 +96,7 @@
"pit_id": [$pit_id]

- match: {pits.0.pitId: $pit_id}
- match: {pits.0.succeeded: true }
- match: {pits.0.successful: true }

---
"Delete all":
Expand Down Expand Up @@ -127,7 +136,7 @@
delete_all_pits: {}

- match: {pits.0.pitId: $pit_id}
- match: {pits.0.succeeded: true }
- match: {pits.0.successful: true }

- do:
catch: missing
Expand Down
25 changes: 23 additions & 2 deletions server/src/main/java/org/opensearch/action/ActionModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,20 @@
import org.opensearch.action.ingest.SimulatePipelineTransportAction;
import org.opensearch.action.main.MainAction;
import org.opensearch.action.main.TransportMainAction;
import org.opensearch.action.search.*;
import org.opensearch.action.search.ClearScrollAction;
import org.opensearch.action.search.CreatePitAction;
import org.opensearch.action.search.DeletePitAction;
import org.opensearch.action.search.GetAllPitsAction;
import org.opensearch.action.search.MultiSearchAction;
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.search.SearchScrollAction;
import org.opensearch.action.search.TransportClearScrollAction;
import org.opensearch.action.search.TransportCreatePitAction;
import org.opensearch.action.search.TransportDeletePitAction;
import org.opensearch.action.search.TransportGetAllPitsAction;
import org.opensearch.action.search.TransportMultiSearchAction;
import org.opensearch.action.search.TransportSearchAction;
import org.opensearch.action.search.TransportSearchScrollAction;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.AutoCreateIndex;
import org.opensearch.action.support.DestructiveOperations;
Expand Down Expand Up @@ -387,7 +400,15 @@
import org.opensearch.rest.action.ingest.RestGetPipelineAction;
import org.opensearch.rest.action.ingest.RestPutPipelineAction;
import org.opensearch.rest.action.ingest.RestSimulatePipelineAction;
import org.opensearch.rest.action.search.*;
import org.opensearch.rest.action.search.RestClearScrollAction;
import org.opensearch.rest.action.search.RestCountAction;
import org.opensearch.rest.action.search.RestCreatePitAction;
import org.opensearch.rest.action.search.RestDeletePitAction;
import org.opensearch.rest.action.search.RestExplainAction;
import org.opensearch.rest.action.search.RestGetAllPitsAction;
import org.opensearch.rest.action.search.RestMultiSearchAction;
import org.opensearch.rest.action.search.RestSearchAction;
import org.opensearch.rest.action.search.RestSearchScrollAction;
import org.opensearch.tasks.Task;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.usage.UsageService;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class CreatePitAction extends ActionType<CreatePitResponse> {
public static final CreatePitAction INSTANCE = new CreatePitAction();
public static final String NAME = "indices:data/read/point_in_time";
public static final String NAME = "indices:data/read/point_in_time/create";

private CreatePitAction() {
super(NAME, CreatePitResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
public class DeletePitAction extends ActionType<DeletePitResponse> {

public static final DeletePitAction INSTANCE = new DeletePitAction();
public static final String NAME = "indices:admin/read/pit/delete";
public static final String NAME = "indices:data/read/point_in_time/delete";

private DeletePitAction() {
super(NAME, DeletePitResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

import java.io.IOException;

/**
* Inner node get all pits request
*/
public class GetAllPitNodeRequest extends BaseNodeRequest {
GetAllPitNodesRequest request;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import java.util.List;

/**
* Response which holds information about all PIT contexts in a node
* Inner node get all pits response
*/
public class GetAllPitNodeResponse extends BaseNodeResponse implements ToXContentFragment {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import java.io.IOException;

/**
* Request to get all active PIT IDs in set of nodes
* Request to get all active PIT IDs from all nodes of cluster
*/
public class GetAllPitNodesRequest extends BaseNodesRequest<GetAllPitNodesRequest> {
@Inject
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import java.util.Set;
import java.util.stream.Collectors;

/**
* This class transforms active PIT objects from all nodes to unique PIT objects
*/
public class GetAllPitNodesResponse extends BaseNodesResponse<GetAllPitNodeResponse> implements ToXContentObject {
List<ListPitInfo> pitsInfo = new ArrayList<>();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*/
public class GetAllPitsAction extends ActionType<GetAllPitNodesResponse> {
public static final GetAllPitsAction INSTANCE = new GetAllPitsAction();
public static final String NAME = "indices:data/readall/pit";
public static final String NAME = "indices:data/read/point_in_time/readall";

private GetAllPitsAction() {
super(NAME, GetAllPitNodesResponse::new);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,26 @@
public class ListPitInfo implements ToXContentFragment, Writeable {
private final String pitId;
private final long creationTime;
private final long keepAlive;

public ListPitInfo(String pitId, long creationTime) {
public ListPitInfo(String pitId, long creationTime, long keepAlive) {
this.pitId = pitId;
this.creationTime = creationTime;
this.keepAlive = keepAlive;
}

public ListPitInfo(StreamInput in) throws IOException {
this.pitId = in.readString();
this.creationTime = in.readLong();
this.keepAlive = in.readLong();
}

@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field("pitId", pitId);
builder.field("creationTime", creationTime);
builder.field("keepAlive", keepAlive);
builder.endObject();
return builder;
}
Expand All @@ -54,5 +58,6 @@ public long getCreationTime() {
public void writeTo(StreamOutput out) throws IOException {
out.writeString(pitId);
out.writeLong(creationTime);
out.writeLong(keepAlive);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.List;

/**
* Transport action to get all PIT contexts
* Transport action to get all active PIT contexts across all nodes
*/
public class TransportGetAllPitsAction extends TransportNodesAction<
GetAllPitNodesRequest,
Expand Down Expand Up @@ -73,7 +73,7 @@ protected GetAllPitNodeResponse newNodeResponse(StreamInput in) throws IOExcepti
}

/**
* This node specific operation retrieves all node specific information
* This node specific operation retrieves all active PIT IDs in a node
*/
@Override
protected GetAllPitNodeResponse nodeOperation(GetAllPitNodeRequest request) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,11 +46,13 @@ public String getName() {
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.masterNodeTimeout(request.paramAsTime("master_timeout", clusterStateRequest.masterNodeTimeout()));
clusterStateRequest.clusterManagerNodeTimeout(
request.paramAsTime("cluster_manager_timeout", clusterStateRequest.clusterManagerNodeTimeout())
);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
return channel -> client.admin().cluster().state(clusterStateRequest, new RestActionListener<ClusterStateResponse>(channel) {
@Override
public void processResponse(final ClusterStateResponse clusterStateResponse) throws IOException {
public void processResponse(final ClusterStateResponse clusterStateResponse) {
final List<DiscoveryNode> nodes = new LinkedList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
Expand All @@ -64,7 +66,7 @@ public void processResponse(final ClusterStateResponse clusterStateResponse) thr
public RestResponse buildResponse(final GetAllPitNodesResponse getAllPITNodesResponse) throws Exception {
try (XContentBuilder builder = channel.newBuilder()) {
builder.startObject();
builder.field("pitIds", getAllPITNodesResponse.getPITIDs());
builder.field("pits", getAllPITNodesResponse.getPITIDs());
builder.endObject();
return new BytesRestResponse(RestStatus.OK, builder);
}
Expand Down
12 changes: 10 additions & 2 deletions server/src/main/java/org/opensearch/search/SearchService.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,15 @@
import org.opensearch.action.ActionListener;
import org.opensearch.action.ActionRunnable;
import org.opensearch.action.OriginalIndices;
import org.opensearch.action.search.*;
import org.opensearch.action.search.DeletePitInfo;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.ListPitInfo;
import org.opensearch.action.search.PitSearchContextIdForNode;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchShardTask;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.search.UpdatePitContextRequest;
import org.opensearch.action.search.UpdatePitContextResponse;
import org.opensearch.action.support.TransportActions;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -1454,7 +1462,7 @@ public List<ListPitInfo> getAllPITReaderContexts() {
for (ReaderContext ctx : activeReaders.values()) {
if (ctx instanceof PitReaderContext) {
final PitReaderContext context = (PitReaderContext) ctx;
ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime());
ListPitInfo pitInfo = new ListPitInfo(context.getPitId(), context.getCreationTime(), context.getKeepAlive());
pitContextsInfo.add(pitInfo);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Releasable updatePitIdAndKeepAlive(long keepAliveInMillis, String pitId,
}

public long getCreationTime() {
return this.creationTime.get();
return this.creationTime.get() == null ? 0 : this.creationTime.get();
}

public void setCreationTime(final long creationTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,10 @@ protected AtomicLong getLastAccessTime() {
return lastAccessTime;
}

public long getKeepAlive() {
return keepAlive.get();
}

@Override
public final void close() {
if (closed.compareAndSet(false, true)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,14 @@

package org.opensearch.action.search;

import com.carrotsearch.hppc.cursors.ObjectCursor;
import org.junit.Assert;
import org.opensearch.Version;
import org.opensearch.action.ActionFuture;
import org.opensearch.action.admin.cluster.state.ClusterStateRequest;
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
import org.opensearch.client.Client;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.index.query.IdsQueryBuilder;
import org.opensearch.index.query.MatchAllQueryBuilder;
Expand All @@ -21,8 +28,12 @@
import org.opensearch.search.internal.ShardSearchContextId;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;

import static org.opensearch.test.OpenSearchIntegTestCase.client;
import static org.opensearch.test.OpenSearchTestCase.between;
import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength;
import static org.opensearch.test.OpenSearchTestCase.randomBoolean;
Expand Down Expand Up @@ -81,4 +92,41 @@ public static String getPitId() {
}
return SearchContextId.encode(array.asList(), aliasFilters, version);
}

public static void assertUsingGetAllPits(Client client, String id, long creationTime) throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
nodes.add(node);
}
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr);
ActionFuture<GetAllPitNodesResponse> execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest);
GetAllPitNodesResponse getPitResponse = execute1.get();
Assert.assertTrue(getPitResponse.getPITIDs().get(0).getPitId().contains(id));
Assert.assertEquals(getPitResponse.getPITIDs().get(0).getCreationTime(), creationTime);
}

public static void assertGetAllPitsEmpty(Client client) throws ExecutionException, InterruptedException {
final ClusterStateRequest clusterStateRequest = new ClusterStateRequest();
clusterStateRequest.local(false);
clusterStateRequest.clear().nodes(true).routingTable(true).indices("*");
ClusterStateResponse clusterStateResponse = client.admin().cluster().state(clusterStateRequest).get();
final List<DiscoveryNode> nodes = new LinkedList<>();
for (ObjectCursor<DiscoveryNode> cursor : clusterStateResponse.getState().nodes().getDataNodes().values()) {
DiscoveryNode node = cursor.value;
nodes.add(node);
}
DiscoveryNode[] disNodesArr = new DiscoveryNode[nodes.size()];
nodes.toArray(disNodesArr);
GetAllPitNodesRequest getAllPITNodesRequest = new GetAllPitNodesRequest(disNodesArr);
ActionFuture<GetAllPitNodesResponse> execute1 = client.execute(GetAllPitsAction.INSTANCE, getAllPITNodesRequest);
GetAllPitNodesResponse getPitResponse = execute1.get();
Assert.assertEquals(0, getPitResponse.getPITIDs().size());
}
}
Loading

0 comments on commit e693b63

Please sign in to comment.