Skip to content

Commit

Permalink
indirect slowlog provider is not stateful
Browse files Browse the repository at this point in the history
  • Loading branch information
rjernst committed Dec 27, 2024
1 parent 974f346 commit a0a4a54
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
* <em>characters</em> of the source.
*/
private int maxSourceCharsToLog;
private final SlowLogFieldProvider slowLogFieldProvider;
private final SlowLogFields slowLogFields;

/**
* Reads how much of the source to log. The user can specify any value they
Expand All @@ -126,7 +126,7 @@ public final class IndexingSlowLog implements IndexingOperationListener {
);

IndexingSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFieldProvider) {
this.slowLogFieldProvider = slowLogFieldProvider;
this.slowLogFields = slowLogFieldProvider.create(indexSettings);
this.index = indexSettings.getIndex();

indexSettings.getScopedSettings().addSettingsUpdateConsumer(INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING, this::setReformat);
Expand Down Expand Up @@ -180,7 +180,7 @@ public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.Index
if (indexWarnThreshold >= 0 && tookInNanos > indexWarnThreshold) {
indexLogger.warn(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
this.slowLogFields.indexFields(),
index,
doc,
tookInNanos,
Expand All @@ -191,7 +191,7 @@ public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.Index
} else if (indexInfoThreshold >= 0 && tookInNanos > indexInfoThreshold) {
indexLogger.info(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
this.slowLogFields.indexFields(),
index,
doc,
tookInNanos,
Expand All @@ -202,7 +202,7 @@ public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.Index
} else if (indexDebugThreshold >= 0 && tookInNanos > indexDebugThreshold) {
indexLogger.debug(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
this.slowLogFields.indexFields(),
index,
doc,
tookInNanos,
Expand All @@ -213,7 +213,7 @@ public void postIndex(ShardId shardId, Engine.Index indexOperation, Engine.Index
} else if (indexTraceThreshold >= 0 && tookInNanos > indexTraceThreshold) {
indexLogger.trace(
IndexingSlowLogMessage.of(
this.slowLogFieldProvider.indexSlowLogFields(),
this.slowLogFields.indexFields(),
index,
doc,
tookInNanos,
Expand Down
21 changes: 10 additions & 11 deletions server/src/main/java/org/elasticsearch/index/SearchSlowLog.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public final class SearchSlowLog implements SearchOperationListener {
private static final Logger queryLogger = LogManager.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".query");
private static final Logger fetchLogger = LogManager.getLogger(INDEX_SEARCH_SLOWLOG_PREFIX + ".fetch");

private final SlowLogFieldProvider slowLogFieldProvider;
private final SlowLogFields slowLogFields;

public static final Setting<Boolean> INDEX_SEARCH_SLOWLOG_INCLUDE_USER_SETTING = Setting.boolSetting(
INDEX_SEARCH_SLOWLOG_PREFIX + ".include.user",
Expand Down Expand Up @@ -127,8 +127,7 @@ public final class SearchSlowLog implements SearchOperationListener {
private static final ToXContent.Params FORMAT_PARAMS = new ToXContent.MapParams(Collections.singletonMap("pretty", "false"));

public SearchSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFieldProvider) {
slowLogFieldProvider.init(indexSettings);
this.slowLogFieldProvider = slowLogFieldProvider;
this.slowLogFields = slowLogFieldProvider.create(indexSettings);
indexSettings.getScopedSettings()
.addSettingsUpdateConsumer(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING, this::setQueryWarnThreshold);
this.queryWarnThreshold = indexSettings.getValue(INDEX_SEARCH_SLOWLOG_THRESHOLD_QUERY_WARN_SETTING).nanos();
Expand Down Expand Up @@ -159,26 +158,26 @@ public SearchSlowLog(IndexSettings indexSettings, SlowLogFieldProvider slowLogFi
@Override
public void onQueryPhase(SearchContext context, long tookInNanos) {
if (queryWarnThreshold >= 0 && tookInNanos > queryWarnThreshold) {
queryLogger.warn(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.warn(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryInfoThreshold >= 0 && tookInNanos > queryInfoThreshold) {
queryLogger.info(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.info(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryDebugThreshold >= 0 && tookInNanos > queryDebugThreshold) {
queryLogger.debug(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.debug(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (queryTraceThreshold >= 0 && tookInNanos > queryTraceThreshold) {
queryLogger.trace(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
queryLogger.trace(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
}
}

@Override
public void onFetchPhase(SearchContext context, long tookInNanos) {
if (fetchWarnThreshold >= 0 && tookInNanos > fetchWarnThreshold) {
fetchLogger.warn(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.warn(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchInfoThreshold >= 0 && tookInNanos > fetchInfoThreshold) {
fetchLogger.info(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.info(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchDebugThreshold >= 0 && tookInNanos > fetchDebugThreshold) {
fetchLogger.debug(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.debug(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
} else if (fetchTraceThreshold >= 0 && tookInNanos > fetchTraceThreshold) {
fetchLogger.trace(SearchSlowLogMessage.of(this.slowLogFieldProvider.searchSlowLogFields(), context, tookInNanos));
fetchLogger.trace(SearchSlowLogMessage.of(this.slowLogFields.searchFields(), context, tookInNanos));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,14 @@

package org.elasticsearch.index;

import java.util.Map;

/**
* Interface for providing additional fields to the slow log from a plugin.
* Intended to be loaded through SPI.
*/
public interface SlowLogFieldProvider {
/**
* Initialize field provider with index level settings to be able to listen for updates and set initial values
* Create a field provider with index level settings to be able to listen for updates and set initial values
* @param indexSettings settings for the index
*/
void init(IndexSettings indexSettings);

/**
* Slow log fields for indexing events
* @return map of field name to value
*/
Map<String, String> indexSlowLogFields();

/**
* Slow log fields for search events
* @return map of field name to value
*/
Map<String, String> searchSlowLogFields();
SlowLogFields create(IndexSettings indexSettings);
}
30 changes: 30 additions & 0 deletions server/src/main/java/org/elasticsearch/index/SlowLogFields.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.index;

import java.util.Map;

/**
* Fields for the slow log. These may be different each call depending on the state of the system.
*/
public interface SlowLogFields {

/**
* Slow log fields for indexing events
* @return map of field name to value
*/
Map<String, String> indexFields();

/**
* Slow log fields for search events
* @return map of field name to value
*/
Map<String, String> searchFields();
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.elasticsearch.gateway.MetaStateService;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.engine.EngineFactory;
import org.elasticsearch.index.mapper.MapperMetrics;
Expand Down Expand Up @@ -82,16 +83,18 @@ public class IndicesServiceBuilder {
QueryRewriteInterceptor queryRewriteInterceptor = null;
SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() {
@Override
public void init(IndexSettings indexSettings) {}

@Override
public Map<String, String> indexSlowLogFields() {
return Map.of();
}
public SlowLogFields create(IndexSettings indexSettings) {
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return Map.of();
}

@Override
public Map<String, String> searchSlowLogFields() {
return Map.of();
@Override
public Map<String, String> searchFields() {
return Map.of();
}
};
}
};

Expand Down
35 changes: 17 additions & 18 deletions server/src/main/java/org/elasticsearch/node/NodeConstruction.java
Original file line number Diff line number Diff line change
Expand Up @@ -111,9 +111,9 @@
import org.elasticsearch.index.IndexMode;
import org.elasticsearch.index.IndexSettingProvider;
import org.elasticsearch.index.IndexSettingProviders;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.analysis.AnalysisRegistry;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.SourceFieldMetrics;
Expand Down Expand Up @@ -810,25 +810,24 @@ private void construct(
List<? extends SlowLogFieldProvider> slowLogFieldProviders = pluginsService.loadServiceProviders(SlowLogFieldProvider.class);
// NOTE: the response of index/search slow log fields below must be calculated dynamically on every call
// because the responses may change dynamically at runtime
SlowLogFieldProvider slowLogFieldProvider = new SlowLogFieldProvider() {
@Override
public void init(IndexSettings indexSettings) {
slowLogFieldProviders.forEach(provider -> provider.init(indexSettings));
}

@Override
public Map<String, String> indexSlowLogFields() {
return slowLogFieldProviders.stream()
.flatMap(provider -> provider.indexSlowLogFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
SlowLogFieldProvider slowLogFieldProvider = indexSettings -> {
final List<SlowLogFields> fields = new ArrayList<>();
for (var provider : slowLogFieldProviders) {
fields.add(provider.create(indexSettings));
}
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields.stream().flatMap(f -> f.indexFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

@Override
public Map<String, String> searchSlowLogFields() {
return slowLogFieldProviders.stream()
.flatMap(provider -> provider.searchSlowLogFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
@Override
public Map<String, String> searchFields() {
return fields.stream().flatMap(f -> f.searchFields().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
};
};

IndicesService indicesService = new IndicesServiceBuilder().settings(settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.IndexVersions;
import org.elasticsearch.index.SlowLogFieldProvider;
import org.elasticsearch.index.SlowLogFields;
import org.elasticsearch.index.engine.Engine;
import org.elasticsearch.index.engine.EngineConfig;
import org.elasticsearch.index.engine.EngineFactory;
Expand Down Expand Up @@ -209,16 +210,18 @@ static void setFields(Map<String, String> fields) {
}

@Override
public void init(IndexSettings indexSettings) {}

@Override
public Map<String, String> indexSlowLogFields() {
return fields;
}

@Override
public Map<String, String> searchSlowLogFields() {
return fields;
public SlowLogFields create(IndexSettings indexSettings) {
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields;
}

@Override
public Map<String, String> searchFields() {
return fields;
}
};
}
}

Expand All @@ -231,16 +234,18 @@ static void setFields(Map<String, String> fields) {
}

@Override
public void init(IndexSettings indexSettings) {}

@Override
public Map<String, String> indexSlowLogFields() {
return fields;
}

@Override
public Map<String, String> searchSlowLogFields() {
return fields;
public SlowLogFields create(IndexSettings indexSettings) {
return new SlowLogFields() {
@Override
public Map<String, String> indexFields() {
return fields;
}

@Override
public Map<String, String> searchFields() {
return fields;
}
};
}
}

Expand Down Expand Up @@ -806,32 +811,33 @@ public void testLoadSlowLogFieldProvider() {

var indicesService = getIndicesService();
SlowLogFieldProvider fieldProvider = indicesService.slowLogFieldProvider;
SlowLogFields fields = fieldProvider.create(null);

// The map of fields from the two providers are merged to a single map of fields
assertEquals(Map.of("key1", "value1", "key2", "value2"), fieldProvider.searchSlowLogFields());
assertEquals(Map.of("key1", "value1", "key2", "value2"), fieldProvider.indexSlowLogFields());
assertEquals(Map.of("key1", "value1", "key2", "value2"), fields.searchFields());
assertEquals(Map.of("key1", "value1", "key2", "value2"), fields.indexFields());

TestSlowLogFieldProvider.setFields(Map.of("key1", "value1"));
TestAnotherSlowLogFieldProvider.setFields(Map.of("key1", "value2"));

// There is an overlap of field names, since this isn't deterministic and probably a
// programming error (two providers provide the same field) throw an exception
assertThrows(IllegalStateException.class, fieldProvider::searchSlowLogFields);
assertThrows(IllegalStateException.class, fieldProvider::indexSlowLogFields);
assertThrows(IllegalStateException.class, fields::searchFields);
assertThrows(IllegalStateException.class, fields::indexFields);

TestSlowLogFieldProvider.setFields(Map.of("key1", "value1"));
TestAnotherSlowLogFieldProvider.setFields(Map.of());

// One provider has no fields
assertEquals(Map.of("key1", "value1"), fieldProvider.searchSlowLogFields());
assertEquals(Map.of("key1", "value1"), fieldProvider.indexSlowLogFields());
assertEquals(Map.of("key1", "value1"), fields.searchFields());
assertEquals(Map.of("key1", "value1"), fields.indexFields());

TestSlowLogFieldProvider.setFields(Map.of());
TestAnotherSlowLogFieldProvider.setFields(Map.of());

// Both providers have no fields
assertEquals(Map.of(), fieldProvider.searchSlowLogFields());
assertEquals(Map.of(), fieldProvider.indexSlowLogFields());
assertEquals(Map.of(), fields.searchFields());
assertEquals(Map.of(), fields.indexFields());
}

public void testWithTempIndexServiceHandlesExistingIndex() throws Exception {
Expand Down
Loading

0 comments on commit a0a4a54

Please sign in to comment.