Skip to content

Commit

Permalink
fixing tests and adding util
Browse files Browse the repository at this point in the history
Signed-off-by: Bharathwaj G <[email protected]>
  • Loading branch information
bharath-techie committed May 18, 2022
1 parent dc111f5 commit 248f188
Show file tree
Hide file tree
Showing 10 changed files with 654 additions and 533 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,12 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,17 @@
import org.opensearch.action.get.MultiGetResponse;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.index.IndexResponse;
import org.opensearch.action.search.*;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.support.master.AcknowledgedResponse;
import org.opensearch.action.update.UpdateRequest;
import org.opensearch.action.update.UpdateResponse;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@
import org.opensearch.action.get.GetRequest;
import org.opensearch.action.get.MultiGetRequest;
import org.opensearch.action.index.IndexRequest;
import org.opensearch.action.search.*;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.search.SearchType;
import org.opensearch.action.support.ActiveShardCount;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.WriteRequest;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,17 @@
import org.opensearch.action.fieldcaps.FieldCapabilities;
import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest;
import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse;
import org.opensearch.action.search.*;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.ClearScrollResponse;
import org.opensearch.action.search.CreatePitRequest;
import org.opensearch.action.search.CreatePitResponse;
import org.opensearch.action.search.DeletePitRequest;
import org.opensearch.action.search.DeletePitResponse;
import org.opensearch.action.search.MultiSearchRequest;
import org.opensearch.action.search.MultiSearchResponse;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.core.CountRequest;
import org.opensearch.client.core.CountResponse;
import org.opensearch.common.Strings;
Expand Down Expand Up @@ -95,7 +105,12 @@
import org.junit.Before;

import java.io.IOException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,6 @@ public void onFailure(Exception e) {
logger.error("Cleaning up PIT contexts failed ", e);
}
};
ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener);
SearchUtils.deletePits(contexts, deleteListener, clusterService.state(), searchTransportService);
}
}
58 changes: 58 additions & 0 deletions server/src/main/java/org/opensearch/action/search/SearchUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,28 @@

package org.opensearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.common.Strings;
import org.opensearch.transport.RemoteClusterService;
import org.opensearch.transport.Transport;

import java.util.Collection;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Helper class for common search functions
*/
public class SearchUtils {
private static final Logger logger = LogManager.getLogger(CreatePitController.class);

public SearchUtils() {}

Expand All @@ -40,4 +50,52 @@ public static StepListener<BiFunction<String, String, DiscoveryNode>> getConnect
}
return lookupListener;
}

/**
* Delete list of pits, return success if all reader contexts are deleted ( or not found ).
*/
public static void deletePits(
Collection<SearchContextIdForNode> contexts,
ActionListener<Integer> listener,
ClusterState state,
SearchTransportService searchTransportService
) {
final Set<String> clusters = contexts.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());
StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getConnectionLookupListener(
searchTransportService.getRemoteClusterService(),
state,
clusters
);
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(
listener,
(l, result) -> l.onResponse(Math.toIntExact(result.stream().filter(r -> r).count()))
),
contexts.size()
);

for (SearchContextIdForNode contextId : contexts) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onFailure(new OpenSearchException("node not found"));
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendFreePITContext(
connection,
contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false))
);
} catch (Exception e) {
logger.debug("Delete PIT failed ", e);
groupedListener.onResponse(false);
}
}
}
}, listener::onFailure);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,12 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.opensearch.OpenSearchException;
import org.opensearch.action.ActionListener;
import org.opensearch.action.StepListener;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.GroupedActionListener;
import org.opensearch.action.support.HandledTransportAction;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Strings;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.io.stream.NamedWriteableRegistry;
import org.opensearch.tasks.Task;
Expand All @@ -29,9 +26,6 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

/**
* Transport action for deleting pit reader context - supports deleting list and all pit contexts
Expand Down Expand Up @@ -98,17 +92,8 @@ void deleteAllPits(ActionListener<DeletePitResponse> listener) {
new ActionListener<>() {
@Override
public void onResponse(final Collection<SearchTransportService.SearchFreeContextResponse> responses) {
// final SetOnce<Boolean> succeeded = new SetOnce<>();
boolean hasFailures = responses.stream().anyMatch(r -> !r.isFreed());
listener.onResponse(new DeletePitResponse(!hasFailures));
// for (SearchTransportService.SearchFreeContextResponse response : responses) {
// if (!response.isFreed()) {
// succeeded.set(false);
// break;
// }
// }
// succeeded.trySet(true);
// listener.onResponse(new DeletePitResponse(succeeded.get()));
}

@Override
Expand All @@ -133,48 +118,6 @@ public void onFailure(final Exception e) {
* Delete list of pits, return success if all reader contexts are deleted ( or not found ).
*/
void deletePits(List<SearchContextIdForNode> contexts, ActionListener<Integer> listener) {
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = getLookupListener(contexts);
lookupListener.whenComplete(nodeLookup -> {
final GroupedActionListener<Boolean> groupedListener = new GroupedActionListener<>(
ActionListener.delegateFailure(
listener,
(l, result) -> l.onResponse(Math.toIntExact(result.stream().filter(r -> r).count()))
),
contexts.size()
);

for (SearchContextIdForNode contextId : contexts) {
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
if (node == null) {
groupedListener.onFailure(new OpenSearchException("node not found"));
} else {
try {
final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node);
searchTransportService.sendFreePITContext(
connection,
contextId.getSearchContextId(),
ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false))
);
} catch (Exception e) {
logger.debug("Delete PIT failed ", e);
groupedListener.onResponse(false);
}
}
}
}, listener::onFailure);
}

private StepListener<BiFunction<String, String, DiscoveryNode>> getLookupListener(List<SearchContextIdForNode> contexts) {
final StepListener<BiFunction<String, String, DiscoveryNode>> lookupListener = new StepListener<>();
final Set<String> clusters = contexts.stream()
.filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false)
.map(SearchContextIdForNode::getClusterAlias)
.collect(Collectors.toSet());
if (clusters.isEmpty() == false) {
searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
} else {
lookupListener.onResponse((cluster, nodeId) -> clusterService.state().getNodes().get(nodeId));
}
return lookupListener;
SearchUtils.deletePits(contexts, listener, clusterService.state(), searchTransportService);
}
}
Loading

0 comments on commit 248f188

Please sign in to comment.