ENH: support plugin loading in config #4974

@chenqi0805 chenqi0805 commented Sep 24, 2024

Description

This PR:

  • introduces the @UsesDataPrepperPlugin annotation for PluginModel attributes in plugin configs
  • validates the pluginModel name, by reflection, against the plugin names allowed by UsesDataPrepperPlugin::pluginType
  • generates anyOf schemas for embedded pluginModel attributes, so that the allowed plugins are represented as enum-like schemas
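The first two points can be illustrated with a minimal, self-contained sketch. It is not the code from this PR: every type name below (UsesPluginSketch, AggregateActionSketch, AggregateConfigSketch, PluginNameValidatorSketch) is hypothetical, and the map of allowed names is passed in by hand as an assumption about where those names would come from.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the annotation described above; the real
// annotation may carry different attributes.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.FIELD)
@interface UsesPluginSketch {
    Class<?> pluginType();
}

// Hypothetical marker interface for the family of plugins a field accepts.
interface AggregateActionSketch { }

// Hypothetical processor config with one embedded plugin attribute.
class AggregateConfigSketch {
    @UsesPluginSketch(pluginType = AggregateActionSketch.class)
    String action; // stands in for a PluginModel; only the plugin name matters here
}

final class PluginNameValidatorSketch {
    private PluginNameValidatorSketch() { }

    // Reads the annotation from the field by reflection and checks the
    // plugin name against the names allowed for the annotated plugin type.
    static boolean isAllowed(Class<?> configClass, String fieldName, String pluginName,
                             Map<Class<?>, Set<String>> allowedNamesByType) {
        final Field field;
        try {
            field = configClass.getDeclaredField(fieldName);
        } catch (NoSuchFieldException e) {
            throw new IllegalArgumentException("No such field: " + fieldName, e);
        }
        UsesPluginSketch annotation = field.getAnnotation(UsesPluginSketch.class);
        if (annotation == null) {
            return false; // field does not declare which plugins it accepts
        }
        Set<String> allowed = allowedNamesByType.getOrDefault(annotation.pluginType(), Set.of());
        return allowed.contains(pluginName);
    }
}
```

In this sketch the caller supplies the allowed names per plugin type; how those names are discovered is left open here.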

For example, with the change in this PR, the aggregate processor schema will look like:

{
  "$schema" : "https://json-schema.org/draft/2020-12/schema",
  "type" : "object",
  "properties" : {
    "identification_keys" : {
      "description" : "An unordered list by which to group events. Events with the same values as these keys are put into the same group. If an event does not contain one of the identification_keys, then the value of that key is considered to be equal to null. At least one identification_key is required (for example, [\"sourceIp\", \"destinationIp\", \"port\"].",
      "minItems" : 1,
      "type" : "array",
      "items" : {
        "type" : "string"
      }
    },
    "group_duration" : {
      "type" : "string",
      "format" : "duration",
      "description" : "The amount of time that a group should exist before it is concluded automatically. Supports ISO_8601 notation strings (\"PT20.345S\", \"PT15M\", etc.) as well as simple notation for seconds (\"60s\") and milliseconds (\"1500ms\"). Default value is 180s."
    },
    "action" : {
      "anyOf" : [ {
        "type" : "object",
        "properties" : {
          "tail_sampler" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "condition" : {
                "type" : "string",
                "description" : "A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the event is an error event or not"
              },
              "percent" : {
                "type" : "integer",
                "description" : "Percent value to use for sampling non error events. 0.0 < percent < 100.0"
              },
              "wait_period" : {
                "type" : "string",
                "format" : "duration",
                "description" : "Period to wait before considering that a trace event is complete"
              }
            },
            "required" : [ "percent", "wait_period" ]
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "rate_limiter" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "events_per_second" : {
                "type" : "integer",
                "description" : "The number of events allowed per second."
              },
              "when_exceeds" : {
                "type" : "string",
                "description" : "Indicates what action the rate_limiter takes when the number of events received is greater than the number of events allowed per second. Default value is block, which blocks the processor from running after the maximum number of events allowed per second is reached until the next second. Alternatively, the drop option drops the excess events received in that second. Default is block"
              }
            },
            "required" : [ "events_per_second" ]
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "put_all" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "name" : {
                "type" : "string"
              },
              "pipeline_name" : {
                "type" : "string"
              },
              "process_workers" : {
                "type" : "integer"
              },
              "settings" : {
                "type" : "object"
              }
            }
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "histogram" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "buckets" : {
                "description" : "A list of buckets (values of type double) indicating the buckets in the histogram.",
                "type" : "array",
                "items" : {
                  "type" : "number"
                }
              },
              "generated_key_prefix" : {
                "type" : "string",
                "description" : "Key prefix used by all the fields created in the aggregated event. Having a prefix ensures that the names of the histogram event do not conflict with the field names in the event."
              },
              "key" : {
                "type" : "string",
                "description" : "Name of the field in the events the histogram generates."
              },
              "metric_name" : {
                "type" : "string",
                "description" : "Metric name to be used when otel format is used."
              },
              "output_format" : {
                "type" : "string",
                "description" : "Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value."
              },
              "record_minmax" : {
                "type" : "boolean",
                "description" : "A Boolean value indicating whether the histogram should include the min and max of the values in the aggregation."
              },
              "units" : {
                "type" : "string",
                "description" : "The name of units for the values in the key. For example, bytes, traces etc"
              }
            },
            "required" : [ "buckets", "key", "units" ]
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "count" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "count_key" : {
                "type" : "string",
                "description" : "Key used for storing the count. Default name is aggr._count."
              },
              "end_time_key" : {
                "type" : "string",
                "description" : "Key used for storing the end time. Default name is aggr._end_time."
              },
              "metric_name" : {
                "type" : "string",
                "description" : "Metric name to be used when otel format is used."
              },
              "output_format" : {
                "type" : "string",
                "description" : "Format of the aggregated event. otel_metrics is the default output format which outputs in OTel metrics SUM type with count as value. Other options is - raw - which generates a JSON object with the count_key field as a count value and the start_time_key field with aggregation start time as value."
              },
              "start_time_key" : {
                "type" : "string",
                "description" : "Key used for storing the start time. Default name is aggr._start_time."
              },
              "unique_keys" : {
                "description" : "List of unique keys to count.",
                "type" : "array",
                "items" : {
                  "type" : "string"
                }
              }
            }
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "percent_sampler" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "percent" : {
                "type" : "number",
                "description" : "The percentage of events to be processed during a one second interval. Must be greater than 0.0 and less than 100.0"
              }
            },
            "required" : [ "percent" ]
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "remove_duplicates" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "name" : {
                "type" : "string"
              },
              "pipeline_name" : {
                "type" : "string"
              },
              "process_workers" : {
                "type" : "integer"
              },
              "settings" : {
                "type" : "object"
              }
            }
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      }, {
        "type" : "object",
        "properties" : {
          "append" : {
            "$schema" : "https://json-schema.org/draft/2020-12/schema",
            "type" : "object",
            "properties" : {
              "keys_to_append" : {
                "description" : "A list of keys to append to for the aggregated result.",
                "type" : "array",
                "items" : {
                  "type" : "string"
                }
              }
            }
          }
        },
        "description" : "The action to be performed on each group. One of the available aggregate actions must be provided."
      } ]
    },
    "local_mode" : {
      "type" : "boolean",
      "description" : "When local_mode is set to true, the aggregation is performed locally on each Data Prepper node instead of forwarding events to a specific node based on the identification_keys using a hash function. Default is false."
    },
    "output_unaggregated_events" : {
      "type" : "boolean",
      "description" : "A boolean indicating if the unaggregated events should be forwarded to the next processor/sink in the chain."
    },
    "aggregated_events_tag" : {
      "type" : "string",
      "description" : "Tag to be used for aggregated events to distinguish aggregated events from unaggregated events."
    },
    "aggregate_when" : {
      "type" : "string",
      "description" : "A Data Prepper [conditional expression](https://opensearch.org/docs/latest/data-prepper/pipelines/expression-syntax/), such as '/some-key == \"test\"', that will be evaluated to determine whether the processor will be run on the event."
    }
  },
  "required" : [ "identification_keys", "action", "local_mode" ],
  "description" : "The `aggregate` processor groups events based on the values of identification_keys. Then, the processor performs an action on each group, helping reduce unnecessary log volume and creating aggregated logs over time.",
  "name" : "aggregate",
  "documentation" : "https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aggregate/"
}
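The shape of the "action" attribute above (one anyOf alternative per plugin, each an object with a single property named after the plugin) can be reproduced with a small sketch. This is only an illustration using plain JDK collections: AnyOfSchemaSketch and anyOfForPlugins are hypothetical names, and the actual schema generation in the PR may work quite differently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class AnyOfSchemaSketch {
    private AnyOfSchemaSketch() { }

    // Wraps each plugin's schema in an object schema keyed by the plugin
    // name, then collects all of those wrappers under a single "anyOf".
    static Map<String, Object> anyOfForPlugins(Map<String, Map<String, Object>> pluginSchemasByName,
                                               String description) {
        List<Map<String, Object>> alternatives = new ArrayList<>();
        for (Map.Entry<String, Map<String, Object>> entry : pluginSchemasByName.entrySet()) {
            Map<String, Object> properties = new LinkedHashMap<>();
            properties.put(entry.getKey(), entry.getValue());

            Map<String, Object> alternative = new LinkedHashMap<>();
            alternative.put("type", "object");
            alternative.put("properties", properties);
            alternative.put("description", description);
            alternatives.add(alternative);
        }
        Map<String, Object> schema = new LinkedHashMap<>();
        schema.put("anyOf", alternatives);
        return schema;
    }
}
```

LinkedHashMap is used so that the alternatives keep the order in which the plugin schemas were supplied.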

Issues Resolved

Contributes toward #4838

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@@ -36,7 +47,7 @@
@JsonSerialize(using = PluginModel.PluginModelSerializer.class)
@JsonDeserialize(using = PluginModel.PluginModelDeserializer.class)
public class PluginModel {

static final String DEFAULT_PLUGINS_CLASSPATH = "org.opensearch.dataprepper.plugins";

I don't think this approach is the right one for a few reasons:

  1. This is leaking information on the plugins classpath out of core and into Data Prepper API.
  2. This class deals with basic serialization and this approach is coupling it with the plugin framework to some degree.
  3. This duplicates concerns and code from the plugin framework.
  4. The plugin framework supports loading plugins from other packages as well, and this does not honor that.

This class - PluginModel - should remain concerned only with the most basic form of a plugin.

Use the plugin framework to detect this metadata, then have the schema converter get the metadata from the plugin framework. This may necessitate a new method on PluginFactory (though I'm not sure about this yet).
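One way to picture this suggestion is a narrow lookup that the plugin framework owns and the schema converter merely consumes. The sketch below is an assumption for discussion, not an existing Data Prepper API: PluginNameLookupSketch, InMemoryPluginFrameworkSketch, SchemaConverterSketch and ActionMarkerSketch are all hypothetical names, and explicit registration stands in for whatever discovery the plugin framework performs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical marker interface for one family of plugins.
interface ActionMarkerSketch { }

// Hypothetical lookup that the plugin framework could expose.
interface PluginNameLookupSketch {
    Set<String> pluginNamesFor(Class<?> pluginType);
}

// Stand-in for the plugin framework: it is the only place that knows
// which plugins exist. Registration is explicit here for simplicity.
final class InMemoryPluginFrameworkSketch implements PluginNameLookupSketch {
    private final Map<Class<?>, Set<String>> namesByType = new HashMap<>();

    void register(Class<?> pluginType, String pluginName) {
        namesByType.computeIfAbsent(pluginType, key -> new TreeSet<>()).add(pluginName);
    }

    @Override
    public Set<String> pluginNamesFor(Class<?> pluginType) {
        // Return a sorted copy so callers cannot modify the registry.
        return new TreeSet<>(namesByType.getOrDefault(pluginType, Set.of()));
    }
}

// The schema converter depends only on the lookup and does no scanning.
final class SchemaConverterSketch {
    private final PluginNameLookupSketch lookup;

    SchemaConverterSketch(PluginNameLookupSketch lookup) {
        this.lookup = lookup;
    }

    // Names of the plugins that would become anyOf alternatives, sorted.
    List<String> alternativesFor(Class<?> pluginType) {
        return new ArrayList<>(new TreeSet<>(lookup.pluginNamesFor(pluginType)));
    }
}
```

With this split, the serialization class needs no knowledge of plugin packages, and plugins loaded from other packages are covered as long as the framework registers them.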

.orElse(null));
}

private void overrideDataPrepperPluginTypeAttribute(

Scanning for plugins should only be done in the plugin framework. Make modifications there and use those.

* @return The Java class
* @since 1.2
*/
Class<?> pluginType();

We probably don't need this if we support it on the target itself.

Signed-off-by: George Chen <[email protected]>
@@ -12,9 +12,10 @@ dependencies {
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jsr310'
implementation 'com.fasterxml.jackson.datatype:jackson-datatype-jdk8'
implementation libs.reflections.core

We shouldn't need this anymore.

@@ -54,7 +54,7 @@ dependencyResolutionManagement {
library('bouncycastle-bcpkix', 'org.bouncycastle', 'bcpkix-jdk18on').versionRef('bouncycastle')
version('guava', '32.1.2-jre')
library('guava-core', 'com.google.guava', 'guava').versionRef('guava')
-version('reflections', '0.9.12')
+version('reflections', '0.10.2')

Thank you for this update!

@dlvenable dlvenable left a comment

Thank you @chenqi0805 for this improvement!

@chenqi0805 chenqi0805 merged commit 7bac644 into opensearch-project:main Oct 2, 2024
71 of 74 checks passed
@chenqi0805 chenqi0805 deleted the enh/4838-support-plugin-loading-in-conifg branch October 2, 2024 21:01
sb2k16 pushed a commit to sb2k16/data-prepper that referenced this pull request Oct 4, 2024