diff --git a/data-prepper-plugins/mutate-string-processors/README.md b/data-prepper-plugins/mutate-string-processors/README.md
index 92f5f492e6..8d72eebc87 100644
--- a/data-prepper-plugins/mutate-string-processors/README.md
+++ b/data-prepper-plugins/mutate-string-processors/README.md
@@ -1,6 +1,7 @@
 # Mutate String Processors
 The following is a list of processors to mutate a string.
 * [substitute_string](#substitutestringprocessor)
+* [replace_string](#replacestringprocessor)
 * [split_string](#splitstringprocessor)
 * [uppercase_string](#uppercasestringprocessor)
 * [lowercase_string](#lowercasestringprocessor)
@@ -50,6 +51,49 @@ must be escaped using `\\` when using double quotes and `\ ` when using single q
 
 ---
 
+## ReplaceStringProcessor
+A processor that takes in a key and changes its value by replacing each occurrence of the `from` substring with the `to` string.
+
+### Basic Usage
+To get started, create the following `pipeline.yaml`.
+```yaml
+pipeline:
+  source:
+    file:
+      path: "/full/path/to/logs_json.log"
+      record_type: "event"
+      format: "json"
+  processor:
+    - replace_string:
+        entries:
+          - source: "message"
+            from: "ab"
+            to: "ef"
+  sink:
+    - stdout:
+```
+
+Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.
+
+```json
+{"message": "ab:cd:ab:cd"}
+```
+When you run Data Prepper with this `pipeline.yaml`, you should see the following output:
+
+```json
+{"message": "ef:cd:ef:cd"}
+```
+If the `from` substring does not have a match, the value of the key is left unchanged.
+
+### Configuration
+* `entries` - (required) - A list of replacement entries to apply to an event
+  * `source` - (required) - The key to be modified
+  * `from` - (required) - The literal substring to be replaced. Regular expressions are not supported.
+  * `to` - (required) - The string to be substituted for each match of `from`
+  * `replace_when` - (optional) - A [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as `/some-key == "test"`. When set, the entry is applied only to events for which the expression evaluates to true.
+
+---
+
 ## SplitStringProcessor
 A processor that splits a field into an array using a delimiter character.
 
diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessor.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessor.java
new file mode 100644
index 0000000000..15ec8615db
--- /dev/null
+++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessor.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutatestring;
+
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
+import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventKey;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.processor.Processor;
+
+import java.util.Objects;
+
+/**
+ * This processor takes in a key and changes its value by replacing each occurrence of the {@code from} substring
+ * with the {@code to} string. Matching is literal; {@code from} is not treated as a regular expression.
+ * If the value is not a string, no action is performed.
+ */
+@DataPrepperPlugin(name = "replace_string", pluginType = Processor.class, pluginConfigurationType = ReplaceStringProcessorConfig.class)
+public class ReplaceStringProcessor extends AbstractStringProcessor<ReplaceStringProcessorConfig.Entry> {
+    private final ExpressionEvaluator expressionEvaluator;
+
+    /**
+     * Creates the processor and validates the {@code replace_when} expression of every configured entry.
+     *
+     * @param pluginMetrics metrics handle passed through to the base processor
+     * @param config processor configuration holding the replacement entries
+     * @param expressionEvaluator evaluator used to validate and evaluate {@code replace_when} expressions
+     * @throws InvalidPluginConfigurationException if an entry's {@code replace_when} is not a valid expression
+     */
+    @DataPrepperPluginConstructor
+    public ReplaceStringProcessor(final PluginMetrics pluginMetrics, final ReplaceStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
+        super(pluginMetrics, config);
+        this.expressionEvaluator = expressionEvaluator;
+
+        for (final ReplaceStringProcessorConfig.Entry entry : config.getEntries()) {
+            if (entry.getReplaceWhen() != null
+                    && !expressionEvaluator.isValidExpressionStatement(entry.getReplaceWhen())) {
+                throw new InvalidPluginConfigurationException(
+                        String.format("replace_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getReplaceWhen()));
+            }
+        }
+    }
+
+    /**
+     * Replaces every occurrence of the entry's {@code from} substring in {@code value} with its {@code to} string
+     * and stores the result under the entry's source key. The event is left untouched when the entry has a
+     * {@code replace_when} condition that evaluates to false for this event.
+     */
+    @Override
+    protected void performKeyAction(final Event recordEvent, final ReplaceStringProcessorConfig.Entry entry, final String value) {
+        if (Objects.nonNull(entry.getReplaceWhen()) && !expressionEvaluator.evaluateConditional(entry.getReplaceWhen(), recordEvent)) {
+            return;
+        }
+
+        final String newValue = value.replace(entry.getFrom(), entry.getTo());
+        recordEvent.put(entry.getSource(), newValue);
+    }
+
+    /** Returns the event key whose value this entry rewrites. */
+    @Override
+    protected EventKey getKey(final ReplaceStringProcessorConfig.Entry entry) {
+        return entry.getSource();
+    }
+}
diff --git a/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorConfig.java b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorConfig.java
new file mode 100644
index 0000000000..6d049910fb
--- /dev/null
+++ b/data-prepper-plugins/mutate-string-processors/src/main/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorConfig.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutatestring;
+
+import com.fasterxml.jackson.annotation.JsonClassDescription;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyDescription;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import org.opensearch.dataprepper.model.event.EventKey;
+
+import java.util.List;
+
+/**
+ * Configuration for the {@code replace_string} processor: a list of entries, each naming a source key,
+ * the substring to look for, its replacement, and an optional {@code replace_when} condition.
+ */
+@JsonPropertyOrder
+@JsonClassDescription("The `replace_string` processor replaces all occurrence of substring in key’s value with a " +
+        "replacement string.")
+public class ReplaceStringProcessorConfig implements StringProcessorConfig<ReplaceStringProcessorConfig.Entry> {
+    /** A single replacement rule applied to one event key. */
+    public static class Entry {
+        @JsonPropertyDescription("The key to modify.")
+        private EventKey source;
+        @JsonPropertyDescription("The substring to be replaced in the source.")
+        private String from;
+        @JsonPropertyDescription("The string to be substituted for each match of `from`.")
+        private String to;
+
+        @JsonProperty("replace_when")
+        @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
+                "such as `/some-key == \"test\"`, that will be evaluated to determine whether the processor will be " +
+                "run on the event. Default is `null`. All events will be processed unless otherwise stated.")
+        private String replaceWhen;
+
+        public EventKey getSource() {
+            return source;
+        }
+
+        public String getFrom() {
+            return from;
+        }
+
+        public String getTo() {
+            return to;
+        }
+
+        public String getReplaceWhen() { return replaceWhen; }
+
+        public Entry(final EventKey source, final String from, final String to, final String replaceWhen) {
+            this.source = source;
+            this.from = from;
+            this.to = to;
+            this.replaceWhen = replaceWhen;
+        }
+
+        public Entry() {}
+    }
+
+    @JsonPropertyDescription("List of entries. Valid values are `source`, `from`, and `to`, and `replace_when`.")
+    private List<Entry> entries;
+
+    public List<Entry> getEntries() {
+        return entries;
+    }
+
+    @Override
+    public List<Entry> getIterativeConfig() {
+        return entries;
+    }
+}
diff --git a/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorTests.java b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorTests.java
new file mode 100644
index 0000000000..a552596d38
--- /dev/null
+++ b/data-prepper-plugins/mutate-string-processors/src/test/java/org/opensearch/dataprepper/plugins/processor/mutatestring/ReplaceStringProcessorTests.java
@@ -0,0 +1,218 @@
+/*
+ * Copyright OpenSearch Contributors
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+package org.opensearch.dataprepper.plugins.processor.mutatestring;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mock;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.opensearch.dataprepper.event.TestEventFactory;
+import org.opensearch.dataprepper.event.TestEventKeyFactory;
+import org.opensearch.dataprepper.expression.ExpressionEvaluator;
+import org.opensearch.dataprepper.metrics.PluginMetrics;
+import org.opensearch.dataprepper.model.event.Event;
+import org.opensearch.dataprepper.model.event.EventBuilder;
+import org.opensearch.dataprepper.model.event.EventFactory;
+import org.opensearch.dataprepper.model.event.EventKey;
+import org.opensearch.dataprepper.model.event.EventKeyFactory;
+import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
+import org.opensearch.dataprepper.model.record.Record;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.mockito.Mockito.lenient;
+import static org.mockito.Mockito.when;
+
+/** Unit tests for {@link ReplaceStringProcessor}. */
+@ExtendWith(MockitoExtension.class)
+public class ReplaceStringProcessorTests {
+    private static final EventFactory TEST_EVENT_FACTORY = TestEventFactory.getTestEventFactory();
+    private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();
+    @Mock
+    private PluginMetrics pluginMetrics;
+
+    @Mock
+    private ReplaceStringProcessorConfig config;
+
+    @Mock
+    private ExpressionEvaluator expressionEvaluator;
+
+    @BeforeEach
+    public void setup() {
+        lenient().when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", null)));
+        lenient().when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", null)));
+    }
+
+    @Test
+    void invalid_replace_when_throws_InvalidPluginConfigurationException() {
+        final String replaceWhen = UUID.randomUUID().toString();
+        when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", replaceWhen)));
+
+        when(expressionEvaluator.isValidExpressionStatement(replaceWhen)).thenReturn(false);
+
+        assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
+    }
+
+    @Test
+    public void testHappyPathReplaceStringProcessor() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent("abcd");
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("bbcd"));
+    }
+
+    @Test
+    public void testNoMatchReplaceStringProcessor() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent("qwerty");
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("qwerty"));
+    }
+
+    @Test
+    public void testHappyPathMultiReplaceStringProcessor() {
+        when(config.getIterativeConfig()).thenReturn(Arrays.asList(createEntry("message", "a", "b", null),
+                createEntry("message2", "c", "d", null)));
+        when(config.getEntries()).thenReturn(Arrays.asList(createEntry("message", "a", "b", null),
+                createEntry("message2", "c", "d", null)));
+
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent("abcd");
+        record.getData().put("message2", "cdef");
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("bbcd"));
+        assertThat(editedRecords.get(0).getData().containsKey("message2"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message2", Object.class), equalTo("ddef"));
+    }
+
+    @Test
+    public void testValueIsNotStringReplaceStringProcessor() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent(3);
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(3));
+    }
+
+    @Test
+    public void testValueIsNullReplaceStringProcessor() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent(null);
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(null));
+    }
+
+    @Test
+    public void testValueIsObjectReplaceStringProcessor() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final TestObject testObject = new TestObject();
+        testObject.a = "msg";
+        final Record<Event> record = getEvent(testObject);
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
+        assertThat(editedRecords.get(0).getData().get("message", TestObject.class), equalTo(testObject));
+        assertThat(editedRecords.get(0).getData().get("message", TestObject.class).a, equalTo(testObject.a));
+    }
+
+    @Test
+    public void test_value_is_unchanged_when_replace_when_condition_returns_false() {
+        final String replaceWhen = UUID.randomUUID().toString();
+
+        when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", replaceWhen)));
+        when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", replaceWhen)));
+        when(expressionEvaluator.isValidExpressionStatement(replaceWhen)).thenReturn(true);
+
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent("abcd");
+
+        when(expressionEvaluator.evaluateConditional(replaceWhen, record.getData())).thenReturn(false);
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        // "a" occurs in the input, so this would read "bbcd" if the replace_when condition were ignored.
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("abcd"));
+    }
+
+    @Test
+    public void test_value_is_replaced_when_replace_when_condition_returns_true() {
+        final String replaceWhen = UUID.randomUUID().toString();
+
+        when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", replaceWhen)));
+        when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", replaceWhen)));
+        when(expressionEvaluator.isValidExpressionStatement(replaceWhen)).thenReturn(true);
+
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        final Record<Event> record = getEvent("abcd");
+
+        when(expressionEvaluator.evaluateConditional(replaceWhen, record.getData())).thenReturn(true);
+        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));
+
+        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("bbcd"));
+    }
+
+    @Test
+    public void testShutdown() {
+        final ReplaceStringProcessor processor = createObjectUnderTest();
+        assertThat(processor.isReadyForShutdown(), is(true));
+    }
+
+    private static class TestObject {
+        public String a;
+
+        @Override
+        public boolean equals(final Object other) {
+            return other instanceof TestObject && Objects.equals(a, ((TestObject) other).a);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hashCode(a);
+        }
+    }
+
+    private ReplaceStringProcessorConfig.Entry createEntry(final String source, final String from, final String to, final String replaceWhen) {
+        final EventKey sourceKey = eventKeyFactory.createEventKey(source);
+
+        return new ReplaceStringProcessorConfig.Entry(sourceKey, from, to, replaceWhen);
+    }
+
+    private ReplaceStringProcessor createObjectUnderTest() {
+        return new ReplaceStringProcessor(pluginMetrics, config, expressionEvaluator);
+    }
+
+    private Record<Event> getEvent(final Object message) {
+        final Map<String, Object> testData = new HashMap<>();
+        testData.put("message", message);
+        return buildRecordWithEvent(testData);
+    }
+
+    private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
+        return new Record<>(TEST_EVENT_FACTORY.eventBuilder(EventBuilder.class)
+                .withData(data)
+                .withEventType("event")
+                .build());
+    }
+}