Add ReplaceStringProcessor for simple string substitution that doesn't involve regex #4954

Merged
merged 3 commits into from
Sep 18, 2024
Changes from 2 commits
43 changes: 43 additions & 0 deletions data-prepper-plugins/mutate-string-processors/README.md
@@ -1,6 +1,7 @@
# Mutate String Processors
The following is a list of processors to mutate a string.
* [substitute_string](#substitutestringprocessor)
* [replace_string](#replacestringprocessor)
* [split_string](#splitstringprocessor)
* [uppercase_string](#uppercasestringprocessor)
* [lowercase_string](#lowercasestringprocessor)
@@ -50,6 +51,48 @@ must be escaped using `\\` when using double quotes and `\ ` when using single q

---

## ReplaceStringProcessor
A processor that takes in a key and changes its value by replacing each occurrence of the `from` substring with the `to` substring.

### Basic Usage
To get started, create the following `pipeline.yaml`.
```yaml
pipeline:
  source:
    file:
      path: "/full/path/to/logs_json.log"
      record_type: "event"
      format: "json"
  processor:
    - replace_string:
        entries:
          - source: "message"
            from: "ab"
            to: "ef"
  sink:
    - stdout:
```

Create the following file named `logs_json.log` and replace the `path` in the file source of your `pipeline.yaml` with the path of this file.

```json
{"message": "ab:cd:ab:cd"}
```
When you run Data Prepper with this `pipeline.yaml`, you should see the following output:

```json
{"message": "ef:cd:ef:cd"}
```
If the `from` substring has no match, the value of the key is left unchanged.
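
The behavior above can be reproduced outside Data Prepper with a minimal sketch. It assumes only what the processor source in this PR shows, namely that the replacement is done with `String.replace`; the class and method names here are hypothetical and exist only for illustration.

```java
final class LiteralReplaceExample {
    // Replaces every literal occurrence of `from` in `value` with `to`.
    // Hypothetical helper; it mirrors the single String.replace call in the processor.
    static String replaceLiteral(final String value, final String from, final String to) {
        return value.replace(from, to);
    }

    public static void main(final String[] args) {
        // Same input as the README example: both occurrences of "ab" are replaced.
        System.out.println(replaceLiteral("ab:cd:ab:cd", "ab", "ef")); // prints ef:cd:ef:cd

        // No occurrence of "ab": the value comes back unchanged.
        System.out.println(replaceLiteral("qwerty", "ab", "ef")); // prints qwerty
    }
}
```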

### Configuration
* `entries` - (required) - A list of entries, each describing one replacement to apply to an event
* `source` - (required) - The key to be modified
* `from` - (required) - The substring to be replaced. This is matched literally and cannot be a regex.
* `to` - (required) - The string to be substituted for each match of `from`
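
To illustrate why `from` "cannot be regex", the sketch below contrasts Java's literal `String.replace` (the call used by the processor in this PR) with the regex-based `String.replaceAll`. The `replaceAll` call is included purely for contrast and is not a claim about how any other processor is implemented; the class and method names are hypothetical.

```java
final class LiteralVersusRegexExample {
    // Literal replacement: regex metacharacters in `from` have no special meaning.
    static String literal(final String value, final String from, final String to) {
        return value.replace(from, to);
    }

    // Regex replacement: `pattern` is compiled as a regular expression.
    static String regex(final String value, final String pattern, final String to) {
        return value.replaceAll(pattern, to);
    }

    public static void main(final String[] args) {
        // "." is treated as a plain dot, so only the two dots change.
        System.out.println(literal("a.b.c", ".", "-")); // prints a-b-c

        // "." as a regex matches any character, so all five characters change.
        System.out.println(regex("a.b.c", ".", "-")); // prints -----
    }
}
```

With literal matching, values such as `.`, `+`, or `[` can be used in `from` without escaping.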

---

## SplitStringProcessor
A processor that splits a field into an array using a delimiter character.

@@ -0,0 +1,56 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin;
import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.processor.Processor;

import java.util.Objects;

/**
 * This processor takes in a key and changes its value by replacing each occurrence of the {@code from} substring
 * with the {@code to} substring. If the value is not a string, no action is performed.
 */
@DataPrepperPlugin(name = "replace_string", pluginType = Processor.class, pluginConfigurationType = ReplaceStringProcessorConfig.class)
public class ReplaceStringProcessor extends AbstractStringProcessor<ReplaceStringProcessorConfig.Entry> {
    private final ExpressionEvaluator expressionEvaluator;

    @DataPrepperPluginConstructor
    public ReplaceStringProcessor(final PluginMetrics pluginMetrics, final ReplaceStringProcessorConfig config, final ExpressionEvaluator expressionEvaluator) {
        super(pluginMetrics, config);
        this.expressionEvaluator = expressionEvaluator;

        // Fail fast at startup if any entry carries an invalid conditional expression.
        for (final ReplaceStringProcessorConfig.Entry entry : config.getEntries()) {
            if (entry.getSubstituteWhen() != null
                    && !expressionEvaluator.isValidExpressionStatement(entry.getSubstituteWhen())) {
                throw new InvalidPluginConfigurationException(
                        String.format("substitute_when %s is not a valid expression statement. See https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/ for valid expression syntax", entry.getSubstituteWhen()));
            }
        }
    }

    @Override
    protected void performKeyAction(final Event recordEvent, final ReplaceStringProcessorConfig.Entry entry, final String value) {
        // Skip the event when a condition is configured and it evaluates to false.
        if (Objects.nonNull(entry.getSubstituteWhen()) && !expressionEvaluator.evaluateConditional(entry.getSubstituteWhen(), recordEvent)) {
            return;
        }

        // String.replace matches `from` literally (no regex) and replaces every occurrence.
        final String newValue = value.replace(entry.getFrom(), entry.getTo());
        recordEvent.put(entry.getSource(), newValue);
    }

    @Override
    protected EventKey getKey(final ReplaceStringProcessorConfig.Entry entry) {
        return entry.getSource();
    }
}
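
The control flow of the processor can be sketched without any Data Prepper dependencies. This is a simplified illustration and not the plugin's code: a plain `Map` stands in for `Event`, a `Predicate` stands in for the `ExpressionEvaluator` condition, and the non-string check (which the Javadoc attributes to the processor but which is not visible in this diff) is inlined. All names are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

final class ConditionalReplaceSketch {
    // Applies a literal replacement to event[source] unless the value is not a
    // String or the optional condition rejects the event.
    static void replaceIfAllowed(final Map<String, Object> event,
                                 final String source,
                                 final String from,
                                 final String to,
                                 final Predicate<Map<String, Object>> condition) {
        final Object value = event.get(source);
        if (!(value instanceof String)) {
            return; // non-string and null values are left untouched
        }
        if (condition != null && !condition.test(event)) {
            return; // stand-in for substitute_when evaluating to false
        }
        event.put(source, ((String) value).replace(from, to));
    }

    public static void main(final String[] args) {
        final Map<String, Object> event = new HashMap<>();
        event.put("message", "abcd");

        // Condition false: the event is unchanged.
        replaceIfAllowed(event, "message", "a", "b", e -> false);
        System.out.println(event.get("message")); // prints abcd

        // No condition: the replacement is applied.
        replaceIfAllowed(event, "message", "a", "b", null);
        System.out.println(event.get("message")); // prints bbcd

        // Non-string value: nothing happens.
        event.put("message", 3);
        replaceIfAllowed(event, "message", "a", "b", null);
        System.out.println(event.get("message")); // prints 3
    }
}
```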
@@ -0,0 +1,69 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import com.fasterxml.jackson.annotation.JsonClassDescription;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import com.fasterxml.jackson.annotation.JsonPropertyOrder;
import org.opensearch.dataprepper.model.event.EventKey;

import java.util.List;

@JsonPropertyOrder
@JsonClassDescription("The `replace_string` processor replaces all occurrences of a substring in a key’s value with a " +
        "replacement string.")
public class ReplaceStringProcessorConfig implements StringProcessorConfig<ReplaceStringProcessorConfig.Entry> {
    public static class Entry {
        @JsonPropertyDescription("The key to modify.")
        private EventKey source;
        @JsonPropertyDescription("The substring to be replaced in the source.")
        private String from;
        @JsonPropertyDescription("The string to be substituted for each match of `from`.")
        private String to;

        @JsonProperty("substitute_when")
Review comment (Collaborator): Should we call it replace_when here?

Reply (Member Author): good catch


        @JsonPropertyDescription("A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), " +
                "such as `/some-key == \"test\"`, that will be evaluated to determine whether the processor will be " +
                "run on the event. Default is `null`. All events will be processed unless otherwise stated.")
        private String substituteWhen;

        public EventKey getSource() {
            return source;
        }

        public String getFrom() {
            return from;
        }

        public String getTo() {
            return to;
        }

        public String getSubstituteWhen() { return substituteWhen; }

        public Entry(final EventKey source, final String from, final String to, final String substituteWhen) {
            this.source = source;
            this.from = from;
            this.to = to;
            this.substituteWhen = substituteWhen;
        }

        public Entry() {}
    }

    @JsonPropertyDescription("List of entries. Valid values are `source`, `from`, `to`, and `substitute_when`.")
    private List<Entry> entries;

    public List<Entry> getEntries() {
        return entries;
    }

    @Override
    public List<Entry> getIterativeConfig() {
        return entries;
    }
}
@@ -0,0 +1,198 @@
/*
 * Copyright OpenSearch Contributors
 * SPDX-License-Identifier: Apache-2.0
 */

package org.opensearch.dataprepper.plugins.processor.mutatestring;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.opensearch.dataprepper.event.TestEventFactory;
import org.opensearch.dataprepper.event.TestEventKeyFactory;
import org.opensearch.dataprepper.expression.ExpressionEvaluator;
import org.opensearch.dataprepper.metrics.PluginMetrics;
import org.opensearch.dataprepper.model.event.Event;
import org.opensearch.dataprepper.model.event.EventBuilder;
import org.opensearch.dataprepper.model.event.EventFactory;
import org.opensearch.dataprepper.model.event.EventKey;
import org.opensearch.dataprepper.model.event.EventKeyFactory;
import org.opensearch.dataprepper.model.plugin.InvalidPluginConfigurationException;
import org.opensearch.dataprepper.model.record.Record;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
public class ReplaceStringProcessorTests {
    private static final EventFactory TEST_EVENT_FACTORY = TestEventFactory.getTestEventFactory();
    private final EventKeyFactory eventKeyFactory = TestEventKeyFactory.getTestEventFactory();
    @Mock
    private PluginMetrics pluginMetrics;

    @Mock
    private ReplaceStringProcessorConfig config;

    @Mock
    private ExpressionEvaluator expressionEvaluator;


    @BeforeEach
    public void setup() {
        lenient().when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", null)));
        lenient().when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", null)));
    }

    @Test
    void invalid_Replace_when_throws_InvalidPluginConfigurationException() {
        final String ReplaceWhen = UUID.randomUUID().toString();
        when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "a", "b", ReplaceWhen)));

        when(expressionEvaluator.isValidExpressionStatement(ReplaceWhen)).thenReturn(false);

        assertThrows(InvalidPluginConfigurationException.class, this::createObjectUnderTest);
    }

    @Test
    public void testHappyPathReplaceStringProcessor() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent("abcd");
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("bbcd"));
    }

    @Test
    public void testNoMatchReplaceStringProcessor() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent("qwerty");
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("qwerty"));
    }

    @Test
    public void testHappyPathMultiReplaceStringProcessor() {
        when(config.getIterativeConfig()).thenReturn(Arrays.asList(createEntry("message", "a", "b", null),
                createEntry("message2", "c", "d", null)));
        when(config.getEntries()).thenReturn(Arrays.asList(createEntry("message", "a", "b", null),
                createEntry("message2", "c", "d", null)));

        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent("abcd");
        record.getData().put("message2", "cdef");
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo("bbcd"));
        assertThat(editedRecords.get(0).getData().containsKey("message2"), is(true));
        assertThat(editedRecords.get(0).getData().get("message2", Object.class), equalTo("ddef"));
    }

    @Test
    public void testValueIsNotStringReplaceStringProcessor() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent(3);
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(3));
    }

    @Test
    public void testValueIsNullReplaceStringProcessor() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent(null);
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", Object.class), equalTo(null));
    }

    @Test
    public void testValueIsObjectReplaceStringProcessor() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        final TestObject testObject = new TestObject();
        testObject.a = "msg";
        final Record<Event> record = getEvent(testObject);
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().containsKey("message"), is(true));
        assertThat(editedRecords.get(0).getData().get("message", TestObject.class), equalTo(testObject));
        assertThat(editedRecords.get(0).getData().get("message", TestObject.class).a, equalTo(testObject.a));
    }

    @Test
    public void test_events_are_identical_when_ReplaceWhen_condition_returns_false() {
        final String ReplaceWhen = UUID.randomUUID().toString();

        when(config.getIterativeConfig()).thenReturn(Collections.singletonList(createEntry("message", "[?\\\\+]", "b", ReplaceWhen)));
        when(config.getEntries()).thenReturn(Collections.singletonList(createEntry("message", "[?\\\\+]", "b", ReplaceWhen)));
        when(expressionEvaluator.isValidExpressionStatement(ReplaceWhen)).thenReturn(true);

        final ReplaceStringProcessor processor = createObjectUnderTest();
        final Record<Event> record = getEvent("abcd");

        when(expressionEvaluator.evaluateConditional(ReplaceWhen, record.getData())).thenReturn(false);
        final List<Record<Event>> editedRecords = (List<Record<Event>>) processor.doExecute(Collections.singletonList(record));

        assertThat(editedRecords.get(0).getData().toMap(), equalTo(record.getData().toMap()));
    }

    @Test
    public void testShutdown() {
        final ReplaceStringProcessor processor = createObjectUnderTest();
        assertThat(processor.isReadyForShutdown(), is(true));
    }

    private static class TestObject {
        public String a;

        @Override
        public boolean equals(Object other) {
            if (other instanceof TestObject) {
                return ((TestObject) other).a.equals(this.a);
            }

            return false;
        }
    }

    private ReplaceStringProcessorConfig.Entry createEntry(final String source, final String from, final String to, final String ReplaceWhen) {
        final EventKey sourceKey = eventKeyFactory.createEventKey(source);

        return new ReplaceStringProcessorConfig.Entry(sourceKey, from, to, ReplaceWhen);
    }

    private ReplaceStringProcessor createObjectUnderTest() {
        return new ReplaceStringProcessor(pluginMetrics, config, expressionEvaluator);
    }

    private Record<Event> getEvent(Object message) {
        final Map<String, Object> testData = new HashMap<>();
        testData.put("message", message);
        return buildRecordWithEvent(testData);
    }

    private static Record<Event> buildRecordWithEvent(final Map<String, Object> data) {
        return new Record<>(TEST_EVENT_FACTORY.eventBuilder(EventBuilder.class)
                .withData(data)
                .withEventType("event")
                .build());
    }
}