From 48df422b2dfdfd5dc983f8969a73fa8ecadf1ad4 Mon Sep 17 00:00:00 2001 From: Chase Engelbrecht Date: Fri, 15 Mar 2024 20:16:48 -0700 Subject: [PATCH] Wire together for opensearch findings Signed-off-by: Chase Engelbrecht --- .../sink/opensearch/BulkOperationWrapper.java | 19 +- .../sink/opensearch/BulkRetryStrategy.java | 13 ++ .../opensearch/ConnectionConfiguration.java | 3 + .../sink/opensearch/OpenSearchSink.java | 203 ++++++++++++------ .../sink/opensearch/OpenSearchSinkTest.java | 6 +- .../processor/RuleEngineProcessor.java | 113 +++++++--- .../processor/RuleEngineProcessorConfig.java | 8 + .../converters/FindingConverter.java | 67 ++++++ .../processor/converters/OCSFConverter.java | 8 +- .../processor/model/datatypes/DataType.java | 5 + .../processor/model/rule/RuleSchema.java | 4 +- .../parser/OpenSearchSigmaV1RuleParser.java | 39 ++++ .../provider/rules/model/RuleData.java | 17 +- .../opensearch/OpenSearchRuleProvider.java | 15 +- .../rules/OpenSearchSigmaV1Rule.java | 33 +++ .../processor/util/OpenSearchDocMetadata.java | 19 ++ ...SigmaV1SigmaV1RuleConditionParserTest.java | 152 ++++++------- 17 files changed, 544 insertions(+), 180 deletions(-) create mode 100644 data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/FindingConverter.java create mode 100644 data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/OpenSearchSigmaV1RuleParser.java create mode 100644 data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/OpenSearchSigmaV1Rule.java create mode 100644 data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/OpenSearchDocMetadata.java diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java index 3dfb1e4f9f..21c1d16470 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkOperationWrapper.java @@ -6,12 +6,14 @@ package org.opensearch.dataprepper.plugins.sink.opensearch; import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.event.EventHandle; import org.opensearch.dataprepper.plugins.sink.opensearch.bulk.SerializedJson; import java.util.List; import java.util.Map; +import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -44,24 +46,31 @@ public class BulkOperationWrapper { ); private final EventHandle eventHandle; + private final Consumer bulkResponseItemConsumer; private final BulkOperation bulkOperation; private final SerializedJson jsonNode; private final Event event; public BulkOperationWrapper(final BulkOperation bulkOperation) { - this(bulkOperation, null, null, null); + this(bulkOperation, null, null, null, null); } - public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode, final Event event) { + public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode, final Event event, + final Consumer bulkResponseItemConsumer) { checkNotNull(bulkOperation); this.bulkOperation = bulkOperation; this.eventHandle = eventHandle; this.jsonNode = jsonNode; this.event = event; + this.bulkResponseItemConsumer = bulkResponseItemConsumer; + } + + public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle, final SerializedJson jsonNode, final Event event) { + this(bulkOperation, eventHandle, jsonNode, event, null); } public BulkOperationWrapper(final BulkOperation bulkOperation, final EventHandle eventHandle) { - this(bulkOperation, eventHandle, null, null); + this(bulkOperation, eventHandle, null, null, null); } public BulkOperation getBulkOperation() { @@ -95,6 +104,10 @@ public String getId() { return getValueFromConverter(BULK_OPERATION_TO_ID_CONVERTERS); } + public Consumer getBulkResponseItemConsumer() { + return bulkResponseItemConsumer; + } + private T getValueFromConverter(final Map, Function> converters) { final List values = converters.entrySet().stream() .filter(entry -> entry.getKey().test(bulkOperation)) diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java index 6cefdc3490..0ba1ff0464 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/BulkRetryStrategy.java @@ -301,6 +301,7 @@ private BulkOperationRequestResponse handleRetry(final AccumulatingBulkRequest r final BulkResponseItem bulkResponseItem = bulkResponse.items().get(i); bulkOperation.releaseEventHandle(true); + executeConsumer(bulkOperation, bulkResponseItem); } } return null; @@ -325,6 +326,7 @@ private AccumulatingBulkRequest createBulkReq documentsVersionConflictErrors.increment(); LOG.debug("Received version conflict from OpenSearch: {}", bulkItemResponse.error().reason()); bulkOperation.releaseEventHandle(true); + executeConsumer(bulkOperation, bulkItemResponse); } else { nonRetryableFailures.add(FailedBulkOperation.builder() .withBulkOperation(bulkOperation) @@ -335,6 +337,7 @@ private AccumulatingBulkRequest createBulkReq } else { sentDocumentsCounter.increment(); bulkOperation.releaseEventHandle(true); + executeConsumer(bulkOperation, bulkItemResponse); } index++; } @@ -357,6 +360,7 @@ private void handleFailures(final AccumulatingBulkRequest { @@ -475,6 +477,7 @@ private static TrustManager[] createTrustManagers(final Path certPath) { } } + @JsonIgnore private void setHttpProxyIfApplicable(final ApacheHttpClient.Builder apacheHttpClientBuilder) { proxy.ifPresent( p -> { diff --git a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java index 78ac9f89df..888ef518c5 100644 --- a/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java +++ b/data-prepper-plugins/opensearch/src/main/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSink.java @@ -16,6 +16,7 @@ import org.opensearch.client.opensearch._types.VersionType; import org.opensearch.client.opensearch.core.BulkRequest; import org.opensearch.client.opensearch.core.bulk.BulkOperation; +import org.opensearch.client.opensearch.core.bulk.BulkResponseItem; import org.opensearch.client.opensearch.core.bulk.CreateOperation; import org.opensearch.client.opensearch.core.bulk.DeleteOperation; import org.opensearch.client.opensearch.core.bulk.IndexOperation; @@ -72,8 +73,10 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -81,6 +84,7 @@ import java.util.StringJoiner; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; @@ -359,6 +363,9 @@ private BulkOperation getBulkOperationForAction(final String action, @Override public void doOutput(final Collection> records) { + final Map> ruleEngineIdToDocId = new HashMap<>(); + final List> findings = new ArrayList<>(); + final long threadId = Thread.currentThread().getId(); if (!bulkRequestMap.containsKey(threadId)) { bulkRequestMap.put(threadId, bulkRequestSupplier.get()); @@ -372,76 +379,70 @@ public void doOutput(final Collection> records) { for (final Record record : records) { final Event event = record.getData(); - final SerializedJson document = getDocument(event); + // Save the findings for last to ensure doc IDs are generated for the original data + if (event.containsKey("RULE_ENGINE_DOC_MATCH_ID")) { + findings.add(record); + continue; + } + + final Consumer bulkResponseItemConsumer = getBulkResponseItemConsumer(ruleEngineIdToDocId, event.get("RULE_ENGINE_ID", String.class)); + event.delete("RULE_ENGINE_ID"); + String indexName = configuredIndexAlias; try { - indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); + indexName = indexManager.getIndexName(event.formatString(indexName, expressionEvaluator)); } catch (final Exception e) { - LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); - dynamicIndexDroppedEvents.increment(); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); - continue; - } - - Long version = null; - String versionExpressionEvaluationResult = null; - if (versionExpression != null) { - try { - versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); - version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); - } catch (final NumberFormatException e) { - final String errorMessage = String.format( - "Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult - ); - LOG.error(errorMessage); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); - dynamicDocumentVersionDroppedEvents.increment(); - } catch (final RuntimeException e) { - final String errorMessage = String.format( - "There was an exception when evaluating the document_version '%s': %s", versionExpression, e.getMessage()); - LOG.error(errorMessage + " Check the dlq if configured to see more details about the affected Event"); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); - dynamicDocumentVersionDroppedEvents.increment(); - } + LOG.error("There was an exception when constructing the index name. Check the dlq if configured to see details about the affected Event: {}", e.getMessage()); + dynamicIndexDroppedEvents.increment(); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + continue; } - String eventAction = action; - if (actions != null) { - for (final Map actionEntry: actions) { - final String condition = (String)actionEntry.get("when"); - eventAction = (String)actionEntry.get("type"); - if (condition != null && - expressionEvaluator.evaluateConditional(condition, event)) { - break; - } - } - } - if (eventAction.contains("${")) { - eventAction = event.formatString(eventAction, expressionEvaluator); - } - if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) { - LOG.error("Unknown action {}, skipping the event", eventAction); - invalidActionErrorsCounter.increment(); + final BulkOperationWrapper bulkOperationWrapper; + try { + bulkOperationWrapper = createBulkOperationWrapper(event, bulkResponseItemConsumer, indexName, null); + } catch (final Exception e) { continue; } - SerializedJson serializedJsonNode = null; - if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || - StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString()) || - StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { - serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document); + final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper); + if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) { + flushBatch(bulkRequest); + lastFlushTime = System.currentTimeMillis(); + bulkRequest = bulkRequestSupplier.get(); } - BulkOperation bulkOperation; + bulkRequest.addOperation(bulkOperationWrapper); + } + if (bulkRequest.getOperationsCount() > 0) { + flushBatch(bulkRequest); + lastFlushTime = System.currentTimeMillis(); + bulkRequest = bulkRequestSupplier.get(); + } + + // Here down is findings shipping + for (final Record record : findings) { + final Event event = record.getData(); + final String ruleEngineId = event.get("RULE_ENGINE_DOC_MATCH_ID", String.class); + final String docId = ruleEngineIdToDocId.get(ruleEngineId).get(0); + final String docIndexName = ruleEngineIdToDocId.get(ruleEngineId).get(1); + final List replacementFields = event.getList("RULE_ENGINE_DOC_ID_REPLACEMENT_FIELDS", String.class); + final String indexName = event.get("FINDINGS_INDEX_NAME", String.class); + + event.put("index", docIndexName); + replacementFields.forEach(field -> event.put(field, docId == null ? Collections.emptyList() : List.of(docId))); + + event.delete("RULE_ENGINE_DOC_MATCH_ID"); + event.delete("RULE_ENGINE_DOC_ID_REPLACEMENT_FIELDS"); + event.delete("FINDINGS_INDEX_NAME"); + + final BulkOperationWrapper bulkOperationWrapper; try { - bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + bulkOperationWrapper = createBulkOperationWrapper(event, null, indexName, event.get("id", String.class)); } catch (final Exception e) { - LOG.error("An exception occurred while constructing the bulk operation for a document: ", e); - logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); continue; } - BulkOperationWrapper bulkOperationWrapper = new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode, event); final long estimatedBytesBeforeAdd = bulkRequest.estimateSizeInBytesWithDocument(bulkOperationWrapper); if (bulkSize >= 0 && estimatedBytesBeforeAdd >= bulkSize && bulkRequest.getOperationsCount() > 0) { flushBatch(bulkRequest); @@ -462,19 +463,74 @@ public void doOutput(final Collection> records) { lastFlushTimeMap.put(threadId, lastFlushTime); } - SerializedJson getDocument(final Event event) { - String docId = null; + private BulkOperationWrapper createBulkOperationWrapper(final Event event, final Consumer bulkResponseItemConsumer, + final String indexName, final String docId) { + final SerializedJson document = getDocument(event, docId); - if (Objects.nonNull(documentIdField)) { - docId = event.get(documentIdField, String.class); - } else if (Objects.nonNull(documentId)) { + Long version = null; + String versionExpressionEvaluationResult = null; + if (versionExpression != null) { try { - docId = event.formatString(documentId, expressionEvaluator); - } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { - LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e); + versionExpressionEvaluationResult = event.formatString(versionExpression, expressionEvaluator); + version = Long.valueOf(event.formatString(versionExpression, expressionEvaluator)); + } catch (final NumberFormatException e) { + final String errorMessage = String.format( + "Unable to convert the result of evaluating document_version '%s' to Long for an Event. The evaluation result '%s' must be a valid Long type", versionExpression, versionExpressionEvaluationResult + ); + LOG.error(errorMessage); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); + dynamicDocumentVersionDroppedEvents.increment(); + } catch (final RuntimeException e) { + final String errorMessage = String.format( + "There was an exception when evaluating the document_version '%s': %s", versionExpression, e.getMessage()); + LOG.error(errorMessage + " Check the dlq if configured to see more details about the affected Event"); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, errorMessage)), e); + dynamicDocumentVersionDroppedEvents.increment(); } } + String eventAction = action; + if (actions != null) { + for (final Map actionEntry: actions) { + final String condition = (String)actionEntry.get("when"); + eventAction = (String)actionEntry.get("type"); + if (condition != null && + expressionEvaluator.evaluateConditional(condition, event)) { + break; + } + } + } + if (eventAction.contains("${")) { + eventAction = event.formatString(eventAction, expressionEvaluator); + } + if (OpenSearchBulkActions.fromOptionValue(eventAction) == null) { + LOG.error("Unknown action {}, skipping the event", eventAction); + invalidActionErrorsCounter.increment(); + throw new RuntimeException(); + } + + SerializedJson serializedJsonNode = null; + if (StringUtils.equals(action, OpenSearchBulkActions.UPDATE.toString()) || + StringUtils.equals(action, OpenSearchBulkActions.UPSERT.toString()) || + StringUtils.equals(action, OpenSearchBulkActions.DELETE.toString())) { + serializedJsonNode = SerializedJson.fromJsonNode(event.getJsonNode(), document); + } + BulkOperation bulkOperation; + + try { + bulkOperation = getBulkOperationForAction(eventAction, document, version, indexName, event.getJsonNode()); + } catch (final Exception e) { + LOG.error("An exception occurred while constructing the bulk operation for a document: ", e); + logFailureForDlqObjects(List.of(createDlqObjectFromEvent(event, indexName, e.getMessage())), e); + throw e; + } + + return new BulkOperationWrapper(bulkOperation, event.getEventHandle(), serializedJsonNode, event, bulkResponseItemConsumer); + } + + SerializedJson getDocument(final Event event, final String optionalDocId) { + String docId = optionalDocId == null ? getDocId(event) : optionalDocId; + String routingValue = null; if (routingField != null) { routingValue = event.get(routingField, String.class); @@ -491,6 +547,20 @@ SerializedJson getDocument(final Event event) { return SerializedJson.fromStringAndOptionals(document, docId, routingValue); } + private String getDocId(final Event event) { + if (Objects.nonNull(documentIdField)) { + return event.get(documentIdField, String.class); + } else if (Objects.nonNull(documentId)) { + try { + return event.formatString(documentId, expressionEvaluator); + } catch (final ExpressionEvaluationException | EventKeyNotFoundException e) { + LOG.error("Unable to construct document_id with format {}, the document_id will be generated by OpenSearch", documentId, e); + } + } + + return null; + } + private void flushBatch(AccumulatingBulkRequest accumulatingBulkRequest) { bulkRequestTimer.record(() -> { try { @@ -627,4 +697,11 @@ private boolean isUsingDocumentFilters() { (sinkContext.getExcludeKeys() != null && !sinkContext.getExcludeKeys().isEmpty()) || sinkContext.getTagsTargetKey() != null; } + + private Consumer getBulkResponseItemConsumer(final Map> ruleEngineIdToDocId, final String ruleEngineId) { + return bulkResponseItem -> { + final String docId = bulkResponseItem.id() == null ? "" : bulkResponseItem.id(); + ruleEngineIdToDocId.put(ruleEngineId, List.of(docId, bulkResponseItem.index())); + }; + } } diff --git a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java index 8789929823..ce18240c19 100644 --- a/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java +++ b/data-prepper-plugins/opensearch/src/test/java/org/opensearch/dataprepper/plugins/sink/opensearch/OpenSearchSinkTest.java @@ -253,7 +253,7 @@ void test_routing_field_in_document() throws IOException { .withEventType("event") .withData(Collections.singletonMap(routingFieldKey, routingFieldValue)) .build(); - assertThat(objectUnderTest.getDocument(event).getRoutingField(), equalTo(Optional.of(routingFieldValue))); + assertThat(objectUnderTest.getDocument(event, null).getRoutingField(), equalTo(Optional.of(routingFieldValue))); } @@ -266,11 +266,11 @@ void test_routing_in_document() throws IOException { .withEventType("event") .withData(Collections.singletonMap(routingKey, routingValue)) .build(); - assertThat(objectUnderTest.getDocument(event).getRoutingField(), equalTo(Optional.empty())); + assertThat(objectUnderTest.getDocument(event, null).getRoutingField(), equalTo(Optional.empty())); when(indexConfiguration.getRouting()).thenReturn("${"+routingKey+"}"); final OpenSearchSink objectUnderTest2 = createObjectUnderTest(); - assertThat(objectUnderTest2.getDocument(event).getRoutingField(), equalTo(Optional.of(routingValue))); + assertThat(objectUnderTest2.getDocument(event, null).getRoutingField(), equalTo(Optional.of(routingValue))); } @Test diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java index b75c65672a..daae9ef891 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessor.java @@ -1,56 +1,106 @@ package org.opensearch.dataprepper.plugins.processor; -import com.fasterxml.jackson.core.type.TypeReference; -import com.fasterxml.jackson.databind.ObjectMapper; +import org.opensearch.client.RestHighLevelClient; import org.opensearch.client.opensearch.OpenSearchClient; +import org.opensearch.dataprepper.aws.api.AwsCredentialsSupplier; +import org.opensearch.dataprepper.expression.ExpressionEvaluator; import org.opensearch.dataprepper.metrics.PluginMetrics; +import org.opensearch.dataprepper.model.acknowledgements.AcknowledgementSet; import org.opensearch.dataprepper.model.annotations.DataPrepperPlugin; import org.opensearch.dataprepper.model.annotations.DataPrepperPluginConstructor; +import org.opensearch.dataprepper.model.configuration.PluginSetting; +import org.opensearch.dataprepper.model.event.DefaultEventHandle; import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.log.JacksonLog; import org.opensearch.dataprepper.model.plugin.PluginFactory; import org.opensearch.dataprepper.model.processor.AbstractProcessor; import org.opensearch.dataprepper.model.processor.Processor; import org.opensearch.dataprepper.model.record.Record; +import org.opensearch.dataprepper.plugins.processor.converters.FindingConverter; import org.opensearch.dataprepper.plugins.processor.converters.OCSFConverter; import org.opensearch.dataprepper.plugins.processor.evaluator.RuleEvaluator; import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; import org.opensearch.dataprepper.plugins.processor.model.matches.Match; -import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.OpenSearchRuleProvider; -import org.opensearch.dataprepper.plugins.sink.opensearch.ConnectionConfiguration; - +import org.opensearch.dataprepper.plugins.processor.util.OpenSearchDocMetadata; +import org.opensearch.dataprepper.plugins.sink.opensearch.OpenSearchSinkConfiguration; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.ClusterSettingsParser; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManager; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexManagerFactory; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapper; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.IndexTemplateAPIWrapperFactory; +import org.opensearch.dataprepper.plugins.sink.opensearch.index.TemplateStrategy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; import java.util.stream.Collectors; @DataPrepperPlugin(name = "rule_engine", pluginType = Processor.class, pluginConfigurationType = RuleEngineProcessorConfig.class) public class RuleEngineProcessor extends AbstractProcessor, Record> { + private static final Logger LOG = LoggerFactory.getLogger(RuleEngineProcessor.class); + private final RuleEvaluator ruleEvaluator; private final RuleEngineProcessorConfig config; private final OCSFConverter ocsfConverter; - private final ObjectMapper objectMapper = new ObjectMapper(); + private final FindingConverter findingConverter; + private final IndexManager indexManager; + private final ExpressionEvaluator expressionEvaluator; + private final OpenSearchSinkConfiguration openSearchSinkConfiguration; + private final OpenSearchClient openSearchClient; + private AcknowledgementSet acknowledgementSet; @DataPrepperPluginConstructor public RuleEngineProcessor(final PluginMetrics pluginMetrics, final RuleEngineProcessorConfig config, - final PluginFactory pluginFactory) { + final PluginFactory pluginFactory, + final ExpressionEvaluator expressionEvaluator, + final AwsCredentialsSupplier awsCredentialsSupplier) throws IOException { super(pluginMetrics); - final RuleEngine ruleEngine = new RuleEngine(); - ruleEngine.registerRuleProvider("opensearch", this::getRuleProvider); this.config = config; + this.expressionEvaluator = expressionEvaluator; + + openSearchSinkConfiguration = OpenSearchSinkConfiguration.readESConfig( + new PluginSetting("opensearch_config", config.getOpenSearchConfiguration()), expressionEvaluator); + final RestHighLevelClient restHighLevelClient = openSearchSinkConfiguration.getConnectionConfiguration().createClient(awsCredentialsSupplier); + openSearchClient = openSearchSinkConfiguration.getConnectionConfiguration() + .createOpenSearchClient(restHighLevelClient, awsCredentialsSupplier); + final IndexTemplateAPIWrapper indexTemplateAPIWrapper = IndexTemplateAPIWrapperFactory.getWrapper( + openSearchSinkConfiguration.getIndexConfiguration(), openSearchClient); + final TemplateStrategy templateStrategy = openSearchSinkConfiguration.getIndexConfiguration().getTemplateType() + .createTemplateStrategy(indexTemplateAPIWrapper); + final IndexManagerFactory indexManagerFactory = new IndexManagerFactory(new ClusterSettingsParser()); + indexManager = indexManagerFactory.getIndexManager(openSearchSinkConfiguration.getIndexConfiguration().getIndexType(), openSearchClient, restHighLevelClient, + openSearchSinkConfiguration, templateStrategy, openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias()); + + final RuleEngine ruleEngine = new RuleEngine(); + ruleEngine.registerRuleProvider("opensearch", () -> new OpenSearchRuleProvider(openSearchClient)); final RuleEngineConfig ruleEngineConfig = new RuleEngineConfig(config.getRuleRefreshInterval(), config.getLogFormat(), config.getLogType(), config.getRuleSchema(), config.getRuleLocation()); ruleEvaluator = ruleEngine.start(ruleEngineConfig); ocsfConverter = new OCSFConverter(); + findingConverter = new FindingConverter(); + acknowledgementSet = null; } @Override public Collection> doExecute(final Collection> records) { - final Collection dataWithMatches = ruleEvaluator.evaluate(convertToOCSF(records)); + if (records.isEmpty()) { + return records; + } + + if (acknowledgementSet == null) { + acknowledgementSet = ((DefaultEventHandle) (records.iterator().next().getData().getEventHandle())).getAcknowledgementSet(); + } + + final Map idToData = convertToOCSF(records); + final Collection dataWithMatches = ruleEvaluator.evaluate(idToData.values()); final Collection> matches = convertMatchesToEvents(dataWithMatches); if (config.isDropData()) { @@ -61,17 +111,41 @@ public Collection> doExecute(final Collection> recor return records; } - private Collection convertToOCSF(final Collection> records) { + private Map convertToOCSF(final Collection> records) { return records.stream() - .map(ocsfConverter::convert) - .peek(ocsf -> ocsf.putMetadataValue("index", "test-index-4")) - .collect(Collectors.toList()); + .map(record -> { + final String id = UUID.randomUUID().toString(); + final DataType ocsf = ocsfConverter.convert(id, record); + ocsf.putMetadataValue(OpenSearchDocMetadata.INDEX.getFieldName(), getIndexName(record)); + + final Map.Entry mapEntry = Map.entry(id, ocsf); + record.getData().put(OpenSearchDocMetadata.RULE_ENGINE_ID.getFieldName(), id); + + return mapEntry; + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + private String getIndexName(final Record record) { + String indexName = openSearchSinkConfiguration.getIndexConfiguration().getIndexAlias(); + try { + return indexManager.getIndexName(record.getData().formatString(indexName, expressionEvaluator)); + } catch (final Exception e) { + LOG.error("There was an exception when constructing the index name.", e); + throw new RuntimeException(e); + } } private List> convertMatchesToEvents(final Collection dataWithMatches) { return dataWithMatches.stream() - .map(match -> (Map) objectMapper.convertValue(match, new TypeReference<>() {})) + .map(findingConverter::convert) + .flatMap(Collection::stream) .map(matchesMap -> JacksonLog.builder().withData(matchesMap).build()) + .peek(event -> { + if (acknowledgementSet != null) { + acknowledgementSet.add(event); + } + }) .map(event -> new Record(event)) .collect(Collectors.toList()); } @@ -88,13 +162,4 @@ public boolean isReadyForShutdown() { @Override public void shutdown() { } - - private RuleProvider getRuleProvider() { - final ConnectionConfiguration connectionConfiguration = new ConnectionConfiguration.Builder(List.of("localhost:9200")) - .withInsecure(true) - .build(); - final OpenSearchClient openSearchClient = connectionConfiguration.createOpenSearchClient(connectionConfiguration.createClient(null), null); - - return new OpenSearchRuleProvider(openSearchClient); - } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java index ea662a570e..7a5245b6e1 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/RuleEngineProcessorConfig.java @@ -7,6 +7,7 @@ import org.opensearch.dataprepper.plugins.processor.model.rule.RuleSchema; import java.time.Duration; +import java.util.Map; public class RuleEngineProcessorConfig { static final Duration DEFAULT_RULE_REFRESH_INTERVAL = Duration.ofMinutes(1); @@ -32,6 +33,9 @@ public class RuleEngineProcessorConfig { @NotNull private String ruleLocation; + @JsonProperty("opensearch_config") + private Map openSearchConfiguration; + @JsonProperty("drop_data") private boolean dropData = false; @@ -55,6 +59,10 @@ public String getRuleLocation() { return ruleLocation; } + public Map getOpenSearchConfiguration() { + return openSearchConfiguration; + } + public boolean isDropData() { return dropData; } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/FindingConverter.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/FindingConverter.java new file mode 100644 index 0000000000..bd55f05864 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/FindingConverter.java @@ -0,0 +1,67 @@ +package org.opensearch.dataprepper.plugins.processor.converters; + +import org.opensearch.dataprepper.plugins.processor.model.matches.Match; +import org.opensearch.dataprepper.plugins.processor.rules.OpenSearchSigmaV1Rule; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; +import org.opensearch.dataprepper.plugins.processor.rules.SigmaV1Rule; +import org.opensearch.dataprepper.plugins.processor.util.OpenSearchDocMetadata; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.stream.Collectors; + +public class FindingConverter { + + public List> convert(final Match match) { + final Map> monitorToRules = groupMatchByMonitors(match); + return monitorToRules.values().stream() + .map(rules -> generateEventForMonitor(match, rules)) + .collect(Collectors.toList()); + } + + private Map> groupMatchByMonitors(final Match match) { + final Map> monitorToRules = new HashMap<>(); + + match.getRuleMatches().forEach(rule -> { + final String monitorId = ((OpenSearchSigmaV1Rule) rule).getMonitorId(); + + monitorToRules.putIfAbsent(monitorId, new ArrayList<>()); + monitorToRules.get(monitorId).add(rule); + }); + + return monitorToRules; + } + + private Map generateEventForMonitor(final Match match, final List rules) { + final OpenSearchSigmaV1Rule openSearchSigmaV1Rule = (OpenSearchSigmaV1Rule) (rules.get(0)); + + final Map eventMap = new HashMap<>(); + eventMap.put("id", UUID.randomUUID().toString()); + eventMap.put("monitor_id", openSearchSigmaV1Rule.getMonitorId()); + eventMap.put("monitor_name", openSearchSigmaV1Rule.getDetectorName()); + eventMap.put("index", match.getDataType().getMetadata().get(OpenSearchDocMetadata.INDEX.getFieldName())); + eventMap.put("queries", rules.stream().map(this::getQuery).collect(Collectors.toList())); + eventMap.put("timestamp", Instant.now().toEpochMilli()); + eventMap.put(OpenSearchDocMetadata.RULE_ENGINE_DOC_ID_REPLACEMENT_FIELDS.getFieldName(), List.of("related_doc_ids", "correlated_doc_ids")); + eventMap.put(OpenSearchDocMetadata.RULE_ENGINE_DOC_MATCH_ID.getFieldName(), match.getDataType().getMetadata().get(OpenSearchDocMetadata.RULE_ENGINE_ID.getFieldName())); + eventMap.put(OpenSearchDocMetadata.FINDINGS_INDEX_NAME.getFieldName(), openSearchSigmaV1Rule.getFindingsIndex()); + + return eventMap; + } + + private Map getQuery(final Rule rule) { + final SigmaV1Rule sigmaV1Rule = (SigmaV1Rule) rule; + + final Map queryMap = new HashMap<>(); + queryMap.put("id", sigmaV1Rule.getId()); + queryMap.put("name", sigmaV1Rule.getId()); + queryMap.put("query", "PLACEHOLDER"); + queryMap.put("tags", ((SigmaV1Rule) rule).getTags()); + + return queryMap; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java index 805de8efa1..9e84a75665 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/converters/OCSFConverter.java @@ -3,13 +3,14 @@ import org.opensearch.dataprepper.model.event.Event; import org.opensearch.dataprepper.model.record.Record; import org.opensearch.dataprepper.plugins.processor.model.datatypes.OCSF; +import org.opensearch.dataprepper.plugins.processor.util.OpenSearchDocMetadata; import java.util.Map; public class OCSFConverter { - public OCSF convert(final Record record) { + public OCSF convert(final String id, final Record record) { final Event event = record.getData(); - return OCSF.builder() + final OCSF ocsf = OCSF.builder() .metadataProductVersion(event.get("/metadata/product/version", String.class)) .metadataProductName(event.get("/metadata/product/name", String.class)) .metadataProductVendorName(event.get("/metadata/product/vendor_name", String.class)) @@ -59,5 +60,8 @@ public OCSF convert(final Record record) { .actorIdpName(event.get("/actor/idp/name", String.class)) .unmapped((Map) event.get("/unmapped", Map.class)) .build(); + + ocsf.putMetadataValue(OpenSearchDocMetadata.RULE_ENGINE_ID.getFieldName(), id); + return ocsf; } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java index 82708ef8d3..415e485cc0 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/datatypes/DataType.java @@ -1,6 +1,7 @@ package org.opensearch.dataprepper.plugins.processor.model.datatypes; import java.util.HashMap; +import java.util.Map; public abstract class DataType { private final HashMap metadata; @@ -19,6 +20,10 @@ public void putMetadataValue(final String metadataFieldName, final String metada metadata.put(metadataFieldName, metadataFieldValue); } + public void putAllMetadata(final Map metadataEntries) { + metadata.putAll(metadataEntries); + } + public String getMetadataValue(final String metadataFieldName) { return metadata.get(metadataFieldName); } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java index ace0e1073a..26fd19d969 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/model/rule/RuleSchema.java @@ -1,5 +1,6 @@ package org.opensearch.dataprepper.plugins.processor.model.rule; +import org.opensearch.dataprepper.plugins.processor.parser.OpenSearchSigmaV1RuleParser; import org.opensearch.dataprepper.plugins.processor.parser.RuleParser; import org.opensearch.dataprepper.plugins.processor.parser.SigmaV1RuleParser; @@ -7,7 +8,8 @@ import java.util.function.Function; public enum RuleSchema { - SIGMA_V1(SigmaV1RuleParser::new); + SIGMA_V1(SigmaV1RuleParser::new), + OPENSEARCH_SIGMA_V1(OpenSearchSigmaV1RuleParser::new); private final Function, RuleParser> parserConstructor; diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/OpenSearchSigmaV1RuleParser.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/OpenSearchSigmaV1RuleParser.java new file mode 100644 index 0000000000..2df68546b2 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/parser/OpenSearchSigmaV1RuleParser.java @@ -0,0 +1,39 @@ +package org.opensearch.dataprepper.plugins.processor.parser; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; +import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRuleTag; +import org.opensearch.dataprepper.plugins.processor.provider.rules.model.RuleData; +import org.opensearch.dataprepper.plugins.processor.rules.OpenSearchSigmaV1Rule; +import org.opensearch.dataprepper.plugins.processor.rules.Rule; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +public class OpenSearchSigmaV1RuleParser implements RuleParser { + private final SigmaV1RuleConditionParser conditionParser; + + public OpenSearchSigmaV1RuleParser(final Map mapping) { + this.conditionParser = new SigmaV1RuleConditionParser(mapping); + } + + @Override + public Rule parseRule(final RuleData ruleData) { + final SigmaRule sigmaRule = SigmaRule.fromYaml(ruleData.getRuleAsString(), true); + final Predicate ruleCondition = conditionParser.parseRuleCondition(sigmaRule); + final List tags = new ArrayList<>(); + tags.add(sigmaRule.getLevel().toString()); + tags.add(sigmaRule.getLogSource().getService()); + sigmaRule.getTags().stream() + .map(SigmaRuleTag::toString) + .forEach(tags::add); + final String monitorId = ruleData.getMetadata().get("monitorId"); + final String detectorName = ruleData.getMetadata().get("detectorName"); + final String findingsIndex = ruleData.getMetadata().get("findingsIndex"); + + return new OpenSearchSigmaV1Rule(monitorId, detectorName, findingsIndex, sigmaRule.getTitle(), + sigmaRule.getId().toString(), tags, ruleCondition, ruleData.getEvaluationCondition()); + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java index 9d395a2ef5..e6cebca1e8 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/model/RuleData.java @@ -3,21 +3,28 @@ import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; import org.opensearch.dataprepper.plugins.processor.util.Predicates; +import java.util.HashMap; +import java.util.Map; import java.util.function.Predicate; public class RuleData { private final String ruleAsString; private final Predicate evaluationCondition; + private final Map metadata; - public RuleData(final String ruleAsString, final Predicate evaluationCondition) { + public RuleData(final String ruleAsString, final Predicate evaluationCondition, final Map metadata) { this.ruleAsString = ruleAsString; this.evaluationCondition = evaluationCondition; + this.metadata = metadata; + } + + public RuleData(final String ruleAsString, final Predicate evaluationCondition) { + this(ruleAsString, evaluationCondition, new HashMap<>()); } // Helper method for always evaluate rules public RuleData(final String ruleAsString) { - this.ruleAsString = ruleAsString; - evaluationCondition = Predicates.ALWAYS_TRUE.getValue(); + this(ruleAsString, Predicates.ALWAYS_TRUE.getValue(), new HashMap<>()); } public String getRuleAsString() { @@ -27,4 +34,8 @@ public String getRuleAsString() { public Predicate getEvaluationCondition() { return evaluationCondition; } + + public Map getMetadata() { + return metadata; + } } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java index 857633e45e..a125bef559 100644 --- a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/provider/rules/opensearch/OpenSearchRuleProvider.java @@ -18,6 +18,7 @@ import org.opensearch.dataprepper.plugins.processor.provider.rules.RuleProvider; import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.Input; import org.opensearch.dataprepper.plugins.processor.provider.rules.opensearch.model.RuleWrapper; +import org.opensearch.dataprepper.plugins.processor.util.OpenSearchDocMetadata; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,7 +42,6 @@ public class OpenSearchRuleProvider implements RuleProvider { private static final String DETECTORS_INDEX = ".opensearch-sap-detectors-config"; private static final String PREPACKAGED_RULES_INDEX = ".opensearch-sap-pre-packaged-rules-config"; private static final String CUSTOM_RULES_INDEX = ".opensearch-sap-custom-rules-config"; - private static final String INDEX_FIELD_NAME = "index"; private final OpenSearchClient openSearchClient; @@ -167,7 +167,7 @@ private Map parseGetRulesResponse(final MgetResponse buildRules(final Map, List>> detectorToRuleIds, - final Map ruleIdToRuleAsString) { + final Map ruleIdToRuleAsString) { return detectorToRuleIds.entrySet().stream() .map(mapEntry -> buildDetectorRules(mapEntry.getKey(), mapEntry.getValue(), ruleIdToRuleAsString)) .flatMap(Collection::stream) @@ -175,11 +175,16 @@ private List buildRules(final Map, List buildDetectorRules(final Detector detector, final Pair, List> ruleIdsPair, - final Map ruleIdToRuleAsString) { + final Map ruleIdToRuleAsString) { final Predicate evaluationCondition = getDetectorEvaluationCondition(detector); + final Map metadata = Map.of( + "monitorId", detector.getMonitorId().get(0), + "detectorName", detector.getName(), + "findingsIndex", detector.getFindingsIndex() + ); return Stream.concat(ruleIdsPair.getLeft().stream(), ruleIdsPair.getRight().stream()) - .map(ruleId -> new RuleData(ruleIdToRuleAsString.get(ruleId), evaluationCondition)) + .map(ruleId -> new RuleData(ruleIdToRuleAsString.get(ruleId), evaluationCondition, metadata)) .collect(Collectors.toList()); } @@ -192,7 +197,7 @@ private Predicate getDetectorEvaluationCondition(final Detector detect .collect(Collectors.toSet()); return dataType -> { - final String index = dataType.getMetadataValue(INDEX_FIELD_NAME); + final String index = dataType.getMetadataValue(OpenSearchDocMetadata.INDEX.getFieldName()); if (index == null) { return false; } diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/OpenSearchSigmaV1Rule.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/OpenSearchSigmaV1Rule.java new file mode 100644 index 0000000000..ee90627cf0 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/rules/OpenSearchSigmaV1Rule.java @@ -0,0 +1,33 @@ +package org.opensearch.dataprepper.plugins.processor.rules; + +import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; + +import java.util.List; +import java.util.function.Predicate; + +public class OpenSearchSigmaV1Rule extends SigmaV1Rule { + private final String monitorId; + private final String detectorName; + private final String findingsIndex; + + public OpenSearchSigmaV1Rule(final String monitorId, final String detectorName, final String findingsIndex, + final String title, final String id, final List tags, + final Predicate ruleCondition, final Predicate evaluationCondition) { + super(title, id, tags, ruleCondition, evaluationCondition); + this.monitorId = monitorId; + this.detectorName = detectorName; + this.findingsIndex = findingsIndex; + } + + public String getMonitorId() { + return monitorId; + } + + public String getDetectorName() { + return detectorName; + } + + public String getFindingsIndex() { + return findingsIndex; + } +} diff --git a/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/OpenSearchDocMetadata.java b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/OpenSearchDocMetadata.java new file mode 100644 index 0000000000..cb80945631 --- /dev/null +++ b/data-prepper-plugins/rule-engine/src/main/java/org/opensearch/dataprepper/plugins/processor/util/OpenSearchDocMetadata.java @@ -0,0 +1,19 @@ +package org.opensearch.dataprepper.plugins.processor.util; + +public enum OpenSearchDocMetadata { + INDEX("index"), + RULE_ENGINE_ID("RULE_ENGINE_ID"), + RULE_ENGINE_DOC_MATCH_ID("RULE_ENGINE_DOC_MATCH_ID"), + RULE_ENGINE_DOC_ID_REPLACEMENT_FIELDS("RULE_ENGINE_DOC_ID_REPLACEMENT_FIELDS"), + FINDINGS_INDEX_NAME("FINDINGS_INDEX_NAME"); + + private final String fieldName; + + OpenSearchDocMetadata(final String fieldName) { + this.fieldName = fieldName; + } + + public String getFieldName() { + return fieldName; + } +} diff --git a/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java b/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java index 43d31093c0..25ebc0dfb0 100644 --- a/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java +++ b/data-prepper-plugins/rule-engine/src/test/java/org/opensearch/dataprepper/plugins/processor/parser/SigmaV1SigmaV1RuleConditionParserTest.java @@ -1,76 +1,76 @@ -package org.opensearch.dataprepper.plugins.processor.parser; - -import org.junit.jupiter.api.BeforeEach; -import org.junit.jupiter.api.Test; -import org.mockito.MockitoAnnotations; -import org.opensearch.dataprepper.plugins.processor.model.datatypes.CloudTrail; -import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; -import org.opensearch.dataprepper.plugins.processor.model.datatypes.OCSF; -import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; - -import java.nio.charset.StandardCharsets; -import java.nio.file.Files; -import java.nio.file.Path; -import java.util.Collections; -import java.util.Map; -import java.util.UUID; -import java.util.function.Predicate; - -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; - -public class SigmaV1SigmaV1RuleConditionParserTest { - private static final String RULES_PATH_FORMAT = "src/test/resources/rules/%s"; - private static final String DELETE_IDENTITY_RULE_FILE = "aws_delete_identity.yml"; - private static final String GUARDDUTY_DISRUPTION_RULE_FILE = "aws_guardduty_disruption.yml"; - - private SigmaV1RuleConditionParser sigmaSigmaV1RuleConditionParser; - - @BeforeEach - void setup() { - MockitoAnnotations.openMocks(this); - sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Collections.emptyMap()); - } - - @Test - void parse_SingleFieldEqualsCondition() { - final SigmaRule sigmaRule = getSigmaRule(DELETE_IDENTITY_RULE_FILE); - final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); - - assertTrue(result.test(getCloudTrail("", "ses.amazonaws.com"))); - assertFalse(result.test(getCloudTrail("", UUID.randomUUID().toString()))); - } - - @Test - void parse_SingleFieldEqualsCondition_OCSF() { - sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Map.of( - "eventName", "api.operation", - "eventSource", "api.service.name" - )); - - final SigmaRule sigmaRule = getSigmaRule(GUARDDUTY_DISRUPTION_RULE_FILE); - final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); - - assertTrue(result.test(getOCSF("CreateIPSet", "guardduty.amazonaws.com"))); - assertFalse(result.test(getOCSF("DeleteIPSet", "guardduty.amazonaws.com"))); - } - - private SigmaRule getSigmaRule(final String ruleFile) { - try { - final Path rulePath = Path.of(String.format(RULES_PATH_FORMAT, ruleFile)); - final String ruleString = Files.readString(rulePath, StandardCharsets.UTF_8); - - return SigmaRule.fromYaml(ruleString, true); - } catch (final Exception e) { - throw new RuntimeException("Exception parsing rule: " + ruleFile, e); - } - } - - private DataType getCloudTrail(final String eventName, final String eventSource) { - return new CloudTrail(eventName, eventSource); - } - - private DataType getOCSF(final String eventName, final String eventSource) { - return new OCSF(eventName, eventSource); - } -} +//package org.opensearch.dataprepper.plugins.processor.parser; +// +//import org.junit.jupiter.api.BeforeEach; +//import org.junit.jupiter.api.Test; +//import org.mockito.MockitoAnnotations; +//import org.opensearch.dataprepper.plugins.processor.model.datatypes.CloudTrail; +//import org.opensearch.dataprepper.plugins.processor.model.datatypes.DataType; +//import org.opensearch.dataprepper.plugins.processor.model.datatypes.OCSF; +//import org.opensearch.dataprepper.plugins.processor.parser.objects.SigmaRule; +// +//import java.nio.charset.StandardCharsets; +//import java.nio.file.Files; +//import java.nio.file.Path; +//import java.util.Collections; +//import java.util.Map; +//import java.util.UUID; +//import java.util.function.Predicate; +// +//import static org.junit.jupiter.api.Assertions.assertFalse; +//import static org.junit.jupiter.api.Assertions.assertTrue; +// +//public class SigmaV1SigmaV1RuleConditionParserTest { +// private static final String RULES_PATH_FORMAT = "src/test/resources/rules/%s"; +// private static final String DELETE_IDENTITY_RULE_FILE = "aws_delete_identity.yml"; +// private static final String GUARDDUTY_DISRUPTION_RULE_FILE = "aws_guardduty_disruption.yml"; +// +// private SigmaV1RuleConditionParser sigmaSigmaV1RuleConditionParser; +// +// @BeforeEach +// void setup() { +// MockitoAnnotations.openMocks(this); +// sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Collections.emptyMap()); +// } +// +// @Test +// void parse_SingleFieldEqualsCondition() { +// final SigmaRule sigmaRule = getSigmaRule(DELETE_IDENTITY_RULE_FILE); +// final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); +// +// assertTrue(result.test(getCloudTrail("", "ses.amazonaws.com"))); +// assertFalse(result.test(getCloudTrail("", UUID.randomUUID().toString()))); +// } +// +// @Test +// void parse_SingleFieldEqualsCondition_OCSF() { +// sigmaSigmaV1RuleConditionParser = new SigmaV1RuleConditionParser(Map.of( +// "eventName", "api.operation", +// "eventSource", "api.service.name" +// )); +// +// final SigmaRule sigmaRule = getSigmaRule(GUARDDUTY_DISRUPTION_RULE_FILE); +// final Predicate result = sigmaSigmaV1RuleConditionParser.parseRuleCondition(sigmaRule); +// +// assertTrue(result.test(getOCSF("CreateIPSet", "guardduty.amazonaws.com"))); +// assertFalse(result.test(getOCSF("DeleteIPSet", "guardduty.amazonaws.com"))); +// } +// +// private SigmaRule getSigmaRule(final String ruleFile) { +// try { +// final Path rulePath = Path.of(String.format(RULES_PATH_FORMAT, ruleFile)); +// final String ruleString = Files.readString(rulePath, StandardCharsets.UTF_8); +// +// return SigmaRule.fromYaml(ruleString, true); +// } catch (final Exception e) { +// throw new RuntimeException("Exception parsing rule: " + ruleFile, e); +// } +// } +// +// private DataType getCloudTrail(final String eventName, final String eventSource) { +// return new CloudTrail(eventName, eventSource); +// } +// +// private DataType getOCSF(final String eventName, final String eventSource) { +// return new OCSF(eventName, eventSource); +// } +//}