Skip to content

Commit

Permalink
Merge branch 'main' into buildinfo/version_qualifier
Browse files Browse the repository at this point in the history
  • Loading branch information
rjernst committed Oct 16, 2023
2 parents c74e9c7 + e4ea68a commit 2d0e23d
Show file tree
Hide file tree
Showing 74 changed files with 2,303 additions and 143 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/100383.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100383
summary: Push S3 request counts via the metrics API
area: Distributed
type: enhancement
issues: []
6 changes: 6 additions & 0 deletions docs/changelog/100492.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 100492
summary: Add runtime field of type `geo_shape`
area: Geo
type: enhancement
issues:
- 61299
6 changes: 6 additions & 0 deletions docs/changelog/100846.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
pr: 100846
summary: Consistent scores for multi-term `SourceConfirmedTextQuery`
area: Search
type: bug
issues:
- 98712
5 changes: 5 additions & 0 deletions docs/changelog/100886.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 100886
summary: Use the correct writable name for model assignment metadata in mixed version clusters, preventing a node failure due to an `IllegalArgumentException` for the unknown `NamedWriteable` `[trained_model_assignment]`
area: Machine Learning
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.Strings;
import org.elasticsearch.core.Tuple;
import org.elasticsearch.geometry.Geometry;
import org.elasticsearch.geometry.Point;
import org.elasticsearch.index.Index;
import org.elasticsearch.index.IndexService;
Expand All @@ -76,6 +77,7 @@
import org.elasticsearch.script.DoubleFieldScript;
import org.elasticsearch.script.FilterScript;
import org.elasticsearch.script.GeoPointFieldScript;
import org.elasticsearch.script.GeometryFieldScript;
import org.elasticsearch.script.IpFieldScript;
import org.elasticsearch.script.LongFieldScript;
import org.elasticsearch.script.ScoreScript;
Expand Down Expand Up @@ -681,6 +683,25 @@ static Response innerShardOperation(Request request, ScriptService scriptService
);
return new Response(format.apply(points));
}, indexService);
} else if (scriptContext == GeometryFieldScript.CONTEXT) {
return prepareRamIndex(request, (context, leafReaderContext) -> {
GeometryFieldScript.Factory factory = scriptService.compile(request.script, GeometryFieldScript.CONTEXT);
GeometryFieldScript.LeafFactory leafFactory = factory.newFactory(
GeometryFieldScript.CONTEXT.name,
request.getScript().getParams(),
context.lookup(),
OnScriptError.FAIL
);
GeometryFieldScript geometryFieldScript = leafFactory.newInstance(leafReaderContext);
List<Geometry> geometries = new ArrayList<>();
geometryFieldScript.runForDoc(0, geometries::add);
// convert geometries to the standard format of the fields api
Function<List<Geometry>, List<Object>> format = GeometryFormatterFactory.getFormatter(
GeometryFormatterFactory.GEOJSON,
Function.identity()
);
return new Response(format.apply(geometries));
}, indexService);
} else if (scriptContext == IpFieldScript.CONTEXT) {
return prepareRamIndex(request, (context, leafReaderContext) -> {
IpFieldScript.Factory factory = scriptService.compile(request.script, IpFieldScript.CONTEXT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License
# 2.0 and the Server Side Public License, v 1; you may not use this file except
# in compliance with, at your election, the Elastic License 2.0 or the Server
# Side Public License, v 1.
#

# The whitelist for runtime fields that generate geometries

# These two whitelist entries are required for Painless to find the classes
class org.elasticsearch.script.GeometryFieldScript @no_import {
}
class org.elasticsearch.script.GeometryFieldScript$Factory @no_import {
}

static_import {
# The `emit` callback to collect values for the field
void emit(org.elasticsearch.script.GeometryFieldScript, Object) bound_to org.elasticsearch.script.GeometryFieldScript$Emit
}
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,51 @@ public void testGeoPointFieldExecutionContext() throws IOException {
assertEquals("Point", points.get(1).get("type"));
}

@SuppressWarnings("unchecked")
public void testGeometryFieldExecutionContext() throws IOException {
ScriptService scriptService = getInstanceFromNode(ScriptService.class);
IndexService indexService = createIndex("index", Settings.EMPTY, "doc", "test_point", "type=geo_point");

Request.ContextSetup contextSetup = new Request.ContextSetup(
"index",
new BytesArray("{\"test_point\":\"30.0,40.0\"}"),
new MatchAllQueryBuilder()
);
contextSetup.setXContentType(XContentType.JSON);
Request request = new Request(
new Script(
ScriptType.INLINE,
"painless",
"emit(\"Point(\" + doc['test_point'].value.lon + \" \" + doc['test_point'].value.lat + \")\")",
emptyMap()
),
"geometry_field",
contextSetup
);
Response response = innerShardOperation(request, scriptService, indexService);
List<Map<String, Object>> geometry = (List<Map<String, Object>>) response.getResult();
assertEquals(40.0, (double) ((List<Object>) geometry.get(0).get("coordinates")).get(0), 0.00001);
assertEquals(30.0, (double) ((List<Object>) geometry.get(0).get("coordinates")).get(1), 0.00001);
assertEquals("Point", geometry.get(0).get("type"));

contextSetup = new Request.ContextSetup("index", new BytesArray("{}"), new MatchAllQueryBuilder());
contextSetup.setXContentType(XContentType.JSON);
request = new Request(
new Script(
ScriptType.INLINE,
"painless",
"emit(\"LINESTRING(78.96 12.12, 12.12 78.96)\"); emit(\"POINT(13.45 56.78)\");",
emptyMap()
),
"geometry_field",
contextSetup
);
response = innerShardOperation(request, scriptService, indexService);
geometry = (List<Map<String, Object>>) response.getResult();
assertEquals("GeometryCollection", geometry.get(0).get("type"));
assertEquals(2, ((List<?>) geometry.get(0).get("geometries")).size());
}

public void testIpFieldExecutionContext() throws IOException {
ScriptService scriptService = getInstanceFromNode(ScriptService.class);
IndexService indexService = createIndex("index", Settings.EMPTY, "doc", "test_ip", "type=ip");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -215,7 +215,9 @@ public Weight createWeight(IndexSearcher searcher, ScoreMode scoreMode, float bo
return in.createWeight(searcher, scoreMode, boost);
}

final Set<Term> terms = new HashSet<>();
// We use a LinkedHashSet here to preserve the ordering of terms to ensure that
// later summing of float scores per term is consistent
final Set<Term> terms = new LinkedHashSet<>();
in.visit(QueryVisitor.termCollector(terms));
if (terms.isEmpty()) {
throw new IllegalStateException("Query " + in + " doesn't have any term");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,6 @@ public void testPhrase() throws Exception {
}
}

@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/98712")
public void testMultiPhrase() throws Exception {
try (Directory dir = newDirectory(); IndexWriter w = new IndexWriter(dir, newIndexWriterConfig(Lucene.STANDARD_ANALYZER))) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,14 @@
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.indices.recovery.RecoverySettings;
import org.elasticsearch.plugins.Plugin;
import org.elasticsearch.plugins.PluginsService;
import org.elasticsearch.plugins.TelemetryPlugin;
import org.elasticsearch.repositories.RepositoriesService;
import org.elasticsearch.repositories.Repository;
import org.elasticsearch.repositories.RepositoryData;
Expand All @@ -48,7 +51,11 @@
import org.elasticsearch.snapshots.SnapshotState;
import org.elasticsearch.snapshots.SnapshotsService;
import org.elasticsearch.snapshots.mockstore.BlobStoreWrapper;
import org.elasticsearch.telemetry.DelegatingMeter;
import org.elasticsearch.telemetry.TelemetryProvider;
import org.elasticsearch.telemetry.metric.LongCounter;
import org.elasticsearch.telemetry.metric.Meter;
import org.elasticsearch.telemetry.tracing.Tracer;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.junit.annotations.TestLogging;
Expand All @@ -62,18 +69,26 @@
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

import static org.elasticsearch.repositories.RepositoriesModule.METRIC_REQUESTS_COUNT;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.hasEntry;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
Expand Down Expand Up @@ -126,7 +141,7 @@ protected Settings repositorySettings(String repoName) {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Collections.singletonList(TestS3RepositoryPlugin.class);
return List.of(TestS3RepositoryPlugin.class, TestTelemetryPlugin.class);
}

@Override
Expand Down Expand Up @@ -212,6 +227,68 @@ public void testAbortRequestStats() throws Exception {
assertEquals(assertionErrorMsg, mockCalls, sdkRequestCounts);
}

public void testMetrics() throws Exception {
// Create the repository and perform some activities
final String repository = createRepository(randomRepositoryName());
final String index = "index-no-merges";
createIndex(index, 1, 0);

final long nbDocs = randomLongBetween(10_000L, 20_000L);
try (BackgroundIndexer indexer = new BackgroundIndexer(index, client(), (int) nbDocs)) {
waitForDocs(nbDocs, indexer);
}
flushAndRefresh(index);
ForceMergeResponse forceMerge = client().admin().indices().prepareForceMerge(index).setFlush(true).setMaxNumSegments(1).get();
assertThat(forceMerge.getSuccessfulShards(), equalTo(1));
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);

final String snapshot = "snapshot";
assertSuccessfulSnapshot(clusterAdmin().prepareCreateSnapshot(repository, snapshot).setWaitForCompletion(true).setIndices(index));
assertAcked(client().admin().indices().prepareDelete(index));
assertSuccessfulRestore(clusterAdmin().prepareRestoreSnapshot(repository, snapshot).setWaitForCompletion(true));
ensureGreen(index);
assertHitCount(client().prepareSearch(index).setSize(0).setTrackTotalHits(true).get(), nbDocs);
assertAcked(clusterAdmin().prepareDeleteSnapshot(repository, snapshot).get());

final Map<String, Long> aggregatedMetrics = new HashMap<>();
// Compare collected stats and metrics for each node and they should be the same
for (var nodeName : internalCluster().getNodeNames()) {
final BlobStoreRepository blobStoreRepository;
try {
blobStoreRepository = (BlobStoreRepository) internalCluster().getInstance(RepositoriesService.class, nodeName)
.repository(repository);
} catch (RepositoryMissingException e) {
continue;
}

final BlobStore blobStore = blobStoreRepository.blobStore();
final BlobStore delegateBlobStore = ((BlobStoreWrapper) blobStore).delegate();
final S3BlobStore s3BlobStore = (S3BlobStore) delegateBlobStore;
final Map<S3BlobStore.StatsKey, S3BlobStore.IgnoreNoResponseMetricsCollector> statsCollectors = s3BlobStore
.getStatsCollectors().collectors;

final var plugins = internalCluster().getInstance(PluginsService.class, nodeName).filterPlugins(TestTelemetryPlugin.class);
assertThat(plugins, hasSize(1));
final Map<Map<String, Object>, AtomicLong> metrics = plugins.get(0).metrics;

assertThat(statsCollectors.size(), equalTo(metrics.size()));
metrics.forEach((attributes, counter) -> {
final S3BlobStore.Operation operation = S3BlobStore.Operation.parse((String) attributes.get("operation"));
final S3BlobStore.StatsKey statsKey = new S3BlobStore.StatsKey(
operation,
OperationPurpose.parse((String) attributes.get("purpose"))
);
assertThat(statsCollectors, hasKey(statsKey));
assertThat(counter.get(), equalTo(statsCollectors.get(statsKey).counter.sum()));

aggregatedMetrics.compute(operation.getKey(), (k, v) -> v == null ? counter.get() : v + counter.get());
});
}

// Metrics number should be consistent with server side request count as well.
assertThat(aggregatedMetrics, equalTo(getMockRequestCounts()));
}

public void testRequestStatsWithOperationPurposes() throws IOException {
final String repoName = createRepository(randomRepositoryName());
final RepositoriesService repositoriesService = internalCluster().getCurrentMasterNodeInstance(RepositoriesService.class);
Expand Down Expand Up @@ -356,7 +433,7 @@ protected S3Repository createRepository(
BigArrays bigArrays,
RecoverySettings recoverySettings
) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, Meter.NOOP) {
return new S3Repository(metadata, registry, getService(), clusterService, bigArrays, recoverySettings, getMeter()) {

@Override
public BlobStore blobStore() {
Expand Down Expand Up @@ -476,4 +553,64 @@ private boolean isMultiPartUpload(String request) {
|| Regex.simpleMatch("PUT /*/*?*uploadId=*", request);
}
}

public static class TestTelemetryPlugin extends Plugin implements TelemetryPlugin {

private final Map<Map<String, Object>, AtomicLong> metrics = ConcurrentCollections.newConcurrentMap();

private final LongCounter longCounter = new LongCounter() {
@Override
public void increment() {
throw new UnsupportedOperationException();
}

@Override
public void incrementBy(long inc) {
throw new UnsupportedOperationException();
}

@Override
public void incrementBy(long inc, Map<String, Object> attributes) {
assertThat(
attributes,
allOf(hasEntry("repo_type", S3Repository.TYPE), hasKey("repo_name"), hasKey("operation"), hasKey("purpose"))
);
metrics.computeIfAbsent(attributes, k -> new AtomicLong()).addAndGet(inc);
}

@Override
public String getName() {
return METRIC_REQUESTS_COUNT;
}
};

private final Meter meter = new DelegatingMeter(Meter.NOOP) {
@Override
public LongCounter registerLongCounter(String name, String description, String unit) {
assertThat(name, equalTo(METRIC_REQUESTS_COUNT));
return longCounter;
}

@Override
public LongCounter getLongCounter(String name) {
assertThat(name, equalTo(METRIC_REQUESTS_COUNT));
return longCounter;
}
};

@Override
public TelemetryProvider getTelemetryProvider(Settings settings) {
return new TelemetryProvider() {
@Override
public Tracer getTracer() {
return Tracer.NOOP;
}

@Override
public Meter getMeter() {
return meter;
}
};
}
}
}
Loading

0 comments on commit 2d0e23d

Please sign in to comment.