Skip to content

Commit

Permalink
Field configuration helper cache implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
dtspence committed Oct 16, 2024
1 parent 3dd29d0 commit c35be9b
Show file tree
Hide file tree
Showing 4 changed files with 203 additions and 1 deletion.
15 changes: 15 additions & 0 deletions warehouse/ingest-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,21 @@
<artifactId>javassist</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package datawave.ingest.data.config;

import java.util.EnumMap;
import java.util.Map;
import java.util.function.Function;

import org.apache.commons.collections4.map.LRUMap;

import com.google.common.annotations.VisibleForTesting;

public class CachedFieldConfigHelper implements FieldConfigHelper {
private final FieldConfigHelper underlyingHelper;
private final Map<String, ResultEntry> resultCache;

enum AttributeType {
INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEXED_ONLY
}

public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
if (limit < 1) {
throw new IllegalArgumentException("Limit must be a positive integer");
}
this.underlyingHelper = helper;
this.resultCache = new LRUMap<>(limit);
}

@Override
public boolean isStoredField(String fieldName) {
return getOrEvaluate(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
}

@Override
public boolean isIndexedField(String fieldName) {
return getOrEvaluate(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
}

@Override
public boolean isIndexOnlyField(String fieldName) {
return getOrEvaluate(AttributeType.INDEXED_ONLY, fieldName, underlyingHelper::isIndexOnlyField);
}

@Override
public boolean isReverseIndexedField(String fieldName) {
return getOrEvaluate(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
}

@Override
public boolean isTokenizedField(String fieldName) {
return getOrEvaluate(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
}

@Override
public boolean isReverseTokenizedField(String fieldName) {
return getOrEvaluate(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
}

@VisibleForTesting
boolean getOrEvaluate(AttributeType attributeType, String fieldName, Function<String,Boolean> evaluateFn) {
return resultCache.computeIfAbsent(fieldName, ResultEntry::new).resolveResult(attributeType, evaluateFn);
}

private static class ResultEntry {
private final String fieldName;
private final EnumMap<AttributeType, Boolean> resultMap;

ResultEntry(String fieldName) {
this.fieldName = fieldName;
this.resultMap = new EnumMap<>(AttributeType.class);
}

boolean resolveResult(AttributeType attributeType, Function<String,Boolean> evaluateFn) {
return resultMap.computeIfAbsent(attributeType, (t) -> evaluateFn.apply(fieldName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
import datawave.ingest.data.config.CachedFieldConfigHelper;
import datawave.ingest.data.config.DataTypeHelperImpl;
import datawave.ingest.data.config.FieldConfigHelper;
import datawave.ingest.data.config.MarkingsHelper;
Expand Down Expand Up @@ -137,9 +138,14 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy";

public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";

private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);

private static final boolean DEFAULT_FIELD_CACHE_ENABLED = false;
private static final int DEFAULT_FIELD_CACHE_LIMIT = 100;

private Multimap<String,datawave.data.type.Type<?>> typeFieldMap = null;
private Multimap<String,datawave.data.type.Type<?>> typePatternMap = null;
private TreeMultimap<Matcher,datawave.data.type.Type<?>> typeCompiledPatternMap = null;
Expand Down Expand Up @@ -254,10 +260,17 @@ public void setup(Configuration config) {
// Load the field helper, which takes precedence over the individual field configurations
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, DEFAULT_FIELD_CACHE_ENABLED);
final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, DEFAULT_FIELD_CACHE_LIMIT);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
log.debug("Field config cache enabled: " + fieldConfigCacheEnabled);
if (fieldConfigCacheEnabled) {
log.debug("Field config cache limit: " + fieldConfigCacheLimit);
}
}
this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
final FieldConfigHelper baseHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
fieldConfigHelper = fieldConfigCacheEnabled ? new CachedFieldConfigHelper(baseHelper, fieldConfigCacheLimit) : baseHelper;
}

// Process the indexed fields
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package datawave.ingest.data.config;

import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Stream;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

public class CachingFieldConfigHelperTest {
@SuppressWarnings("unchecked")
@Test
public void testCachingBehaviorWillCallBaseMethods() {
// @formatter:off
Stream.of(new Object[][] {
new Object[] {
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isIndexOnlyField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isIndexOnlyField(eq(f)),
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isIndexedField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isIndexedField(eq(f)),
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isTokenizedField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isTokenizedField(eq(f)),
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isStoredField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isStoredField(eq(f)),
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isReverseIndexedField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isReverseIndexedField(eq(f)),
(BiConsumer<FieldConfigHelper, String>) FieldConfigHelper::isReverseTokenizedField,
(BiConsumer<FieldConfigHelper, String>) (h, f) -> verify(h).isReverseTokenizedField(eq(f)),
}
}).forEach(arg -> {
// param[0] = helper method
// param[1] = validation method
String fieldName = "testField";
BiConsumer<FieldConfigHelper, String> testAction = (BiConsumer<FieldConfigHelper, String>) arg[0];
BiConsumer<FieldConfigHelper, String> verifyAction = (BiConsumer<FieldConfigHelper, String>) arg[1];
FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
testAction.accept(cachedHelper, fieldName);
verifyAction.accept(mockHelper, fieldName);
});
// @formatter:on
}

@ParameterizedTest
@ValueSource(ints = {-1,0})
public void testConstructorWithNonPositiveLimitWillThrow(int limit) {
assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit));
}

@Test
public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
AtomicLong counter = new AtomicLong();
CachedFieldConfigHelper helper = new CachedFieldConfigHelper(mock(FieldConfigHelper.class), 2);
Function<String,Boolean> fn = (f) -> {
counter.incrementAndGet();
return true;
};

// following ensures that:
// 1. fields are computed, where appropriate per attribute-type
// 2. limit allows cache results to return
// 3. limit blocks results to return if exceeded
// 4. limit functions across attribute-types

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
Assertions.assertEquals(1, counter.get(), "field1 should compute result (new field)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
Assertions.assertEquals(1, counter.get(), "field1 repeated (existing field)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
Assertions.assertEquals(2, counter.get(), "field2 should compute result (new field)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
Assertions.assertEquals(2, counter.get(), "field2 repeated (existing)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", fn);
Assertions.assertEquals(3, counter.get(), "field1 should compute result (new attribute)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (new field)");

helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (existing field)");

// LRU map should evict field #2
// we access field #1 above which has more accesses over field #2
helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
Assertions.assertEquals(5, counter.get(), "field1 exceeded limit (new field/eviction)");
}
}

0 comments on commit c35be9b

Please sign in to comment.