From c35be9b691fe3f0414b354634f587634cd0b9aa6 Mon Sep 17 00:00:00 2001
From: dtspence <33552925+dtspence@users.noreply.github.com>
Date: Fri, 4 Oct 2024 02:15:10 +0000
Subject: [PATCH] Field configuration helper cache implementation
---
warehouse/ingest-core/pom.xml | 15 +++
.../data/config/CachedFieldConfigHelper.java | 75 ++++++++++++++
.../data/config/ingest/BaseIngestHelper.java | 15 ++-
.../config/CachingFieldConfigHelperTest.java | 99 +++++++++++++++++++
4 files changed, 203 insertions(+), 1 deletion(-)
create mode 100644 warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
create mode 100644 warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
diff --git a/warehouse/ingest-core/pom.xml b/warehouse/ingest-core/pom.xml
index 93c4b6b61f1..f5ca2dd1117 100644
--- a/warehouse/ingest-core/pom.xml
+++ b/warehouse/ingest-core/pom.xml
@@ -212,6 +212,21 @@
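             <artifactId>javassist</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>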
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
new file mode 100644
index 00000000000..d42a0060e50
--- /dev/null
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/CachedFieldConfigHelper.java
@@ -0,0 +1,109 @@
+package datawave.ingest.data.config;
+
+import java.util.EnumMap;
+import java.util.Map;
+import java.util.function.Function;
+
+import org.apache.commons.collections4.map.LRUMap;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * A {@link FieldConfigHelper} decorator that caches the answers of another helper.
+ * <p>
+ * Answers are stored per field name and, within a field, per {@link AttributeType}. The number of field names retained is bounded by an {@link LRUMap}: once
+ * the limit is reached the least recently used field name is dropped, and its answers are evaluated again the next time they are requested.
+ * <p>
+ * This class adds no synchronization of its own around the cache.
+ */
+public class CachedFieldConfigHelper implements FieldConfigHelper {
+    /** Helper consulted whenever an answer is not present in the cache. */
+    private final FieldConfigHelper underlyingHelper;
+    /** Cached answers keyed by field name; bounded by the limit passed to the constructor. */
+    private final Map<String,ResultEntry> resultCache;
+
+    /** The per-field questions whose answers are cached independently of one another. */
+    enum AttributeType {
+        INDEXED_FIELD, REVERSE_INDEXED_FIELD, TOKENIZED_FIELD, REVERSE_TOKENIZED_FIELD, STORED_FIELD, INDEXED_ONLY
+    }
+
+    /**
+     * Creates a caching wrapper around the given helper.
+     *
+     * @param helper
+     *            the helper that supplies answers on a cache miss
+     * @param limit
+     *            the maximum number of field names to keep cached; must be at least 1
+     * @throws IllegalArgumentException
+     *             if {@code limit} is less than 1
+     */
+    public CachedFieldConfigHelper(FieldConfigHelper helper, int limit) {
+        if (limit < 1) {
+            throw new IllegalArgumentException("Limit must be a positive integer");
+        }
+        this.underlyingHelper = helper;
+        this.resultCache = new LRUMap<>(limit);
+    }
+
+    @Override
+    public boolean isStoredField(String fieldName) {
+        return getOrEvaluate(AttributeType.STORED_FIELD, fieldName, underlyingHelper::isStoredField);
+    }
+
+    @Override
+    public boolean isIndexedField(String fieldName) {
+        return getOrEvaluate(AttributeType.INDEXED_FIELD, fieldName, underlyingHelper::isIndexedField);
+    }
+
+    @Override
+    public boolean isIndexOnlyField(String fieldName) {
+        return getOrEvaluate(AttributeType.INDEXED_ONLY, fieldName, underlyingHelper::isIndexOnlyField);
+    }
+
+    @Override
+    public boolean isReverseIndexedField(String fieldName) {
+        return getOrEvaluate(AttributeType.REVERSE_INDEXED_FIELD, fieldName, underlyingHelper::isReverseIndexedField);
+    }
+
+    @Override
+    public boolean isTokenizedField(String fieldName) {
+        return getOrEvaluate(AttributeType.TOKENIZED_FIELD, fieldName, underlyingHelper::isTokenizedField);
+    }
+
+    @Override
+    public boolean isReverseTokenizedField(String fieldName) {
+        return getOrEvaluate(AttributeType.REVERSE_TOKENIZED_FIELD, fieldName, underlyingHelper::isReverseTokenizedField);
+    }
+
+    /**
+     * Returns the cached answer for the given field and attribute, evaluating and caching it first when it is missing.
+     *
+     * @param attributeType
+     *            which answer is being requested
+     * @param fieldName
+     *            the field the answer applies to
+     * @param evaluateFn
+     *            computes the answer from the field name; only invoked when no cached answer exists
+     * @return the cached or newly evaluated answer
+     */
+    @VisibleForTesting
+    boolean getOrEvaluate(AttributeType attributeType, String fieldName, Function<String,Boolean> evaluateFn) {
+        return resultCache.computeIfAbsent(fieldName, ResultEntry::new).resolveResult(attributeType, evaluateFn);
+    }
+
+    /** Holds the answers evaluated so far for a single field name. */
+    private static class ResultEntry {
+        private final String fieldName;
+        private final EnumMap<AttributeType,Boolean> resultMap;
+
+        ResultEntry(String fieldName) {
+            this.fieldName = fieldName;
+            this.resultMap = new EnumMap<>(AttributeType.class);
+        }
+
+        /** Returns the stored answer for the attribute, invoking {@code evaluateFn} with this entry's field name if none is stored yet. */
+        boolean resolveResult(AttributeType attributeType, Function<String,Boolean> evaluateFn) {
+            return resultMap.computeIfAbsent(attributeType, (t) -> evaluateFn.apply(fieldName));
+        }
+    }
+}
diff --git a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
index 4be57bdf85e..d6c42eba3a3 100644
--- a/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
+++ b/warehouse/ingest-core/src/main/java/datawave/ingest/data/config/ingest/BaseIngestHelper.java
@@ -33,6 +33,7 @@
import datawave.ingest.config.IngestConfigurationFactory;
import datawave.ingest.data.Type;
import datawave.ingest.data.TypeRegistry;
+import datawave.ingest.data.config.CachedFieldConfigHelper;
import datawave.ingest.data.config.DataTypeHelperImpl;
import datawave.ingest.data.config.FieldConfigHelper;
import datawave.ingest.data.config.MarkingsHelper;
@@ -137,9 +138,14 @@ public abstract class BaseIngestHelper extends AbstractIngestHelper implements C
public static final String FIELD_FAILED_NORMALIZATION_POLICY = ".data.field.normalization.failure.policy";
public static final String FIELD_CONFIG_FILE = ".data.category.field.config.file";
+ public static final String FIELD_CONFIG_CACHE_ENABLED = ".data.category.field.config.cache.enabled";
+ public static final String FIELD_CONFIG_CACHE_KEY_LIMIT = ".data.category.field.config.cache.limit";
private static final Logger log = ThreadConfigurableLogger.getLogger(BaseIngestHelper.class);
+ private static final boolean DEFAULT_FIELD_CACHE_ENABLED = false;
+ private static final int DEFAULT_FIELD_CACHE_LIMIT = 100;
+
private Multimap> typeFieldMap = null;
private Multimap> typePatternMap = null;
private TreeMultimap> typeCompiledPatternMap = null;
@@ -254,10 +260,17 @@ public void setup(Configuration config) {
// Load the field helper, which takes precedence over the individual field configurations
final String fieldConfigFile = config.get(this.getType().typeName() + FIELD_CONFIG_FILE);
if (fieldConfigFile != null) {
+ final boolean fieldConfigCacheEnabled = config.getBoolean(this.getType().typeName() + FIELD_CONFIG_CACHE_ENABLED, DEFAULT_FIELD_CACHE_ENABLED);
+ final int fieldConfigCacheLimit = config.getInt(this.getType().typeName() + FIELD_CONFIG_CACHE_KEY_LIMIT, DEFAULT_FIELD_CACHE_LIMIT);
if (log.isDebugEnabled()) {
log.debug("Field config file " + fieldConfigFile + " specified for: " + this.getType().typeName() + FIELD_CONFIG_FILE);
+ log.debug("Field config cache enabled: " + fieldConfigCacheEnabled);
+ if (fieldConfigCacheEnabled) {
+ log.debug("Field config cache limit: " + fieldConfigCacheLimit);
+ }
}
- this.fieldConfigHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
+ final FieldConfigHelper baseHelper = XMLFieldConfigHelper.load(fieldConfigFile, this);
+ fieldConfigHelper = fieldConfigCacheEnabled ? new CachedFieldConfigHelper(baseHelper, fieldConfigCacheLimit) : baseHelper;
}
// Process the indexed fields
diff --git a/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
new file mode 100644
index 00000000000..bda872db318
--- /dev/null
+++ b/warehouse/ingest-core/src/test/java/datawave/ingest/data/config/CachingFieldConfigHelperTest.java
@@ -0,0 +1,111 @@
+package datawave.ingest.data.config;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+/** Tests for {@link CachedFieldConfigHelper}. */
+public class CachingFieldConfigHelperTest {
+    /** Every cached accessor must reach the matching method of the wrapped helper on a cache miss. */
+    @SuppressWarnings("unchecked")
+    @Test
+    public void testCachingBehaviorWillCallBaseMethods() {
+        // one row per accessor: row[0] = call made on the cached helper, row[1] = verification against the wrapped mock
+        // @formatter:off
+        Stream.of(new Object[][] {
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isIndexOnlyField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isIndexOnlyField(eq(f)),
+            },
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isIndexedField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isIndexedField(eq(f)),
+            },
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isTokenizedField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isTokenizedField(eq(f)),
+            },
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isStoredField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isStoredField(eq(f)),
+            },
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isReverseIndexedField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isReverseIndexedField(eq(f)),
+            },
+            new Object[] {
+                (BiConsumer<FieldConfigHelper,String>) FieldConfigHelper::isReverseTokenizedField,
+                (BiConsumer<FieldConfigHelper,String>) (h, f) -> verify(h).isReverseTokenizedField(eq(f)),
+            },
+        }).forEach(arg -> {
+            String fieldName = "testField";
+            BiConsumer<FieldConfigHelper,String> testAction = (BiConsumer<FieldConfigHelper,String>) arg[0];
+            BiConsumer<FieldConfigHelper,String> verifyAction = (BiConsumer<FieldConfigHelper,String>) arg[1];
+            FieldConfigHelper mockHelper = mock(FieldConfigHelper.class);
+            FieldConfigHelper cachedHelper = new CachedFieldConfigHelper(mockHelper, 1);
+            testAction.accept(cachedHelper, fieldName);
+            verifyAction.accept(mockHelper, fieldName);
+        });
+        // @formatter:on
+    }
+
+    /** A limit below 1 is rejected by the constructor. */
+    @ParameterizedTest
+    @ValueSource(ints = {-1,0})
+    public void testConstructorWithNonPositiveLimitWillThrow(int limit) {
+        assertThrows(IllegalArgumentException.class, () -> new CachedFieldConfigHelper(mock(FieldConfigHelper.class), limit));
+    }
+
+    /** Counts evaluator invocations to observe cache hits, misses and evictions with a limit of two field names. */
+    @Test
+    public void testCachingLimitsBetweenFieldsAndAttributeTypes() {
+        AtomicLong counter = new AtomicLong();
+        CachedFieldConfigHelper helper = new CachedFieldConfigHelper(mock(FieldConfigHelper.class), 2);
+        Function<String,Boolean> fn = (f) -> {
+            counter.incrementAndGet();
+            return true;
+        };
+
+        // following ensures that:
+        // 1. fields are computed, where appropriate per attribute-type
+        // 2. cached results are returned while the limit is not exceeded
+        // 3. results are recomputed for fields evicted once the limit is exceeded
+        // 4. limit functions across attribute-types
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
+        Assertions.assertEquals(1, counter.get(), "field1 should compute result (new field)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field1", fn);
+        Assertions.assertEquals(1, counter.get(), "field1 repeated (existing field)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+        Assertions.assertEquals(2, counter.get(), "field2 should compute result (new field)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+        Assertions.assertEquals(2, counter.get(), "field2 repeated (existing)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.INDEXED_FIELD, "field1", fn);
+        Assertions.assertEquals(3, counter.get(), "field1 should compute result (new attribute)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
+        Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (new field)");
+
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field3", fn);
+        Assertions.assertEquals(4, counter.get(), "field3 exceeded limit (existing field)");
+
+        // field2 was evicted when field3 was added, because field1 had been used more recently than field2
+        helper.getOrEvaluate(CachedFieldConfigHelper.AttributeType.STORED_FIELD, "field2", fn);
+        Assertions.assertEquals(5, counter.get(), "field2 should compute result (evicted field)");
+    }
+}