diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java index e8309f40b0ec7..b872ee21f9927 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RequestConverters.java @@ -53,7 +53,12 @@ import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; diff --git a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java index 5d2b1e95e2ee0..4cb3ff999f793 100644 --- a/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java +++ b/client/rest-high-level/src/main/java/org/opensearch/client/RestHighLevelClient.java @@ -57,7 +57,17 @@ import org.opensearch.action.get.MultiGetResponse; import org.opensearch.action.index.IndexRequest; import org.opensearch.action.index.IndexResponse; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.action.support.master.AcknowledgedResponse; import org.opensearch.action.update.UpdateRequest; import org.opensearch.action.update.UpdateResponse; diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java index e8dbf9b5ea403..087d25fbd757d 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/RequestConvertersTests.java @@ -52,7 +52,13 @@ import org.opensearch.action.get.GetRequest; import org.opensearch.action.get.MultiGetRequest; import org.opensearch.action.index.IndexRequest; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchScrollRequest; +import org.opensearch.action.search.SearchType; import org.opensearch.action.support.ActiveShardCount; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.WriteRequest; diff --git a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java index fc7bfea247a3b..c0fd95e4c4e52 100644 --- a/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java +++ b/client/rest-high-level/src/test/java/org/opensearch/client/SearchIT.java @@ -41,7 +41,17 @@ import org.opensearch.action.fieldcaps.FieldCapabilities; import org.opensearch.action.fieldcaps.FieldCapabilitiesRequest; import org.opensearch.action.fieldcaps.FieldCapabilitiesResponse; -import org.opensearch.action.search.*; +import org.opensearch.action.search.ClearScrollRequest; +import org.opensearch.action.search.ClearScrollResponse; +import org.opensearch.action.search.CreatePitRequest; +import org.opensearch.action.search.CreatePitResponse; +import org.opensearch.action.search.DeletePitRequest; +import org.opensearch.action.search.DeletePitResponse; +import org.opensearch.action.search.MultiSearchRequest; +import org.opensearch.action.search.MultiSearchResponse; +import org.opensearch.action.search.SearchRequest; +import org.opensearch.action.search.SearchResponse; +import org.opensearch.action.search.SearchScrollRequest; import org.opensearch.client.core.CountRequest; import org.opensearch.client.core.CountResponse; import org.opensearch.common.Strings; @@ -95,7 +105,12 @@ import org.junit.Before; import java.io.IOException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import static org.opensearch.common.xcontent.XContentFactory.jsonBuilder; diff --git a/server/src/main/java/org/opensearch/action/search/CreatePitController.java b/server/src/main/java/org/opensearch/action/search/CreatePitController.java index 3d2ecc8b695c6..3aa7b60f18fc3 100644 --- a/server/src/main/java/org/opensearch/action/search/CreatePitController.java +++ b/server/src/main/java/org/opensearch/action/search/CreatePitController.java @@ -268,6 +268,6 @@ public void onFailure(Exception e) { logger.error("Cleaning up PIT contexts failed ", e); } }; - ClearScrollController.closeContexts(clusterService.state().getNodes(), searchTransportService, contexts, deleteListener); + SearchUtils.deletePits(contexts, deleteListener, clusterService.state(), searchTransportService); } } diff --git a/server/src/main/java/org/opensearch/action/search/SearchUtils.java b/server/src/main/java/org/opensearch/action/search/SearchUtils.java index 148d1645568b1..b9eea817c75b9 100644 --- a/server/src/main/java/org/opensearch/action/search/SearchUtils.java +++ b/server/src/main/java/org/opensearch/action/search/SearchUtils.java @@ -8,18 +8,28 @@ package org.opensearch.action.search; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.opensearch.OpenSearchException; +import org.opensearch.action.ActionListener; import org.opensearch.action.StepListener; +import org.opensearch.action.support.GroupedActionListener; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.node.DiscoveryNode; +import org.opensearch.common.Strings; import org.opensearch.transport.RemoteClusterService; +import org.opensearch.transport.Transport; +import java.util.Collection; import java.util.Set; import java.util.function.BiFunction; +import java.util.stream.Collectors; /** * Helper class for common search functions */ public class SearchUtils { + private static final Logger logger = LogManager.getLogger(CreatePitController.class); public SearchUtils() {} @@ -40,4 +50,52 @@ public static StepListener> getConnect } return lookupListener; } + + /** + * Delete list of pits, return success if all reader contexts are deleted ( or not found ). + */ + public static void deletePits( + Collection contexts, + ActionListener listener, + ClusterState state, + SearchTransportService searchTransportService + ) { + final Set clusters = contexts.stream() + .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) + .map(SearchContextIdForNode::getClusterAlias) + .collect(Collectors.toSet()); + StepListener> lookupListener = getConnectionLookupListener( + searchTransportService.getRemoteClusterService(), + state, + clusters + ); + lookupListener.whenComplete(nodeLookup -> { + final GroupedActionListener groupedListener = new GroupedActionListener<>( + ActionListener.delegateFailure( + listener, + (l, result) -> l.onResponse(Math.toIntExact(result.stream().filter(r -> r).count())) + ), + contexts.size() + ); + + for (SearchContextIdForNode contextId : contexts) { + final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); + if (node == null) { + groupedListener.onFailure(new OpenSearchException("node not found")); + } else { + try { + final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node); + searchTransportService.sendFreePITContext( + connection, + contextId.getSearchContextId(), + ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)) + ); + } catch (Exception e) { + logger.debug("Delete PIT failed ", e); + groupedListener.onResponse(false); + } + } + } + }, listener::onFailure); + } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java index c62d26f09b26b..234fceaa92bb4 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportDeletePitAction.java @@ -11,15 +11,12 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; -import org.opensearch.OpenSearchException; import org.opensearch.action.ActionListener; -import org.opensearch.action.StepListener; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.GroupedActionListener; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.Strings; import org.opensearch.common.inject.Inject; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.tasks.Task; @@ -29,9 +26,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Set; -import java.util.function.BiFunction; -import java.util.stream.Collectors; /** * Transport action for deleting pit reader context - supports deleting list and all pit contexts @@ -98,17 +92,8 @@ void deleteAllPits(ActionListener listener) { new ActionListener<>() { @Override public void onResponse(final Collection responses) { - // final SetOnce succeeded = new SetOnce<>(); boolean hasFailures = responses.stream().anyMatch(r -> !r.isFreed()); listener.onResponse(new DeletePitResponse(!hasFailures)); - // for (SearchTransportService.SearchFreeContextResponse response : responses) { - // if (!response.isFreed()) { - // succeeded.set(false); - // break; - // } - // } - // succeeded.trySet(true); - // listener.onResponse(new DeletePitResponse(succeeded.get())); } @Override @@ -133,48 +118,6 @@ public void onFailure(final Exception e) { * Delete list of pits, return success if all reader contexts are deleted ( or not found ). */ void deletePits(List contexts, ActionListener listener) { - final StepListener> lookupListener = getLookupListener(contexts); - lookupListener.whenComplete(nodeLookup -> { - final GroupedActionListener groupedListener = new GroupedActionListener<>( - ActionListener.delegateFailure( - listener, - (l, result) -> l.onResponse(Math.toIntExact(result.stream().filter(r -> r).count())) - ), - contexts.size() - ); - - for (SearchContextIdForNode contextId : contexts) { - final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); - if (node == null) { - groupedListener.onFailure(new OpenSearchException("node not found")); - } else { - try { - final Transport.Connection connection = searchTransportService.getConnection(contextId.getClusterAlias(), node); - searchTransportService.sendFreePITContext( - connection, - contextId.getSearchContextId(), - ActionListener.wrap(r -> groupedListener.onResponse(r.isFreed()), e -> groupedListener.onResponse(false)) - ); - } catch (Exception e) { - logger.debug("Delete PIT failed ", e); - groupedListener.onResponse(false); - } - } - } - }, listener::onFailure); - } - - private StepListener> getLookupListener(List contexts) { - final StepListener> lookupListener = new StepListener<>(); - final Set clusters = contexts.stream() - .filter(ctx -> Strings.isEmpty(ctx.getClusterAlias()) == false) - .map(SearchContextIdForNode::getClusterAlias) - .collect(Collectors.toSet()); - if (clusters.isEmpty() == false) { - searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener); - } else { - lookupListener.onResponse((cluster, nodeId) -> clusterService.state().getNodes().get(nodeId)); - } - return lookupListener; + SearchUtils.deletePits(contexts, listener, clusterService.state(), searchTransportService); } } diff --git a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java index f7078762dee5b..b608c92c95f40 100644 --- a/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java +++ b/server/src/test/java/org/opensearch/action/search/CreatePitControllerTests.java @@ -22,18 +22,13 @@ import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.Settings; import org.opensearch.common.unit.TimeValue; -import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.index.query.IdsQueryBuilder; import org.opensearch.index.query.MatchAllQueryBuilder; import org.opensearch.index.query.QueryBuilder; import org.opensearch.index.query.TermQueryBuilder; -import org.opensearch.index.shard.ShardId; import org.opensearch.search.SearchHit; import org.opensearch.search.SearchHits; -import org.opensearch.search.SearchPhaseResult; -import org.opensearch.search.SearchShardTarget; import org.opensearch.search.aggregations.InternalAggregations; -import org.opensearch.search.internal.AliasFilter; import org.opensearch.search.internal.InternalSearchResponse; import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; @@ -44,17 +39,17 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.RemoteClusterConnectionTests; import org.opensearch.transport.Transport; + import java.util.Arrays; import java.util.Collections; -import java.util.HashMap; import java.util.List; -import java.util.Map; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.action.search.PitTestsUtil.getPitId; /** * Functional tests for various methods in create pit controller. Covers update pit phase specifically since @@ -213,6 +208,20 @@ public void sendFreeContext( t.start(); } + /** + * Test if cleanup request is called + */ + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + @Override public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { return new SearchAsyncActionTests.MockConnection(node); @@ -297,8 +306,11 @@ public Transport.Connection getConnection(String clusterAlias, DiscoveryNode nod return new SearchAsyncActionTests.MockConnection(node); } + /** + * Test if cleanup request is called + */ @Override - public void sendFreeContext( + public void sendFreePITContext( Transport.Connection connection, ShardSearchContextId contextId, ActionListener listener @@ -394,8 +406,11 @@ public void updatePitContext( } } + /** + * Test if cleanup request is called + */ @Override - public void sendFreeContext( + public void sendFreePITContext( Transport.Connection connection, ShardSearchContextId contextId, ActionListener listener @@ -484,8 +499,11 @@ public void updatePitContext( t.start(); } + /** + * Test if cleanup request is called + */ @Override - public void sendFreeContext( + public void sendFreePITContext( Transport.Connection connection, ShardSearchContextId contextId, ActionListener listener @@ -539,54 +557,4 @@ public void onFailure(Exception e) { } } - - public static QueryBuilder randomQueryBuilder() { - if (randomBoolean()) { - return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10)); - } else if (randomBoolean()) { - return new MatchAllQueryBuilder(); - } else { - return new IdsQueryBuilder().addIds(randomAlphaOfLength(10)); - } - } - - public static String getPitId() { - AtomicArray array = new AtomicArray<>(3); - SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult( - new ShardSearchContextId("a", 1), - null - ); - testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); - SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult( - new ShardSearchContextId("b", 12), - null - ); - testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); - SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult( - new ShardSearchContextId("c", 42), - null - ); - testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); - array.setOnce(0, testSearchPhaseResult1); - array.setOnce(1, testSearchPhaseResult2); - array.setOnce(2, testSearchPhaseResult3); - - final Version version = Version.CURRENT; - final Map aliasFilters = new HashMap<>(); - for (SearchPhaseResult result : array.asList()) { - final AliasFilter aliasFilter; - if (randomBoolean()) { - aliasFilter = new AliasFilter(randomQueryBuilder()); - } else if (randomBoolean()) { - aliasFilter = new AliasFilter(randomQueryBuilder(), "alias-" + between(1, 10)); - } else { - aliasFilter = AliasFilter.EMPTY; - } - if (randomBoolean()) { - aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); - } - } - return SearchContextId.encode(array.asList(), aliasFilters, version); - } - } diff --git a/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java new file mode 100644 index 0000000000000..ec83cb45697d9 --- /dev/null +++ b/server/src/test/java/org/opensearch/action/search/PitTestsUtil.java @@ -0,0 +1,84 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.search; + +import org.opensearch.Version; +import org.opensearch.common.util.concurrent.AtomicArray; +import org.opensearch.index.query.IdsQueryBuilder; +import org.opensearch.index.query.MatchAllQueryBuilder; +import org.opensearch.index.query.QueryBuilder; +import org.opensearch.index.query.TermQueryBuilder; +import org.opensearch.index.shard.ShardId; +import org.opensearch.search.SearchPhaseResult; +import org.opensearch.search.SearchShardTarget; +import org.opensearch.search.internal.AliasFilter; +import org.opensearch.search.internal.ShardSearchContextId; + +import java.util.HashMap; +import java.util.Map; + +import static org.opensearch.test.OpenSearchTestCase.between; +import static org.opensearch.test.OpenSearchTestCase.randomAlphaOfLength; +import static org.opensearch.test.OpenSearchTestCase.randomBoolean; + +/** + * Helper class for common pit tests functions + */ +public class PitTestsUtil { + private PitTestsUtil() {} + + public static QueryBuilder randomQueryBuilder() { + if (randomBoolean()) { + return new TermQueryBuilder(randomAlphaOfLength(10), randomAlphaOfLength(10)); + } else if (randomBoolean()) { + return new MatchAllQueryBuilder(); + } else { + return new IdsQueryBuilder().addIds(randomAlphaOfLength(10)); + } + } + + public static String getPitId() { + AtomicArray array = new AtomicArray<>(3); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult1 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("a", 1), + null + ); + testSearchPhaseResult1.setSearchShardTarget(new SearchShardTarget("node_1", new ShardId("idx", "uuid1", 2), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult2 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("b", 12), + null + ); + testSearchPhaseResult2.setSearchShardTarget(new SearchShardTarget("node_2", new ShardId("idy", "uuid2", 42), null, null)); + SearchAsyncActionTests.TestSearchPhaseResult testSearchPhaseResult3 = new SearchAsyncActionTests.TestSearchPhaseResult( + new ShardSearchContextId("c", 42), + null + ); + testSearchPhaseResult3.setSearchShardTarget(new SearchShardTarget("node_3", new ShardId("idy", "uuid2", 43), null, null)); + array.setOnce(0, testSearchPhaseResult1); + array.setOnce(1, testSearchPhaseResult2); + array.setOnce(2, testSearchPhaseResult3); + + final Version version = Version.CURRENT; + final Map aliasFilters = new HashMap<>(); + for (SearchPhaseResult result : array.asList()) { + final AliasFilter aliasFilter; + if (randomBoolean()) { + aliasFilter = new AliasFilter(randomQueryBuilder()); + } else if (randomBoolean()) { + aliasFilter = new AliasFilter(randomQueryBuilder(), "alias-" + between(1, 10)); + } else { + aliasFilter = AliasFilter.EMPTY; + } + if (randomBoolean()) { + aliasFilters.put(result.getSearchShardTarget().getShardId().getIndex().getUUID(), aliasFilter); + } + } + return SearchContextId.encode(array.asList(), aliasFilters, version); + } +} diff --git a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java index 66a0b771bbe44..86c5b06e1b0fd 100644 --- a/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java +++ b/server/src/test/java/org/opensearch/action/search/TransportDeletePitActionTests.java @@ -20,7 +20,6 @@ import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.service.ClusterService; -import org.opensearch.common.UUIDs; import org.opensearch.common.io.stream.NamedWriteableRegistry; import org.opensearch.common.settings.Settings; import org.opensearch.index.query.IdsQueryBuilder; @@ -34,28 +33,30 @@ import org.opensearch.search.internal.ShardSearchContextId; import org.opensearch.tasks.Task; import org.opensearch.tasks.TaskId; -import org.opensearch.tasks.TaskManager; import org.opensearch.test.OpenSearchTestCase; +import org.opensearch.test.transport.MockTransportService; import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.RemoteClusterConnectionTests; import org.opensearch.transport.Transport; import org.opensearch.transport.TransportResponse; -import org.opensearch.transport.TransportService; import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; +import static org.opensearch.action.search.PitTestsUtil.getPitId; import static org.opensearch.action.support.PlainActionFuture.newFuture; +import static org.opensearch.transport.RemoteClusterConnectionTests.startTransport; /** * Functional tests for transport delete pit action */ public class TransportDeletePitActionTests extends OpenSearchTestCase { - DiscoveryNode node1 = null; DiscoveryNode node2 = null; DiscoveryNode node3 = null; @@ -65,13 +66,34 @@ public class TransportDeletePitActionTests extends OpenSearchTestCase { DiscoveryNodes nodes = null; NamedWriteableRegistry namedWriteableRegistry = null; ClusterService clusterServiceMock = null; + Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); + private ThreadPool threadPool = new ThreadPool(settings); + + @Override + public void tearDown() throws Exception { + super.tearDown(); + ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS); + } + + private MockTransportService startTransport(String id, List knownNodes, Version version) { + return startTransport(id, knownNodes, version, Settings.EMPTY); + } + + private MockTransportService startTransport( + final String id, + final List knownNodes, + final Version version, + final Settings settings + ) { + return RemoteClusterConnectionTests.startTransport(id, knownNodes, version, threadPool, settings); + } @Before public void setupData() { node1 = new DiscoveryNode("node_1", buildNewFakeTransportAddress(), Version.CURRENT); node2 = new DiscoveryNode("node_2", buildNewFakeTransportAddress(), Version.CURRENT); node3 = new DiscoveryNode("node_3", buildNewFakeTransportAddress(), Version.CURRENT); - pitId = CreatePitControllerTests.getPitId(); + pitId = getPitId(); namedWriteableRegistry = new NamedWriteableRegistry( Arrays.asList( new NamedWriteableRegistry.Entry(QueryBuilder.class, TermQueryBuilder.NAME, TermQueryBuilder::new), @@ -116,469 +138,479 @@ public void setupData() { */ public void testDeletePitSuccess() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - @Override - public void sendFreePITContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(true, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(true, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + + } } } public void testDeleteAllPITSuccess() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(true, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(true, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + + } } } public void testDeletePitWhenNodeIsDown() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - @Override - public void sendFreePITContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - - if (connection.getNode().getId() == "node_3") { - Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); - t.start(); - } else { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } } public void testDeletePitWhenAllNodesAreDown() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - @Override - public void sendFreePITContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } } public void testDeletePitFailure() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - @Override - public void sendFreePITContext( - Transport.Connection connection, - ShardSearchContextId contextId, - ActionListener listener - ) { - deleteNodesInvoked.add(connection.getNode()); - - if (connection.getNode().getId() == "node_3") { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); - t.start(); - } else { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreePITContext( + Transport.Connection connection, + ShardSearchContextId contextId, + ActionListener listener + ) { + deleteNodesInvoked.add(connection.getNode()); + + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest(pitId); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } } public void testDeleteAllPitWhenNodeIsDown() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - deleteNodesInvoked.add(connection.getNode()); - if (connection.getNode().getId() == "node_3") { - Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); - t.start(); - } else { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onFailure(new Exception("node 3 down"))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } } public void testDeleteAllPitWhenAllNodesAreDown() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - @Override - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - deleteNodesInvoked.add(connection.getNode()); - Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); - t.start(); - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + @Override + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + Thread t = new Thread(() -> listener.onFailure(new Exception("node down"))); + t.start(); + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } } public void testDeleteAllPitFailure() throws InterruptedException, ExecutionException { List deleteNodesInvoked = new CopyOnWriteArrayList<>(); - Settings settings = Settings.builder().put("node.name", TransportMultiSearchActionTests.class.getSimpleName()).build(); ActionFilters actionFilters = mock(ActionFilters.class); when(actionFilters.filters()).thenReturn(new ActionFilter[0]); - ThreadPool threadPool = new ThreadPool(settings); - try { - SearchTransportService searchTransportService = new SearchTransportService(null, null) { - - public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { - deleteNodesInvoked.add(connection.getNode()); - if (connection.getNode().getId() == "node_3") { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); - t.start(); - } else { - Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); - t.start(); - } - } - - @Override - public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { - return new SearchAsyncActionTests.MockConnection(node); - } - }; - TransportService transportService = new TransportService( - Settings.EMPTY, - mock(Transport.class), - threadPool, - TransportService.NOOP_TRANSPORT_INTERCEPTOR, - boundAddress -> DiscoveryNode.createLocal(settings, boundAddress.publishAddress(), UUIDs.randomBase64UUID()), - null, - Collections.emptySet() + + List knownNodes = new CopyOnWriteArrayList<>(); + try ( + MockTransportService cluster1Transport = startTransport("cluster_1_node", knownNodes, Version.CURRENT); + MockTransportService cluster2Transport = startTransport("cluster_2_node", knownNodes, Version.CURRENT) + ) { + knownNodes.add(cluster1Transport.getLocalDiscoNode()); + knownNodes.add(cluster2Transport.getLocalDiscoNode()); + Collections.shuffle(knownNodes, random()); + + try ( + MockTransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + Version.CURRENT, + threadPool, + null + ) ) { - @Override - public TaskManager getTaskManager() { - return taskManager; - } - }; - TransportDeletePitAction action = new TransportDeletePitAction( - transportService, - actionFilters, - namedWriteableRegistry, - transportSearchAction, - clusterServiceMock, - searchTransportService - ); - DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); - PlainActionFuture future = newFuture(); - action.execute(task, deletePITRequest, future); - DeletePitResponse dr = future.get(); - assertEquals(false, dr.isSucceeded()); - assertEquals(3, deleteNodesInvoked.size()); - } finally { - assertTrue(OpenSearchTestCase.terminate(threadPool)); + transportService.start(); + transportService.acceptIncomingRequests(); + SearchTransportService searchTransportService = new SearchTransportService(transportService, null) { + + public void sendFreeAllPitContexts(Transport.Connection connection, final ActionListener listener) { + deleteNodesInvoked.add(connection.getNode()); + if (connection.getNode().getId() == "node_3") { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(false))); + t.start(); + } else { + Thread t = new Thread(() -> listener.onResponse(new SearchFreeContextResponse(true))); + t.start(); + } + } + + @Override + public Transport.Connection getConnection(String clusterAlias, DiscoveryNode node) { + return new SearchAsyncActionTests.MockConnection(node); + } + }; + TransportDeletePitAction action = new TransportDeletePitAction( + transportService, + actionFilters, + namedWriteableRegistry, + transportSearchAction, + clusterServiceMock, + searchTransportService + ); + DeletePitRequest deletePITRequest = new DeletePitRequest("_all"); + PlainActionFuture future = newFuture(); + action.execute(task, deletePITRequest, future); + DeletePitResponse dr = future.get(); + assertEquals(false, dr.isSucceeded()); + assertEquals(3, deleteNodesInvoked.size()); + } } }