From 5d16838f4e71d527aa929fcee41e82be89141801 Mon Sep 17 00:00:00 2001 From: Price Hiller Date: Fri, 13 Dec 2024 15:00:15 -0600 Subject: [PATCH] feat(aws_cloudwatch_logs sink): allow setting type of log class to create Problem: Prior to this commit it was not possible to specify the log group's class type. The method prior always created a Standard type. ------------------------------------------------------------ Solution: Allow specifying the log group class type via a new field, `group_class` which takes over the `create_missing_group`. deprecation: `create_missing_group` Initial Issue Report: https://github.com/vectordotdev/vector/issues/22008 Closes https://github.com/vectordotdev/vector/issues/22008 --- src/sinks/aws_cloudwatch_logs/config.rs | 39 +++++++++++++++++++ src/sinks/aws_cloudwatch_logs/healthcheck.rs | 2 +- .../aws_cloudwatch_logs/integration_tests.rs | 7 ++++ src/sinks/aws_cloudwatch_logs/request.rs | 20 ++++++---- src/sinks/aws_cloudwatch_logs/service.rs | 13 ++++--- .../sinks/base/aws_cloudwatch_logs.cue | 29 ++++++++------ 6 files changed, 85 insertions(+), 25 deletions(-) diff --git a/src/sinks/aws_cloudwatch_logs/config.rs b/src/sinks/aws_cloudwatch_logs/config.rs index 0dc7f909176201..45350658c7956c 100644 --- a/src/sinks/aws_cloudwatch_logs/config.rs +++ b/src/sinks/aws_cloudwatch_logs/config.rs @@ -76,6 +76,30 @@ where } } +/// Defines the log class to create if missing +/// +/// See https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html +#[configurable_component] +#[derive(Clone, Debug, Default)] +pub enum LogGroupClassDef { + /// Logs that require real-time monitoring or frequently accessed logs + #[default] + Standard, + /// Log class that can be used to cost-effectively consolidate logs + InfrequentAccess, +} + +impl From for aws_sdk_cloudwatchlogs::types::LogGroupClass { + fn from(value: LogGroupClassDef) -> Self { + match value { + LogGroupClassDef::Standard => aws_sdk_cloudwatchlogs::types::LogGroupClass::Standard, + LogGroupClassDef::InfrequentAccess => { + aws_sdk_cloudwatchlogs::types::LogGroupClass::InfrequentAccess + } + } + } +} + /// Configuration for the `aws_cloudwatch_logs` sink. #[configurable_component(sink( "aws_cloudwatch_logs", @@ -115,9 +139,22 @@ pub struct CloudwatchLogsSinkConfig { /// the first stream. /// /// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html + #[configurable(deprecated, metadata(docs::hidden))] #[serde(default = "crate::serde::default_true")] pub create_missing_group: bool, + /// Dynamically create a [log group][log_group] if it does not already exist with the specified + /// [group class][group_class]. + /// + /// This ignores `create_missing_stream` directly after creating the group and creates + /// the first stream. + /// + /// [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html + /// [group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html + #[configurable(derived)] + #[serde(default)] + pub group_class: Option, + /// Dynamically create a [log stream][log_stream] if it does not already exist. /// /// [log_stream]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html @@ -232,12 +269,14 @@ impl GenerateConfig for CloudwatchLogsSinkConfig { } fn default_config(encoding: EncodingConfig) -> CloudwatchLogsSinkConfig { + #[allow(deprecated)] CloudwatchLogsSinkConfig { encoding, group_name: Default::default(), stream_name: Default::default(), region: Default::default(), create_missing_group: true, + group_class: Default::default(), create_missing_stream: true, retention: Default::default(), compression: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/healthcheck.rs b/src/sinks/aws_cloudwatch_logs/healthcheck.rs index b8a4dd3017d308..c761a0f3ae8078 100644 --- a/src/sinks/aws_cloudwatch_logs/healthcheck.rs +++ b/src/sinks/aws_cloudwatch_logs/healthcheck.rs @@ -57,7 +57,7 @@ pub async fn healthcheck( if config.group_name.is_dynamic() { info!("Skipping healthcheck log group check: `group_name` is dynamic."); Ok(()) - } else if config.create_missing_group { + } else if config.group_class.is_some() { info!("Skipping healthcheck log group check: `group_name` will be created if missing."); Ok(()) } else { diff --git a/src/sinks/aws_cloudwatch_logs/integration_tests.rs b/src/sinks/aws_cloudwatch_logs/integration_tests.rs index 4e80493adaf88a..b9cde31b54338e 100644 --- a/src/sinks/aws_cloudwatch_logs/integration_tests.rs +++ b/src/sinks/aws_cloudwatch_logs/integration_tests.rs @@ -43,6 +43,7 @@ async fn cloudwatch_insert_log_event() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), @@ -94,6 +95,7 @@ async fn cloudwatch_insert_log_events_sorted() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), @@ -170,6 +172,7 @@ async fn cloudwatch_insert_out_of_range_timestamp() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), @@ -247,6 +250,7 @@ async fn cloudwatch_dynamic_group_and_stream_creation() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), @@ -303,6 +307,7 @@ async fn cloudwatch_insert_log_event_batched() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch, @@ -354,6 +359,7 @@ async fn cloudwatch_insert_log_event_partitioned() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), @@ -447,6 +453,7 @@ async fn cloudwatch_healthcheck() { encoding: TextSerializerConfig::default().into(), create_missing_group: true, create_missing_stream: true, + group_class: Default::default(), retention: Default::default(), compression: Default::default(), batch: Default::default(), diff --git a/src/sinks/aws_cloudwatch_logs/request.rs b/src/sinks/aws_cloudwatch_logs/request.rs index 60d9e88a6bb051..f1a09619de8190 100644 --- a/src/sinks/aws_cloudwatch_logs/request.rs +++ b/src/sinks/aws_cloudwatch_logs/request.rs @@ -12,7 +12,7 @@ use aws_sdk_cloudwatchlogs::{ put_log_events::{PutLogEventsError, PutLogEventsOutput}, put_retention_policy::PutRetentionPolicyError, }, - types::InputLogEvent, + types::{InputLogEvent, LogGroupClass}, Client as CloudwatchLogsClient, }; use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError}; @@ -38,6 +38,7 @@ struct Client { client: CloudwatchLogsClient, stream_name: String, group_name: String, + group_class: Option, headers: IndexMap, retention_days: u32, } @@ -60,7 +61,7 @@ impl CloudwatchFuture { headers: IndexMap, stream_name: String, group_name: String, - create_missing_group: bool, + group_class: Option, create_missing_stream: bool, retention: Retention, mut events: Vec>, @@ -68,10 +69,12 @@ impl CloudwatchFuture { token_tx: oneshot::Sender>, ) -> Self { let retention_days = retention.days; + let create_missing_group = group_class.is_some(); let client = Client { client, stream_name, group_name, + group_class, headers, retention_days, }; @@ -288,12 +291,15 @@ impl Client { pub fn create_log_group(&self) -> ClientResult<(), CreateLogGroupError> { let client = self.client.clone(); let group_name = self.group_name.clone(); + let group_class = self.group_class.clone(); + Box::pin(async move { - client - .create_log_group() - .log_group_name(group_name) - .send() - .await?; + let mut client_log_group_builder = client.create_log_group().log_group_name(group_name); + client_log_group_builder = match group_class { + Some(class) => client_log_group_builder.log_group_class(class), + None => client_log_group_builder, + }; + client_log_group_builder.send().await?; Ok(()) }) } diff --git a/src/sinks/aws_cloudwatch_logs/service.rs b/src/sinks/aws_cloudwatch_logs/service.rs index ab8ea09daf5515..def7790b7a26a0 100644 --- a/src/sinks/aws_cloudwatch_logs/service.rs +++ b/src/sinks/aws_cloudwatch_logs/service.rs @@ -10,7 +10,7 @@ use aws_sdk_cloudwatchlogs::{ describe_log_streams::DescribeLogStreamsError, put_log_events::PutLogEventsError, put_retention_policy::PutRetentionPolicyError, }, - types::InputLogEvent, + types::{InputLogEvent, LogGroupClass}, Client as CloudwatchLogsClient, }; use aws_smithy_runtime_api::client::{orchestrator::HttpResponse, result::SdkError}; @@ -235,7 +235,10 @@ impl CloudwatchLogsSvc { let group_name = key.group.clone(); let stream_name = key.stream.clone(); - let create_missing_group = config.create_missing_group; + let group_class = match config.group_class { + Some(class) => Some(LogGroupClass::from(class)), + None => None, + }; let create_missing_stream = config.create_missing_stream; let retention = config.retention.clone(); @@ -245,7 +248,7 @@ impl CloudwatchLogsSvc { client, stream_name, group_name, - create_missing_group, + group_class, create_missing_stream, retention, token: None, @@ -319,7 +322,7 @@ impl Service> for CloudwatchLogsSvc { self.headers.clone(), self.stream_name.clone(), self.group_name.clone(), - self.create_missing_group, + self.group_class.clone(), self.create_missing_stream, self.retention.clone(), event_batches, @@ -337,7 +340,7 @@ pub struct CloudwatchLogsSvc { headers: IndexMap, stream_name: String, group_name: String, - create_missing_group: bool, + group_class: Option, create_missing_stream: bool, retention: Retention, token: Option, diff --git a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue index 9e9aa670e2eb2e..4d58daa97eb3b8 100644 --- a/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue +++ b/website/cue/reference/components/sinks/base/aws_cloudwatch_logs.cue @@ -198,18 +198,6 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { } } } - create_missing_group: { - description: """ - Dynamically create a [log group][log_group] if it does not already exist. - - This ignores `create_missing_stream` directly after creating the group and creates - the first stream. - - [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html - """ - required: false - type: bool: default: true - } create_missing_stream: { description: """ Dynamically create a [log stream][log_stream] if it does not already exist. @@ -566,6 +554,23 @@ base: components: sinks: aws_cloudwatch_logs: configuration: { required: false type: string: examples: ["http://127.0.0.0:5000/path/to/service"] } + group_class: { + description: """ + Dynamically create a [log group][log_group] if it does not already exist with the specified + [group class][group_class]. + + This ignores `create_missing_stream` directly after creating the group and creates + the first stream. + + [log_group]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/Working-with-log-groups-and-streams.html + [group_class]: https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CloudWatch_Logs_Log_Classes.html + """ + required: false + type: string: enum: { + InfrequentAccess: "Log class that can be used to cost-effectively consolidate logs" + Standard: "Logs that require real-time monitoring or frequently accessed logs" + } + } group_name: { description: """ The [group name][group_name] of the target CloudWatch Logs stream.