From 4fb17b0470484cd161d4a985a3b95eeaac50c321 Mon Sep 17 00:00:00 2001 From: Kaituo Li Date: Tue, 22 Dec 2020 20:18:10 -0800 Subject: [PATCH] Fix the profile API returns prematurely. MultiResponsesDelegateActionListener helps send multiple requests asynchronously and return one final response altogether. While waiting for all inflight requests, the method respondImmediately and failImmediately can stop waiting and return immediately. While these two methods are convenient, it is easy to misuse them and cause bugs (see https://github.com/opendistro-for-elasticsearch/anomaly-detection/issues/339 for example). This PR removes the method respondImmediately and failImmediately and refactor profile runner to avoid using them. This PR also stops printing out the unknown entity state since it is not useful. Testing done: 1. Added unit tests to verify the bug fix. 2. Manual tests to run profile calls for single-stream and multi-entity detectors for different phases of the detector lifecycle (disabled, init, running). Verified profile results make sense. --- .../ad/AnomalyDetectorProfileRunner.java | 100 ++++--- .../ad/EntityProfileRunner.java | 65 +++-- .../ad/constant/CommonErrorMessages.java | 1 + .../ad/model/EntityProfile.java | 6 +- .../MultiResponsesDelegateActionListener.java | 16 -- .../ad/AbstractProfileRunnerTests.java | 158 +++++++++++ .../ad/AnomalyDetectorProfileRunnerTests.java | 138 +++------ .../ad/TestHelpers.java | 30 +- .../ad/model/EntityProfileTests.java | 59 ++++ ...ndexAnomalyDetectorActionHandlerTests.java | 31 +-- .../metrics/CardinalityProfileTests.java | 261 ++++++++++++++++++ 11 files changed, 631 insertions(+), 234 deletions(-) create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java create mode 100644 src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java create mode 100644 src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java index 331687f5..535deb68 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunner.java @@ -99,7 +99,6 @@ public void profile(String detectorId, ActionListener listener, listener.onFailure(new InvalidParameterException(CommonErrorMessages.EMPTY_PROFILES_COLLECT)); return; } - calculateTotalResponsesToWait(detectorId, profilesToCollect, listener); } @@ -118,10 +117,38 @@ private void calculateTotalResponsesToWait( ) { ensureExpectedToken(XContentParser.Token.START_OBJECT, xContentParser.nextToken(), xContentParser); AnomalyDetector detector = AnomalyDetector.parse(xContentParser, detectorId); + + prepareProfile(detector, listener, profilesToCollect); + } catch (Exception e) { + listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e)); + } + } else { + listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); + } + }, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception)))); + } + + private void prepareProfile( + AnomalyDetector detector, + ActionListener listener, + Set profilesToCollect + ) { + String detectorId = detector.getDetectorId(); + GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); + client.get(getRequest, ActionListener.wrap(getResponse -> { + if (getResponse != null && getResponse.isExists()) { + try ( + XContentParser parser = XContentType.JSON + .xContent() + .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) + ) { + ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); + AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + long enabledTimeMs = job.getEnabledTime().toEpochMilli(); + boolean isMultiEntityDetector = detector.isMultientityDetector(); int totalResponsesToWait = 0; - if (profilesToCollect.contains(DetectorProfileName.ERROR)) { totalResponsesToWait++; } @@ -158,50 +185,20 @@ private void calculateTotalResponsesToWait( new MultiResponsesDelegateActionListener( listener, totalResponsesToWait, - "Fail to fetch profile for " + detectorId, + CommonErrorMessages.FAIL_FETCH_ERR_MSG + detectorId, false ); - prepareProfile(detector, delegateListener, profilesToCollect); - } catch (Exception e) { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, e)); - } - } else { - listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId)); - } - }, exception -> listener.onFailure(new RuntimeException(CommonErrorMessages.FAIL_TO_FIND_DETECTOR_MSG + detectorId, exception)))); - } - - private void prepareProfile( - AnomalyDetector detector, - MultiResponsesDelegateActionListener listener, - Set profilesToCollect - ) { - String detectorId = detector.getDetectorId(); - GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); - client.get(getRequest, ActionListener.wrap(getResponse -> { - if (getResponse != null && getResponse.isExists()) { - try ( - XContentParser parser = XContentType.JSON - .xContent() - .createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, getResponse.getSourceAsString()) - ) { - ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); - AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); - long enabledTimeMs = job.getEnabledTime().toEpochMilli(); - if (profilesToCollect.contains(DetectorProfileName.ERROR)) { GetRequest getStateRequest = new GetRequest(DetectorInternalState.DETECTOR_STATE_INDEX, detectorId); - client.get(getStateRequest, onGetDetectorState(listener, detectorId, enabledTimeMs)); + client.get(getStateRequest, onGetDetectorState(delegateListener, detectorId, enabledTimeMs)); } - boolean isMultiEntityDetector = detector.isMultientityDetector(); - // total number of listeners we need to define. Needed by MultiResponsesDelegateActionListener to decide // when to consolidate results and return to users if (isMultiEntityDetector) { if (profilesToCollect.contains(DetectorProfileName.TOTAL_ENTITIES)) { - profileEntityStats(listener, detector); + profileEntityStats(delegateListener, detector); } if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE) || profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE) @@ -210,24 +207,24 @@ private void prepareProfile( || profilesToCollect.contains(DetectorProfileName.ACTIVE_ENTITIES) || profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS) || profilesToCollect.contains(DetectorProfileName.STATE)) { - profileModels(detector, profilesToCollect, job, true, listener); + profileModels(detector, profilesToCollect, job, true, delegateListener); } } else { if (profilesToCollect.contains(DetectorProfileName.STATE) || profilesToCollect.contains(DetectorProfileName.INIT_PROGRESS)) { - profileStateRelated(detector, listener, job.isEnabled(), profilesToCollect); + profileStateRelated(detector, delegateListener, job.isEnabled(), profilesToCollect); } if (profilesToCollect.contains(DetectorProfileName.COORDINATING_NODE) || profilesToCollect.contains(DetectorProfileName.SHINGLE_SIZE) || profilesToCollect.contains(DetectorProfileName.TOTAL_SIZE_IN_BYTES) || profilesToCollect.contains(DetectorProfileName.MODELS)) { - profileModels(detector, profilesToCollect, job, false, listener); + profileModels(detector, profilesToCollect, job, false, delegateListener); } } - } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + } catch (Exception e) { + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { onGetDetectorForPrepare(listener, profilesToCollect); @@ -261,20 +258,19 @@ private void profileEntityStats(MultiResponsesDelegateActionListener { listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); }) - ); + }, searchException -> { + logger.warn(CommonErrorMessages.FAIL_TO_GET_TOTAL_ENTITIES + detector.getDetectorId()); + listener.onFailure(searchException); + })); } } - private void onGetDetectorForPrepare( - MultiResponsesDelegateActionListener listener, - Set profiles - ) { + private void onGetDetectorForPrepare(ActionListener listener, Set profiles) { DetectorProfile.Builder profileBuilder = new DetectorProfile.Builder(); if (profiles.contains(DetectorProfileName.STATE)) { profileBuilder.state(DetectorState.DISABLED); } - listener.respondImmediately(profileBuilder.build()); + listener.onResponse(profileBuilder.build()); } /** @@ -340,8 +336,8 @@ private ActionListener onGetDetectorState( listener.onResponse(profileBuilder.build()); } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { // detector state for this detector does not exist @@ -475,7 +471,7 @@ private ActionListener onInittedEver( "Fail to find any anomaly result with anomaly score larger than 0 after AD job enabled time for detector {}", detector.getDetectorId() ); - listener.failImmediately(new RuntimeException("Fail to find detector state: " + detector.getDetectorId(), exception)); + listener.onFailure(exception); } }); } @@ -523,7 +519,7 @@ private ActionListener onPollRCFUpdates( new ParameterizedMessage("Fail to get init progress through messaging for {}", detector.getDetectorId()), exception ); - listener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detector.getDetectorId(), exception); + listener.onFailure(exception); } }); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java index 97e82924..23ee639a 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/EntityProfileRunner.java @@ -20,7 +20,6 @@ import static com.amazon.opendistroforelasticsearch.ad.settings.AnomalyDetectorSettings.CATEGORY_FIELD_LIMIT; import static org.elasticsearch.common.xcontent.XContentParserUtils.ensureExpectedToken; -import java.io.IOException; import java.security.InvalidParameterException; import java.util.List; import java.util.Optional; @@ -35,7 +34,6 @@ import org.elasticsearch.client.Client; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; -import org.elasticsearch.common.xcontent.XContentParseException; import org.elasticsearch.common.xcontent.XContentParser; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.IndexNotFoundException; @@ -113,25 +111,7 @@ public void profile( new InvalidParameterException(CommonErrorMessages.CATEGORICAL_FIELD_NUMBER_SURPASSED + CATEGORY_FIELD_LIMIT) ); } else { - int totalResponsesToWait = 0; - if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) - || profilesToCollect.contains(EntityProfileName.STATE)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { - totalResponsesToWait++; - } - if (profilesToCollect.contains(EntityProfileName.MODELS)) { - totalResponsesToWait++; - } - MultiResponsesDelegateActionListener delegateListener = - new MultiResponsesDelegateActionListener( - listener, - totalResponsesToWait, - "Fail to fetch profile for " + entityValue + " of detector " + detectorId, - false - ); - prepareEntityProfile(delegateListener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0)); + prepareEntityProfile(listener, detectorId, entityValue, profilesToCollect, detector, categoryField.get(0)); } } catch (Exception t) { listener.onFailure(t); @@ -143,7 +123,7 @@ public void profile( } private void prepareEntityProfile( - MultiResponsesDelegateActionListener delegateListener, + ActionListener listener, String detectorId, String entityValue, Set profilesToCollect, @@ -158,8 +138,8 @@ private void prepareEntityProfile( request, ActionListener .wrap( - r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, delegateListener), - delegateListener::failImmediately + r -> getJob(detectorId, categoryField, entityValue, profilesToCollect, detector, r, listener), + listener::onFailure ) ); } @@ -171,7 +151,7 @@ private void getJob( Set profilesToCollect, AnomalyDetector detector, EntityProfileResponse entityProfileResponse, - MultiResponsesDelegateActionListener delegateListener + ActionListener listener ) { GetRequest getRequest = new GetRequest(ANOMALY_DETECTOR_JOB_INDEX, detectorId); client.get(getRequest, ActionListener.wrap(getResponse -> { @@ -184,6 +164,25 @@ private void getJob( ensureExpectedToken(XContentParser.Token.START_OBJECT, parser.nextToken(), parser); AnomalyDetectorJob job = AnomalyDetectorJob.parse(parser); + int totalResponsesToWait = 0; + if (profilesToCollect.contains(EntityProfileName.INIT_PROGRESS) + || profilesToCollect.contains(EntityProfileName.STATE)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(EntityProfileName.ENTITY_INFO)) { + totalResponsesToWait++; + } + if (profilesToCollect.contains(EntityProfileName.MODELS)) { + totalResponsesToWait++; + } + MultiResponsesDelegateActionListener delegateListener = + new MultiResponsesDelegateActionListener( + listener, + totalResponsesToWait, + "Fail to fetch profile for " + entityValue + " of detector " + detectorId, + false + ); + if (profilesToCollect.contains(EntityProfileName.MODELS)) { EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue); if (false == job.isEnabled()) { @@ -233,20 +232,20 @@ private void getJob( delegateListener.onResponse(builder.build()); })); } - } catch (IOException | XContentParseException | NullPointerException e) { - logger.error(e); - delegateListener.failImmediately(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + } catch (Exception e) { + logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG, e); + listener.onFailure(e); } } else { - sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener); + sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener); } }, exception -> { if (exception instanceof IndexNotFoundException) { logger.info(exception.getMessage()); - sendUnknownState(profilesToCollect, categoryField, entityValue, true, delegateListener); + sendUnknownState(profilesToCollect, categoryField, entityValue, true, listener); } else { logger.error(CommonErrorMessages.FAIL_TO_GET_PROFILE_MSG + detectorId, exception); - delegateListener.failImmediately(exception); + listener.onFailure(exception); } })); } @@ -285,14 +284,14 @@ private void sendUnknownState( String categoryField, String entityValue, boolean immediate, - MultiResponsesDelegateActionListener delegateListener + ActionListener delegateListener ) { EntityProfile.Builder builder = new EntityProfile.Builder(categoryField, entityValue); if (profilesToCollect.contains(EntityProfileName.STATE)) { builder.state(EntityState.UNKNOWN); } if (immediate) { - delegateListener.respondImmediately(builder.build()); + delegateListener.onResponse(builder.build()); } else { delegateListener.onResponse(builder.build()); } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java index e5446ac3..253c6b30 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/constant/CommonErrorMessages.java @@ -34,4 +34,5 @@ public class CommonErrorMessages { public static String FAIL_TO_GET_TOTAL_ENTITIES = "Failed to get total entities for detector "; public static String CATEGORICAL_FIELD_NUMBER_SURPASSED = "We don't support categorical fields more than "; public static String EMPTY_PROFILES_COLLECT = "profiles to collect are missing or invalid"; + public static String FAIL_FETCH_ERR_MSG = "Fail to fetch profile for "; } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java index a74070be..3a7dbe58 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfile.java @@ -214,7 +214,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws if (modelProfile != null) { builder.field(CommonName.MODEL, modelProfile); } - if (state != null) { + if (state != EntityState.UNKNOWN) { builder.field(CommonName.STATE, state); } builder.endObject(); @@ -263,7 +263,7 @@ public String toString() { if (modelProfile != null) { builder.append(CommonName.MODELS, modelProfile); } - if (state != null) { + if (state != EntityState.UNKNOWN) { builder.append(CommonName.STATE, state); } return builder.toString(); @@ -330,7 +330,7 @@ public void merge(Mergeable other) { if (otherProfile.modelProfile != null) { this.modelProfile = otherProfile.modelProfile; } - if (otherProfile.getState() != null) { + if (otherProfile.getState() != EntityState.UNKNOWN) { this.state = otherProfile.getState(); } } diff --git a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java index 52c9380a..f580b264 100644 --- a/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java +++ b/src/main/java/com/amazon/opendistroforelasticsearch/ad/util/MultiResponsesDelegateActionListener.java @@ -109,20 +109,4 @@ private void handleSavedResponses() { this.delegate.onResponse(response0); } } - - public void failImmediately(Exception e) { - this.delegate.onFailure(new RuntimeException(finalErrorMsg, e)); - } - - public void failImmediately(String errMsg) { - this.delegate.onFailure(new RuntimeException(errMsg)); - } - - public void failImmediately(String errMsg, Exception e) { - this.delegate.onFailure(new RuntimeException(errMsg, e)); - } - - public void respondImmediately(T o) { - this.delegate.onResponse(o); - } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java new file mode 100644 index 00000000..5f3b4900 --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AbstractProfileRunnerTests.java @@ -0,0 +1,158 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad; + +import static java.util.Collections.emptyMap; +import static java.util.Collections.emptySet; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.elasticsearch.Version; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.client.Client; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.transport.TransportAddress; +import org.junit.Before; +import org.junit.BeforeClass; + +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorProfileName; +import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; + +public class AbstractProfileRunnerTests extends AbstractADTest { + protected enum DetectorStatus { + INDEX_NOT_EXIST, + NO_DOC, + EXIST + } + + protected enum JobStatus { + INDEX_NOT_EXIT, + DISABLED, + ENABLED + } + + protected enum ErrorResultStatus { + INDEX_NOT_EXIT, + NO_ERROR, + SHINGLE_ERROR, + STOPPED_ERROR, + NULL_POINTER_EXCEPTION + } + + protected AnomalyDetectorProfileRunner runner; + protected Client client; + protected DiscoveryNodeFilterer nodeFilter; + protected AnomalyDetector detector; + protected ClusterService clusterService; + + protected static Set stateOnly; + protected static Set stateNError; + protected static Set modelProfile; + protected static Set stateInitProgress; + protected static Set totalInitProgress; + protected static Set initProgressErrorProfile; + + protected static String noFullShingleError = "No full shingle in current detection window"; + protected static String stoppedError = + "Stopped detector as job failed consecutively for more than 3 times: Having trouble querying data." + + " Maybe all of your features have been disabled."; + + protected static String clusterName; + protected static DiscoveryNode discoveryNode1; + + protected int requiredSamples; + protected int neededSamples; + + // profile model related + protected String node1; + protected String nodeName1; + + protected String node2; + protected String nodeName2; + protected DiscoveryNode discoveryNode2; + + protected long modelSize; + protected String model1Id; + protected String model0Id; + + protected int shingleSize; + + protected int detectorIntervalMin; + protected GetResponse detectorGetReponse; + protected String messaingExceptionError = "blah"; + + @BeforeClass + public static void setUpOnce() { + stateOnly = new HashSet(); + stateOnly.add(DetectorProfileName.STATE); + stateNError = new HashSet(); + stateNError.add(DetectorProfileName.ERROR); + stateNError.add(DetectorProfileName.STATE); + stateInitProgress = new HashSet(); + stateInitProgress.add(DetectorProfileName.INIT_PROGRESS); + stateInitProgress.add(DetectorProfileName.STATE); + modelProfile = new HashSet( + Arrays + .asList( + DetectorProfileName.SHINGLE_SIZE, + DetectorProfileName.MODELS, + DetectorProfileName.COORDINATING_NODE, + DetectorProfileName.TOTAL_SIZE_IN_BYTES + ) + ); + totalInitProgress = new HashSet( + Arrays.asList(DetectorProfileName.TOTAL_ENTITIES, DetectorProfileName.INIT_PROGRESS) + ); + initProgressErrorProfile = new HashSet( + Arrays.asList(DetectorProfileName.INIT_PROGRESS, DetectorProfileName.ERROR) + ); + clusterName = "test-cluster-name"; + discoveryNode1 = new DiscoveryNode( + "nodeName1", + "node1", + new TransportAddress(TransportAddress.META_ADDRESS, 9300), + emptyMap(), + emptySet(), + Version.CURRENT + ); + } + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + client = mock(Client.class); + nodeFilter = mock(DiscoveryNodeFilterer.class); + clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); + + requiredSamples = 128; + neededSamples = 5; + + runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, requiredSamples); + + detectorIntervalMin = 3; + detectorGetReponse = mock(GetResponse.class); + } +} diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java index fd92d61e..bc40ab6d 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/AnomalyDetectorProfileRunnerTests.java @@ -21,7 +21,6 @@ import static java.util.Collections.emptySet; import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.doAnswer; -import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.io.IOException; @@ -30,7 +29,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -42,17 +40,12 @@ import org.elasticsearch.action.FailedNodeException; import org.elasticsearch.action.get.GetRequest; import org.elasticsearch.action.get.GetResponse; -import org.elasticsearch.client.Client; import org.elasticsearch.cluster.ClusterName; -import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.node.DiscoveryNode; -import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NotSerializableExceptionWrapper; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.transport.RemoteTransportException; -import org.junit.Before; -import org.junit.BeforeClass; import com.amazon.opendistroforelasticsearch.ad.common.exception.AnomalyDetectionException; import com.amazon.opendistroforelasticsearch.ad.common.exception.ResourceNotFoundException; @@ -72,96 +65,8 @@ import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse; import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingAction; import com.amazon.opendistroforelasticsearch.ad.transport.RCFPollingResponse; -import com.amazon.opendistroforelasticsearch.ad.util.DiscoveryNodeFilterer; - -public class AnomalyDetectorProfileRunnerTests extends AbstractADTest { - private AnomalyDetectorProfileRunner runner; - private Client client; - private DiscoveryNodeFilterer nodeFilter; - private AnomalyDetector detector; - private ClusterService clusterService; - - private static Set stateOnly; - private static Set stateNError; - private static Set modelProfile; - private static Set stateInitProgress; - private static String noFullShingleError = "No full shingle in current detection window"; - private static String stoppedError = "Stopped detector as job failed consecutively for more than 3 times: Having trouble querying data." - + " Maybe all of your features have been disabled."; - - private int requiredSamples; - private int neededSamples; - - // profile model related - private String node1; - private String nodeName1; - private DiscoveryNode discoveryNode1; - - private String node2; - private String nodeName2; - private DiscoveryNode discoveryNode2; - - private long modelSize; - private String model1Id; - private String model0Id; - - private int shingleSize; - - private int detectorIntervalMin; - private GetResponse detectorGetReponse; - private String messaingExceptionError = "blah"; - - @BeforeClass - public static void setUpOnce() { - stateOnly = new HashSet(); - stateOnly.add(DetectorProfileName.STATE); - stateNError = new HashSet(); - stateNError.add(DetectorProfileName.ERROR); - stateNError.add(DetectorProfileName.STATE); - stateInitProgress = new HashSet(); - stateInitProgress.add(DetectorProfileName.INIT_PROGRESS); - stateInitProgress.add(DetectorProfileName.STATE); - modelProfile = new HashSet( - Arrays - .asList( - DetectorProfileName.SHINGLE_SIZE, - DetectorProfileName.MODELS, - DetectorProfileName.COORDINATING_NODE, - DetectorProfileName.TOTAL_SIZE_IN_BYTES - ) - ); - } - - @Override - @Before - public void setUp() throws Exception { - super.setUp(); - client = mock(Client.class); - nodeFilter = mock(DiscoveryNodeFilterer.class); - clusterService = mock(ClusterService.class); - when(clusterService.state()).thenReturn(ClusterState.builder(new ClusterName("test cluster")).build()); - - requiredSamples = 128; - neededSamples = 5; - - runner = new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, requiredSamples); - - detectorIntervalMin = 3; - detectorGetReponse = mock(GetResponse.class); - } - - enum DetectorStatus { - INDEX_NOT_EXIST, - NO_DOC, - EXIST - } - - enum JobStatus { - INDEX_NOT_EXIT, - DISABLED, - ENABLED - } +public class AnomalyDetectorProfileRunnerTests extends AbstractProfileRunnerTests { enum RCFPollingStatus { INIT_NOT_EXIT, REMOTE_INIT_NOT_EXIT, @@ -173,13 +78,14 @@ enum RCFPollingStatus { INITTING } - enum ErrorResultStatus { - INDEX_NOT_EXIT, - NO_ERROR, - SHINGLE_ERROR, - STOPPED_ERROR - } - + /** + * Convenience methods for single-stream detector profile tests set up + * @param detectorStatus Detector config status + * @param jobStatus Detector job status + * @param rcfPollingStatus RCF polling result status + * @param errorResultStatus Error result status + * @throws IOException when failing the getting request + */ @SuppressWarnings("unchecked") private void setUpClientGet( DetectorStatus detectorStatus, @@ -188,6 +94,7 @@ private void setUpClientGet( ErrorResultStatus errorResultStatus ) throws IOException { detector = TestHelpers.randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(detectorIntervalMin, ChronoUnit.MINUTES)); + doAnswer(invocation -> { Object[] args = invocation.getArguments(); GetRequest request = (GetRequest) args[0]; @@ -639,9 +546,9 @@ public void testInitNoUpdateNoIndex() throws IOException, InterruptedException { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { - logger.error(exception); + LOG.error(exception); for (StackTraceElement ste : exception.getStackTrace()) { - logger.info(ste); + LOG.info(ste); } assertTrue("Should not reach here ", false); inProgressLatch.countDown(); @@ -661,9 +568,9 @@ public void testInitNoIndex() throws IOException, InterruptedException { assertEquals(expectedProfile, response); inProgressLatch.countDown(); }, exception -> { - logger.error(exception); + LOG.error(exception); for (StackTraceElement ste : exception.getStackTrace()) { - logger.info(ste); + LOG.info(ste); } assertTrue("Should not reach here ", false); inProgressLatch.countDown(); @@ -674,4 +581,21 @@ public void testInitNoIndex() throws IOException, InterruptedException { public void testInvalidRequiredSamples() { expectThrows(IllegalArgumentException.class, () -> new AnomalyDetectorProfileRunner(client, xContentRegistry(), nodeFilter, 0)); } + + public void testFailRCFPolling() throws IOException, InterruptedException { + setUpClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, RCFPollingStatus.EXCEPTION, ErrorResultStatus.NO_ERROR); + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + }), stateNError); + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java index c084aa08..20909dab 100644 --- a/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/TestHelpers.java @@ -28,8 +28,10 @@ import static org.powermock.api.mockito.PowerMockito.when; import java.io.IOException; +import java.nio.ByteBuffer; import java.time.Instant; import java.time.temporal.ChronoUnit; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -38,6 +40,7 @@ import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Consumer; +import java.util.stream.IntStream; import org.apache.http.Header; import org.apache.http.HttpEntity; @@ -357,7 +360,7 @@ public static AnomalyDetector randomAnomalyDetectorWithInterval(TimeConfiguratio null, randomInt(), Instant.now().truncatedTo(ChronoUnit.SECONDS), - null, + categoryField, randomUser() ); } @@ -666,6 +669,24 @@ public static GetResponse createGetResponse(ToXContentObject o, String id, Strin ); } + public static GetResponse createBrokenGetResponse(String id, String indexName) throws IOException { + ByteBuffer[] buffers = new ByteBuffer[0]; + return new GetResponse( + new GetResult( + indexName, + MapperService.SINGLE_MAPPING_NAME, + id, + UNASSIGNED_SEQ_NO, + 0, + -1, + true, + BytesReference.fromByteBuffers(buffers), + Collections.emptyMap(), + Collections.emptyMap() + ) + ); + } + public static SearchResponse createSearchResponse(ToXContentObject o) throws IOException { XContentBuilder content = o.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS); @@ -779,4 +800,11 @@ public static String toJsonString(ToXContentObject object) throws IOException { XContentBuilder builder = XContentFactory.jsonBuilder(); return TestHelpers.xContentBuilderToString(object.toXContent(builder, ToXContent.EMPTY_PARAMS)); } + + public static SearchHits createSearchHits(int totalHits) { + List hitList = new ArrayList<>(); + IntStream.range(0, totalHits).forEach(i -> hitList.add(new SearchHit(i))); + SearchHit[] hitArray = new SearchHit[hitList.size()]; + return new SearchHits(hitList.toArray(hitArray), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0F); + } } diff --git a/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java new file mode 100644 index 00000000..5e195f7b --- /dev/null +++ b/src/test/java/com/amazon/opendistroforelasticsearch/ad/model/EntityProfileTests.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package com.amazon.opendistroforelasticsearch.ad.model; + +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; + +import java.io.IOException; + +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import test.com.amazon.opendistroforelasticsearch.ad.util.JsonDeserializer; + +import com.amazon.opendistroforelasticsearch.ad.AbstractADTest; +import com.amazon.opendistroforelasticsearch.ad.common.exception.JsonPathNotFoundException; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; + +public class EntityProfileTests extends AbstractADTest { + public void testMerge() { + EntityProfile profile1 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.INIT); + + EntityProfile profile2 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.UNKNOWN); + + profile1.merge(profile2); + assertEquals(profile1.getState(), EntityState.INIT); + } + + public void testToXContent() throws IOException, JsonPathNotFoundException { + EntityProfile profile1 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.INIT); + + XContentBuilder builder = jsonBuilder(); + profile1.toXContent(builder, ToXContent.EMPTY_PARAMS); + String json = Strings.toString(builder); + + assertEquals("INIT", JsonDeserializer.getTextValue(json, CommonName.STATE)); + + EntityProfile profile2 = new EntityProfile(null, null, null, -1, -1, null, null, EntityState.UNKNOWN); + + builder = jsonBuilder(); + profile2.toXContent(builder, ToXContent.EMPTY_PARAMS); + json = Strings.toString(builder); + + assertTrue(false == JsonDeserializer.hasChildNode(json, CommonName.STATE)); + } +} diff --git a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java index 30949a86..f65f218b 100644 --- a/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java +++ b/src/test/java/org/elasticsearch/action/admin/indices/mapping/get/IndexAnomalyDetectorActionHandlerTests.java @@ -27,13 +27,9 @@ import static org.mockito.Mockito.when; import java.io.IOException; -import java.util.ArrayList; import java.util.Arrays; -import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.stream.IntStream; -import org.apache.lucene.search.TotalHits; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequest; import org.elasticsearch.action.ActionResponse; @@ -53,8 +49,6 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.rest.RestRequest; -import org.elasticsearch.search.SearchHit; -import org.elasticsearch.search.SearchHits; import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.ThreadPool; import org.junit.AfterClass; @@ -182,13 +176,6 @@ public void setUp() throws Exception { ); } - private SearchHits createSearchHits(int totalHits) { - List hitList = new ArrayList<>(); - IntStream.range(0, totalHits).forEach(i -> hitList.add(new SearchHit(i))); - SearchHit[] hitArray = new SearchHit[hitList.size()]; - return new SearchHits(hitList.toArray(hitArray), new TotalHits(totalHits, TotalHits.Relation.EQUAL_TO), 1.0F); - } - public void testTwoCategoricalFields() throws IOException { expectThrows( IllegalArgumentException.class, @@ -200,7 +187,7 @@ public void testTwoCategoricalFields() throws IOException { public void testNoCategoricalField() throws IOException { SearchResponse mockResponse = mock(SearchResponse.class); int totalHits = 1001; - when(mockResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); assertTrue(String.format("The size of args is %d. Its content is %s", args.length, Arrays.toString(args)), args.length == 2); @@ -250,7 +237,7 @@ public void testTextField() throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -313,11 +300,11 @@ private void testValidTypeTemplate(String filedTypeName) throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); SearchResponse userIndexResponse = mock(SearchResponse.class); int userIndexHits = 0; - when(userIndexResponse.getHits()).thenReturn(createSearchHits(userIndexHits)); + when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -397,14 +384,14 @@ private void testUpdateTemplate(String fieldTypeName) throws IOException { SearchResponse detectorResponse = mock(SearchResponse.class); int totalHits = 9; - when(detectorResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(detectorResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); GetResponse getDetectorResponse = TestHelpers .createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse userIndexResponse = mock(SearchResponse.class); int userIndexHits = 0; - when(userIndexResponse.getHits()).thenReturn(createSearchHits(userIndexHits)); + when(userIndexResponse.getHits()).thenReturn(TestHelpers.createSearchHits(userIndexHits)); // extend NodeClient since its execute method is final and mockito does not allow to mock final methods // we can also use spy to overstep the final methods @@ -496,7 +483,7 @@ public void testMoreThanTenMultiEntityDetectors() throws IOException { int totalHits = 11; - when(mockResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -530,7 +517,7 @@ public void testTenMultiEntityDetectorsUpdateSingleEntityAdToMulti() throws IOEx .createGetResponse(existingDetector, existingDetector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(searchResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); @@ -602,7 +589,7 @@ public void testTenMultiEntityDetectorsUpdateExistingMultiEntityAd() throws IOEx .createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX); SearchResponse searchResponse = mock(SearchResponse.class); - when(searchResponse.getHits()).thenReturn(createSearchHits(totalHits)); + when(searchResponse.getHits()).thenReturn(TestHelpers.createSearchHits(totalHits)); doAnswer(invocation -> { Object[] args = invocation.getArguments(); diff --git a/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java new file mode 100644 index 00000000..0be67df3 --- /dev/null +++ b/src/test/java/org/elasticsearch/search/aggregations/metrics/CardinalityProfileTests.java @@ -0,0 +1,261 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.elasticsearch.search.aggregations.metrics; + +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector.ANOMALY_DETECTORS_INDEX; +import static com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.time.temporal.ChronoUnit; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.get.GetRequest; +import org.elasticsearch.action.get.GetResponse; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.search.aggregations.InternalAggregation; +import org.elasticsearch.search.aggregations.InternalAggregations; + +import com.amazon.opendistroforelasticsearch.ad.AbstractProfileRunnerTests; +import com.amazon.opendistroforelasticsearch.ad.TestHelpers; +import com.amazon.opendistroforelasticsearch.ad.constant.CommonName; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetector; +import com.amazon.opendistroforelasticsearch.ad.model.AnomalyDetectorJob; +import com.amazon.opendistroforelasticsearch.ad.model.DetectorInternalState; +import com.amazon.opendistroforelasticsearch.ad.model.IntervalTimeConfiguration; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileAction; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileNodeResponse; +import com.amazon.opendistroforelasticsearch.ad.transport.ProfileResponse; +import com.carrotsearch.hppc.BitMixer; + +/** + * Run tests in ES package since InternalCardinality has only package private constructors + * and we cannot mock it since it is a final class. + * + */ +public class CardinalityProfileTests extends AbstractProfileRunnerTests { + enum ADResultStatus { + NO_RESULT, + EXCEPTION + } + + enum CardinalityStatus { + EXCEPTION, + NORMAL + } + + @SuppressWarnings("unchecked") + private void setUpMultiEntityClientGet(DetectorStatus detectorStatus, JobStatus jobStatus, ErrorResultStatus errorResultStatus) + throws IOException { + detector = TestHelpers + .randomAnomalyDetectorWithInterval(new IntervalTimeConfiguration(detectorIntervalMin, ChronoUnit.MINUTES), true); + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + GetRequest request = (GetRequest) args[0]; + ActionListener listener = (ActionListener) args[1]; + + if (request.index().equals(ANOMALY_DETECTORS_INDEX)) { + switch (detectorStatus) { + case EXIST: + listener + .onResponse( + TestHelpers.createGetResponse(detector, detector.getDetectorId(), AnomalyDetector.ANOMALY_DETECTORS_INDEX) + ); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else if (request.index().equals(ANOMALY_DETECTOR_JOB_INDEX)) { + AnomalyDetectorJob job = null; + switch (jobStatus) { + case ENABLED: + job = TestHelpers.randomAnomalyDetectorJob(true); + listener + .onResponse( + TestHelpers.createGetResponse(job, detector.getDetectorId(), AnomalyDetectorJob.ANOMALY_DETECTOR_JOB_INDEX) + ); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else if (request.index().equals(DetectorInternalState.DETECTOR_STATE_INDEX)) { + switch (errorResultStatus) { + case NO_ERROR: + break; + case NULL_POINTER_EXCEPTION: + GetResponse response = mock(GetResponse.class); + when(response.isExists()).thenReturn(true); + doThrow(NullPointerException.class).when(response).getSourceAsString(); + listener.onResponse(response); + break; + default: + assertTrue("should not reach here", false); + break; + } + } + return null; + }).when(client).get(any(), any()); + } + + @SuppressWarnings("unchecked") + private void setUpMultiEntityClientSearch(ADResultStatus resultStatus, CardinalityStatus cardinalityStatus) { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + ActionListener listener = (ActionListener) args[1]; + SearchRequest request = (SearchRequest) args[0]; + if (request.indices()[0].equals(CommonName.ANOMALY_RESULT_INDEX_ALIAS)) { + switch (resultStatus) { + case NO_RESULT: + SearchResponse mockResponse = mock(SearchResponse.class); + when(mockResponse.getHits()).thenReturn(TestHelpers.createSearchHits(0)); + listener.onResponse(mockResponse); + break; + case EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + default: + assertTrue("should not reach here", false); + break; + } + } else { + switch (cardinalityStatus) { + case EXCEPTION: + listener.onFailure(new RuntimeException()); + break; + case NORMAL: + SearchResponse response = mock(SearchResponse.class); + List aggs = new ArrayList<>(1); + HyperLogLogPlusPlus hyperLogLog = new HyperLogLogPlusPlus( + AbstractHyperLogLog.MIN_PRECISION, + BigArrays.NON_RECYCLING_INSTANCE, + 0 + ); + for (int i = 0; i < 100; i++) { + hyperLogLog.collect(0, BitMixer.mix64(randomIntBetween(1, 100))); + } + aggs.add(new InternalCardinality(CommonName.TOTAL_ENTITIES, hyperLogLog, new HashMap<>())); + when(response.getAggregations()).thenReturn(InternalAggregations.from(aggs)); + listener.onResponse(response); + break; + default: + assertTrue("should not reach here", false); + break; + } + + } + + return null; + }).when(client).search(any(), any()); + } + + @SuppressWarnings("unchecked") + private void setUpProfileAction() { + doAnswer(invocation -> { + Object[] args = invocation.getArguments(); + + ActionListener listener = (ActionListener) args[2]; + + ProfileNodeResponse profileNodeResponse1 = new ProfileNodeResponse(discoveryNode1, new HashMap<>(), shingleSize, 0, 0); + List profileNodeResponses = Arrays.asList(profileNodeResponse1); + listener.onResponse(new ProfileResponse(new ClusterName(clusterName), profileNodeResponses, Collections.emptyList())); + + return null; + }).when(client).execute(eq(ProfileAction.INSTANCE), any(), any()); + } + + public void testFailGetEntityStats() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR); + setUpMultiEntityClientSearch(ADResultStatus.NO_RESULT, CardinalityStatus.EXCEPTION); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), totalInitProgress); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testFailGetState() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NULL_POINTER_EXCEPTION); + setUpMultiEntityClientSearch(ADResultStatus.NO_RESULT, CardinalityStatus.NORMAL); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), initProgressErrorProfile); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } + + public void testFaiConfirmInitted() throws IOException, InterruptedException { + setUpMultiEntityClientGet(DetectorStatus.EXIST, JobStatus.ENABLED, ErrorResultStatus.NO_ERROR); + setUpMultiEntityClientSearch(ADResultStatus.EXCEPTION, CardinalityStatus.NORMAL); + setUpProfileAction(); + + final CountDownLatch inProgressLatch = new CountDownLatch(1); + + runner.profile(detector.getDetectorId(), ActionListener.wrap(response -> { + assertTrue("Should not reach here ", false); + inProgressLatch.countDown(); + }, exception -> { + assertTrue(exception instanceof RuntimeException); + // this means we don't exit with failImmediately. failImmediately can make we return early when there are other concurrent + // requests + assertTrue(exception.getMessage(), exception.getMessage().contains("Exceptions:")); + inProgressLatch.countDown(); + + }), totalInitProgress); + + assertTrue(inProgressLatch.await(100, TimeUnit.SECONDS)); + } +}