Skip to content

Commit

Permalink
EQL: add support for partial search results (#116388) (#118862)
Browse files Browse the repository at this point in the history
Allow queries to succeed if some shards are failing
  • Loading branch information
luigidellaquila authored Dec 17, 2024
1 parent 6ed0d26 commit 6b33e31
Show file tree
Hide file tree
Showing 65 changed files with 3,068 additions and 130 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/116388.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 116388
summary: Add support for partial shard results
area: EQL
type: enhancement
issues: []
47 changes: 47 additions & 0 deletions docs/reference/eql/eql-search-api.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,53 @@ request that targets only `bar*` still returns an error.
+
Defaults to `true`.

`allow_partial_search_results`::
(Optional, Boolean)

If `false`, the request returns an error if one or more shards involved in the query are unavailable.
+
If `true`, the query is executed only on the available shards, ignoring shard request timeouts and
<<shard-failures,shard failures>>.
+
Defaults to `false`.
+
To override the default for this field, set the
`xpack.eql.default_allow_partial_results` cluster setting to `true`.


[IMPORTANT]
====
You can also specify this value using the `allow_partial_search_results` request body parameter.
If both parameters are specified, only the query parameter is used.
====


`allow_partial_sequence_results`::
(Optional, Boolean)


Used together with `allow_partial_search_results=true`, controls the behavior of sequence queries specifically
(if `allow_partial_search_results=false`, this setting has no effect).
If `true` and if some shards are unavailable, the sequences are calculated on available shards only.
+
If `false` and if some shards are unavailable, the query only returns information about the shard failures,
but no further results.
+
Defaults to `false`.
+
Note that sequences calculated with `allow_partial_search_results=true` can return incorrect results
(e.g., if a <<eql-missing-events, missing event>> clause matches records in unavailable shards).
+
To override the default for this field, set the
`xpack.eql.default_allow_partial_sequence_results` cluster setting to `true`.


[IMPORTANT]
====
You can also specify this value using the `allow_partial_sequence_results` request body parameter.
If both parameters are specified, only the query parameter is used.
====

`ccs_minimize_roundtrips`::
(Optional, Boolean) If `true`, network round-trips between the local and the
remote cluster are minimized when running cross-cluster search (CCS) requests.
Expand Down
10 changes: 10 additions & 0 deletions rest-api-spec/src/main/resources/rest-api-spec/api/eql.search.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@
"type": "time",
"description": "Update the time interval in which the results (partial or final) for this search will be available",
"default": "5d"
},
"allow_partial_search_results": {
"type":"boolean",
"description":"Control whether the query should keep running in case of shard failures, and return partial results",
"default":false
},
"allow_partial_sequence_results": {
"type":"boolean",
"description":"Control whether a sequence query should return partial results or no results at all in case of shard failures. This option has effect only if [allow_partial_search_results] is true.",
"default":false
}
},
"body":{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ static TransportVersion def(int id) {
public static final TransportVersion KNN_QUERY_RESCORE_OVERSAMPLE = def(8_806_00_0);
public static final TransportVersion SEMANTIC_QUERY_LENIENT = def(8_807_00_0);
public static final TransportVersion ESQL_QUERY_BUILDER_IN_SEARCH_FUNCTIONS = def(8_808_00_0);
public static final TransportVersion EQL_ALLOW_PARTIAL_SEARCH_RESULTS = def(8_809_00_0);

/*
* STOP! READ THIS FIRST! No, really,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;

public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestCase {

protected static final String PARAM_FORMATTING = "%2$s";
Expand All @@ -52,6 +55,9 @@ public abstract class BaseEqlSpecTestCase extends RemoteClusterAwareEqlRestTestC
*/
private final int size;
private final int maxSamplesPerKey;
private final Boolean allowPartialSearchResults;
private final Boolean allowPartialSequenceResults;
private final Boolean expectShardFailures;

@Before
public void setup() throws Exception {
Expand Down Expand Up @@ -104,7 +110,16 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) {
}

results.add(
new Object[] { spec.query(), name, spec.expectedEventIds(), spec.joinKeys(), spec.size(), spec.maxSamplesPerKey() }
new Object[] {
spec.query(),
name,
spec.expectedEventIds(),
spec.joinKeys(),
spec.size(),
spec.maxSamplesPerKey(),
spec.allowPartialSearchResults(),
spec.allowPartialSequenceResults(),
spec.expectShardFailures() }
);
}

Expand All @@ -118,7 +133,10 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) {
List<long[]> eventIds,
String[] joinKeys,
Integer size,
Integer maxSamplesPerKey
Integer maxSamplesPerKey,
Boolean allowPartialSearchResults,
Boolean allowPartialSequenceResults,
Boolean expectShardFailures
) {
this.index = index;

Expand All @@ -128,6 +146,9 @@ protected static List<Object[]> asArray(List<EqlSpec> specs) {
this.joinKeys = joinKeys;
this.size = size == null ? -1 : size;
this.maxSamplesPerKey = maxSamplesPerKey == null ? -1 : maxSamplesPerKey;
this.allowPartialSearchResults = allowPartialSearchResults;
this.allowPartialSequenceResults = allowPartialSequenceResults;
this.expectShardFailures = expectShardFailures;
}

public void test() throws Exception {
Expand All @@ -137,6 +158,7 @@ public void test() throws Exception {
private void assertResponse(ObjectPath response) throws Exception {
List<Map<String, Object>> events = response.evaluate("hits.events");
List<Map<String, Object>> sequences = response.evaluate("hits.sequences");
Object shardFailures = response.evaluate("shard_failures");

if (events != null) {
assertEvents(events);
Expand All @@ -145,6 +167,7 @@ private void assertResponse(ObjectPath response) throws Exception {
} else {
fail("No events or sequences found");
}
assertShardFailures(shardFailures);
}

protected ObjectPath runQuery(String index, String query) throws Exception {
Expand All @@ -163,13 +186,56 @@ protected ObjectPath runQuery(String index, String query) throws Exception {
if (maxSamplesPerKey > 0) {
builder.field("max_samples_per_key", maxSamplesPerKey);
}
boolean allowPartialResultsInBody = randomBoolean();
if (allowPartialSearchResults != null) {
if (allowPartialResultsInBody) {
builder.field("allow_partial_search_results", String.valueOf(allowPartialSearchResults));
if (allowPartialSequenceResults != null) {
builder.field("allow_partial_sequence_results", String.valueOf(allowPartialSequenceResults));
}
} else {
// these will be overwritten by the path params, that have higher priority than the query (JSON body) params
if (allowPartialSearchResults != null) {
builder.field("allow_partial_search_results", randomBoolean());
}
if (allowPartialSequenceResults != null) {
builder.field("allow_partial_sequence_results", randomBoolean());
}
}
} else {
// Tests that don't specify a setting for these parameters should always pass.
// These params should be irrelevant.
if (randomBoolean()) {
builder.field("allow_partial_search_results", randomBoolean());
}
if (randomBoolean()) {
builder.field("allow_partial_sequence_results", randomBoolean());
}
}
builder.endObject();

Request request = new Request("POST", "/" + index + "/_eql/search");
Boolean ccsMinimizeRoundtrips = ccsMinimizeRoundtrips();
if (ccsMinimizeRoundtrips != null) {
request.addParameter("ccs_minimize_roundtrips", ccsMinimizeRoundtrips.toString());
}
if (allowPartialSearchResults != null) {
if (allowPartialResultsInBody == false) {
request.addParameter("allow_partial_search_results", String.valueOf(allowPartialSearchResults));
if (allowPartialSequenceResults != null) {
request.addParameter("allow_partial_sequence_results", String.valueOf(allowPartialSequenceResults));
}
}
} else {
// Tests that don't specify a setting for these parameters should always pass.
// These params should be irrelevant.
if (randomBoolean()) {
request.addParameter("allow_partial_search_results", String.valueOf(randomBoolean()));
}
if (randomBoolean()) {
request.addParameter("allow_partial_sequence_results", String.valueOf(randomBoolean()));
}
}
int timeout = Math.toIntExact(timeout().millis());
RequestConfig config = RequestConfig.copy(RequestConfig.DEFAULT)
.setConnectionRequestTimeout(timeout)
Expand All @@ -182,6 +248,20 @@ protected ObjectPath runQuery(String index, String query) throws Exception {
return ObjectPath.createFromResponse(client().performRequest(request));
}

/**
 * Checks the shard failures value of a response against {@code expectShardFailures}.
 * When failures are expected, the value must be a non-null, non-empty list; in every other case
 * (the expectation is {@code false} or not set at all) the value must be {@code null}.
 *
 * @param shardFailures the shard failures value extracted from the response, possibly {@code null}
 */
private void assertShardFailures(Object shardFailures) {
    if (Boolean.TRUE.equals(expectShardFailures) == false) {
        // covers both "explicitly no failures expected" and "no expectation configured"
        assertNull(shardFailures);
        return;
    }
    assertNotNull(shardFailures);
    List<?> failures = (List<?>) shardFailures;
    assertThat(failures.size(), is(greaterThan(0)));
}

private void assertEvents(List<Map<String, Object>> events) {
assertNotNull(events);
logger.debug("Events {}", new Object() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
*/
public class DataLoader {
public static final String TEST_INDEX = "endgame-140";
public static final String TEST_SHARD_FAILURES_INDEX = "endgame-shard-failures";
public static final String TEST_EXTRA_INDEX = "extra";
public static final String TEST_NANOS_INDEX = "endgame-140-nanos";
public static final String TEST_SAMPLE = "sample1,sample2,sample3";
Expand Down Expand Up @@ -103,6 +104,11 @@ public static void loadDatasetIntoEs(RestClient client, CheckedBiFunction<XConte
//
load(client, TEST_MISSING_EVENTS_INDEX, null, null, p);
load(client, TEST_SAMPLE_MULTI, null, null, p);
//
// index with a runtime field ("broken", type long) that causes shard failures.
// the rest of the mapping is the same as TEST_INDEX
//
load(client, TEST_SHARD_FAILURES_INDEX, null, DataLoader::timestampToUnixMillis, p);
}

private static void load(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,23 @@ public EqlDateNanosSpecTestCase(
List<long[]> eventIds,
String[] joinKeys,
Integer size,
Integer maxSamplesPerKey
Integer maxSamplesPerKey,
Boolean allowPartialSearchResults,
Boolean allowPartialSequenceResults,
Boolean expectShardFailures
) {
this(TEST_NANOS_INDEX, query, name, eventIds, joinKeys, size, maxSamplesPerKey);
this(
TEST_NANOS_INDEX,
query,
name,
eventIds,
joinKeys,
size,
maxSamplesPerKey,
allowPartialSearchResults,
allowPartialSequenceResults,
expectShardFailures
);
}

// constructor for multi-cluster tests
Expand All @@ -40,9 +54,23 @@ public EqlDateNanosSpecTestCase(
List<long[]> eventIds,
String[] joinKeys,
Integer size,
Integer maxSamplesPerKey
Integer maxSamplesPerKey,
Boolean allowPartialSearchResults,
Boolean allowPartialSequenceResults,
Boolean expectShardFailures
) {
super(index, query, name, eventIds, joinKeys, size, maxSamplesPerKey);
super(
index,
query,
name,
eventIds,
joinKeys,
size,
maxSamplesPerKey,
allowPartialSearchResults,
allowPartialSequenceResults,
expectShardFailures
);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,23 @@ public EqlExtraSpecTestCase(
List<long[]> eventIds,
String[] joinKeys,
Integer size,
Integer maxSamplesPerKey
Integer maxSamplesPerKey,
Boolean allowPartialSearchResults,
Boolean allowPartialSequenceResults,
Boolean expectShardFailures
) {
this(TEST_EXTRA_INDEX, query, name, eventIds, joinKeys, size, maxSamplesPerKey);
this(
TEST_EXTRA_INDEX,
query,
name,
eventIds,
joinKeys,
size,
maxSamplesPerKey,
allowPartialSearchResults,
allowPartialSequenceResults,
expectShardFailures
);
}

// constructor for multi-cluster tests
Expand All @@ -40,9 +54,23 @@ public EqlExtraSpecTestCase(
List<long[]> eventIds,
String[] joinKeys,
Integer size,
Integer maxSamplesPerKey
Integer maxSamplesPerKey,
Boolean allowPartialSearchResults,
Boolean allowPartialSequenceResults,
Boolean expectShardFailures
) {
super(index, query, name, eventIds, joinKeys, size, maxSamplesPerKey);
super(
index,
query,
name,
eventIds,
joinKeys,
size,
maxSamplesPerKey,
allowPartialSearchResults,
allowPartialSequenceResults,
expectShardFailures
);
}

@Override
Expand Down
Loading

0 comments on commit 6b33e31

Please sign in to comment.