diff --git a/Cargo.lock b/Cargo.lock index ca3f11b..9968934 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1916,19 +1916,18 @@ dependencies = [ [[package]] name = "rustls-pemfile" -version = "2.1.2" +version = "2.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29993a25686778eb88d4189742cd713c9bce943bc54251a33509dc63cbacf73d" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" dependencies = [ - "base64 0.22.1", "rustls-pki-types", ] [[package]] name = "rustls-pki-types" -version = "1.7.0" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976295e77ce332211c0d24d92c0e83e50f5c5f046d11082cea19f3df13a3562d" +checksum = "16f1201b3c9a7ee8039bcadc17b7e605e2945b27eee7631788c1bd2b0643674b" [[package]] name = "rustls-webpki" diff --git a/cli/src/skell.rs b/cli/src/skell.rs index 3f52781..9bd9eaa 100644 --- a/cli/src/skell.rs +++ b/cli/src/skell.rs @@ -178,10 +178,19 @@ fn get_outputs() -> String { # driver = "Tcp" # format = "Raw" -# Tcp driver has the following paramters: -# - addr (required): Hostname or IP Address to send events to +# Tcp driver has the following parameters: +# - host (required): Hostname or IP Address to send events to # - port (required): Tcp port to send events to -# config = { addr = "localhost", port = 5000 } +# - tls_enabled (optional, defaults to false): wrap the TCP stream in a TLS session. +# Must be set for other tls_ options to take effect +# - tls_certificate_authorities (optional, defaults to undefined): Validate server certificate +# chain against these authorities. You can define multiple files or paths. +# All the certificates will be read and added to the trust store. +# - tls_certificate (optional, defaults to undefined): Path to certificate in PEM format. +# This certificate will be presented to the server. +# - tls_key (optional, defaults to undefined): Path to the private key corresponding to the +# specified certificate (PEM format). +# config = { host = "localhost", port = 5000 } # Configure a Redis output diff --git a/cli/src/subscriptions.rs b/cli/src/subscriptions.rs index 114d0f2..7ed4af6 100644 --- a/cli/src/subscriptions.rs +++ b/cli/src/subscriptions.rs @@ -740,7 +740,14 @@ fn outputs_add_tcp(matches: &ArgMatches) -> Result { .ok_or_else(|| anyhow!("Missing TCP port"))?; info!("Adding TCP output: {}:{}", addr, port); - Ok(TcpConfiguration::new(addr.clone(), *port)) + Ok(TcpConfiguration::new( + addr.clone(), + *port, + false, + Vec::new(), + None, + None, + )?) } fn outputs_add_redis(matches: &ArgMatches) -> Result { @@ -1101,4 +1108,4 @@ fn check_subscriptions_ro(settings: &Settings) -> Result<()> { fn deprecated_cli_warn() { warn!("Using commands to manage subscriptions and there outputs is deprecated and will be removed in future releases. Use subscription configuration files instead.") -} \ No newline at end of file +} diff --git a/common/src/models/config.rs b/common/src/models/config.rs index 79372a3..3c6376a 100644 --- a/common/src/models/config.rs +++ b/common/src/models/config.rs @@ -1,13 +1,13 @@ use std::collections::{HashMap, HashSet}; use anyhow::{bail, Context, Result}; -use log::error; use serde::Deserialize; use uuid::Uuid; -use crate::{subscription::{ - SubscriptionData, DEFAULT_OUTPUT_ENABLED, -}, transformers::output_files_use_path::transform_files_config_to_path}; +use crate::{ + subscription::{SubscriptionData, DEFAULT_OUTPUT_ENABLED}, + transformers::output_files_use_path::transform_files_config_to_path, +}; #[derive(Debug, Clone, Deserialize, Eq, PartialEq)] #[serde(deny_unknown_fields)] @@ -36,16 +36,44 @@ impl From for crate::subscription::RedisConfiguration { } } +#[derive(Debug, Clone, Deserialize, Eq, PartialEq)] +#[serde(untagged)] +enum StringOrVecString { + String(String), + Vec(Vec), +} + #[derive(Debug, Clone, Deserialize, Eq, PartialEq)] #[serde(deny_unknown_fields)] struct TcpConfiguration { - pub addr: String, + // Stay compatible with old 'addr' attribute + #[serde(alias = "addr")] + pub host: String, pub port: u16, + pub tls_enabled: Option, + // Accept String or Vec + pub tls_certificate_authorities: Option, + pub tls_certificate: Option, + pub tls_key: Option, } -impl From for crate::subscription::TcpConfiguration { - fn from(value: TcpConfiguration) -> Self { - crate::subscription::TcpConfiguration::new(value.addr, value.port) +impl TryFrom for crate::subscription::TcpConfiguration { + type Error = anyhow::Error; + + fn try_from(value: TcpConfiguration) -> std::result::Result { + crate::subscription::TcpConfiguration::new( + value.host.clone(), + value.port, + value.tls_enabled.unwrap_or(false), + match &value.tls_certificate_authorities { + Some(StringOrVecString::String(s)) => Vec::from([s.clone()]), + Some(StringOrVecString::Vec(v)) => v.clone(), + _ => Vec::new(), + }, + value.tls_certificate.clone(), + value.tls_key.clone(), + ) + .with_context(|| format!("Loading {:?}", value)) } } @@ -59,21 +87,27 @@ struct FilesConfiguration { pub filename: Option, } -impl From for crate::subscription::FilesConfiguration { - fn from(value: FilesConfiguration) -> Self { +impl TryFrom for crate::subscription::FilesConfiguration { + type Error = anyhow::Error; + + fn try_from(value: FilesConfiguration) -> std::result::Result { let path = match value.path { Some(path) => path, - None => { - match transform_files_config_to_path(&value.base, &value.split_on_addr_index, &value.append_node_name, &value.filename) { - Ok(path) => path, - Err(err) => { - error!("Failed to import Files configuration {:?}: {:?}", value, err); - String::new() - } - } - } + None => transform_files_config_to_path( + &value.base, + &value.split_on_addr_index, + &value.append_node_name, + &value.filename, + ) + .map_err(|err| { + anyhow::anyhow!( + "Failed to import Files configuration {:?}: {:?}", + value, + err + ) + })?, }; - crate::subscription::FilesConfiguration::new(path) + Ok(crate::subscription::FilesConfiguration::new(path)) } } @@ -99,17 +133,19 @@ enum SubscriptionOutputDriver { UnixDatagram(UnixDatagramConfiguration), } -impl From for crate::subscription::SubscriptionOutputDriver { - fn from(value: SubscriptionOutputDriver) -> Self { - match value { +impl TryFrom for crate::subscription::SubscriptionOutputDriver { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutputDriver) -> std::result::Result { + Ok(match value { SubscriptionOutputDriver::Files(config) => { - crate::subscription::SubscriptionOutputDriver::Files(config.into()) + crate::subscription::SubscriptionOutputDriver::Files(config.try_into()?) } SubscriptionOutputDriver::Kafka(config) => { crate::subscription::SubscriptionOutputDriver::Kafka(config.into()) } SubscriptionOutputDriver::Tcp(config) => { - crate::subscription::SubscriptionOutputDriver::Tcp(config.into()) + crate::subscription::SubscriptionOutputDriver::Tcp(config.try_into()?) } SubscriptionOutputDriver::Redis(config) => { crate::subscription::SubscriptionOutputDriver::Redis(config.into()) @@ -117,7 +153,7 @@ impl From for crate::subscription::SubscriptionOutputD SubscriptionOutputDriver::UnixDatagram(config) => { crate::subscription::SubscriptionOutputDriver::UnixDatagram(config.into()) } - } + }) } } @@ -130,13 +166,15 @@ struct SubscriptionOutput { pub enabled: Option, } -impl From for crate::subscription::SubscriptionOutput { - fn from(value: SubscriptionOutput) -> Self { - crate::subscription::SubscriptionOutput::new( +impl TryFrom for crate::subscription::SubscriptionOutput { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutput) -> std::result::Result { + Ok(crate::subscription::SubscriptionOutput::new( value.format.into(), - value.driver.into(), + value.driver.try_into()?, value.enabled.unwrap_or(DEFAULT_OUTPUT_ENABLED), - ) + )) } } @@ -145,7 +183,7 @@ enum SubscriptionOutputFormat { Json, Raw, RawJson, - Nxlog + Nxlog, } impl From for crate::subscription::SubscriptionOutputFormat { @@ -153,8 +191,10 @@ impl From for crate::subscription::SubscriptionOutputF match value { SubscriptionOutputFormat::Json => crate::subscription::SubscriptionOutputFormat::Json, SubscriptionOutputFormat::Raw => crate::subscription::SubscriptionOutputFormat::Raw, - SubscriptionOutputFormat::RawJson => crate::subscription::SubscriptionOutputFormat::RawJson, - SubscriptionOutputFormat::Nxlog => crate::subscription::SubscriptionOutputFormat::Nxlog + SubscriptionOutputFormat::RawJson => { + crate::subscription::SubscriptionOutputFormat::RawJson + } + SubscriptionOutputFormat::Nxlog => crate::subscription::SubscriptionOutputFormat::Nxlog, } } } @@ -303,7 +343,12 @@ impl TryFrom for crate::subscription::SubscriptionData { } for output in subscription.outputs.iter() { - data.add_output(output.clone().into()); + data.add_output(output.clone().try_into().with_context(|| { + format!( + "Loading subscription {} output {:?}", + subscription.name, output + ) + })?); } if let Some(options) = subscription.options { @@ -391,6 +436,41 @@ enabled = true addr = "127.0.0.1" port = 8080 +[[outputs]] +driver = "Tcp" +format = "Json" +enabled = true + +[outputs.config] +host = "localhost" +port = 8080 +tls_enabled = true +tls_certificate_authorities = "test_path" + +[[outputs]] +driver = "Tcp" +format = "Json" +enabled = true + +[outputs.config] +host = "localhost" +port = 8080 +tls_enabled = true +tls_certificate_authorities = ["test_path1", "test_path2"] + +[[outputs]] +driver = "Tcp" +format = "Json" +enabled = true + +[outputs.config] +host = "localhost" +port = 8080 +tls_enabled = true +tls_certificate_authorities = ["test_path1", "test_path2"] +tls_certificate = "test_cert_path" +tls_key = "test_key_path" + ## Redis output [[outputs]] driver = "Redis" @@ -452,7 +532,7 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end" crate::subscription::SubscriptionOutputFormat::Json, crate::subscription::SubscriptionOutputDriver::Files( crate::subscription::FilesConfiguration::new( - "/tmp/{ip:2}/{ip:3}/{ip}/{principal}/{node}/courgette".to_string() + "/tmp/{ip:2}/{ip:3}/{ip}/{principal}/{node}/courgette".to_string(), ), ), true, @@ -470,7 +550,56 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end" crate::subscription::SubscriptionOutput::new( crate::subscription::SubscriptionOutputFormat::RawJson, crate::subscription::SubscriptionOutputDriver::Tcp( - crate::subscription::TcpConfiguration::new("127.0.0.1".to_string(), 8080), + crate::subscription::TcpConfiguration::new( + "127.0.0.1".to_string(), + 8080, + false, + Vec::new(), + None, + None, + )?, + ), + true, + ), + crate::subscription::SubscriptionOutput::new( + crate::subscription::SubscriptionOutputFormat::Json, + crate::subscription::SubscriptionOutputDriver::Tcp( + crate::subscription::TcpConfiguration::new( + "localhost".to_string(), + 8080, + true, + vec!["test_path".to_string()], + None, + None, + )?, + ), + true, + ), + crate::subscription::SubscriptionOutput::new( + crate::subscription::SubscriptionOutputFormat::Json, + crate::subscription::SubscriptionOutputDriver::Tcp( + crate::subscription::TcpConfiguration::new( + "localhost".to_string(), + 8080, + true, + vec!["test_path1".to_string(), "test_path2".to_string()], + None, + None, + )?, + ), + true, + ), + crate::subscription::SubscriptionOutput::new( + crate::subscription::SubscriptionOutputFormat::Json, + crate::subscription::SubscriptionOutputDriver::Tcp( + crate::subscription::TcpConfiguration::new( + "localhost".to_string(), + 8080, + true, + vec!["test_path1".to_string(), "test_path2".to_string()], + Some("test_cert_path".to_string()), + Some("test_key_path".to_string()), + )?, ), true, ), @@ -497,7 +626,7 @@ path = "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end" crate::subscription::SubscriptionOutputFormat::Json, crate::subscription::SubscriptionOutputDriver::Files( crate::subscription::FilesConfiguration::new( - "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end".to_string() + "/whatever/you/{ip}/want/{principal}/{ip:2}/{node}/end".to_string(), ), ), true, @@ -719,22 +848,26 @@ config = { topic = "my-kafka-topic", options = { "bootstrap.servers" = "localhos fn test_getting_started_conf() -> Result<()> { let mut data = parse(GETTING_STARTED_CONF, None)?; - let mut expected = - crate::subscription::SubscriptionData::new("my-test-subscription", GETTING_STARTED_QUERY); - expected - .set_uuid(crate::subscription::SubscriptionUuid(Uuid::from_str( - "28fcc206-1336-4e4a-b76b-18b0ab46e585", - )?)); + let mut expected = crate::subscription::SubscriptionData::new( + "my-test-subscription", + GETTING_STARTED_QUERY, + ); + expected.set_uuid(crate::subscription::SubscriptionUuid(Uuid::from_str( + "28fcc206-1336-4e4a-b76b-18b0ab46e585", + )?)); let mut kafka_options = HashMap::new(); - kafka_options.insert("bootstrap.servers".to_string(), "localhost:9092".to_string()); + kafka_options.insert( + "bootstrap.servers".to_string(), + "localhost:9092".to_string(), + ); let outputs = vec![ crate::subscription::SubscriptionOutput::new( crate::subscription::SubscriptionOutputFormat::Raw, crate::subscription::SubscriptionOutputDriver::Files( crate::subscription::FilesConfiguration::new( - "/data/logs/{ip}/{principal}/messages".to_string() + "/data/logs/{ip}/{principal}/messages".to_string(), ), ), true, diff --git a/common/src/models/export.rs b/common/src/models/export.rs index d466448..b02d161 100644 --- a/common/src/models/export.rs +++ b/common/src/models/export.rs @@ -26,8 +26,12 @@ pub fn serialize(subscriptions: &[crate::subscription::SubscriptionData]) -> Res pub fn parse(content: &str) -> Result> { let import: ImportExport = serde_json::from_str(content).context("Failed to parse file")?; let subscriptions = match import { - ImportExport::V1(subscriptions) => subscriptions.into(), - ImportExport::V2(subscriptions) => subscriptions.into(), + ImportExport::V1(subscriptions) => subscriptions + .try_into() + .context("Invalid subscription data")?, + ImportExport::V2(subscriptions) => subscriptions + .try_into() + .context("Invalid subscription data")?, }; Ok(subscriptions) } @@ -70,9 +74,18 @@ mod v1 { pub port: u16, } - impl From for crate::subscription::TcpConfiguration { - fn from(value: TcpConfiguration) -> Self { - crate::subscription::TcpConfiguration::new(value.addr, value.port) + impl TryFrom for crate::subscription::TcpConfiguration { + type Error = anyhow::Error; + + fn try_from(value: TcpConfiguration) -> Result { + crate::subscription::TcpConfiguration::new( + value.addr, + value.port, + false, + Vec::new(), + None, + None, + ) } } @@ -86,7 +99,13 @@ mod v1 { impl From for crate::subscription::FilesConfiguration { fn from(value: FilesConfiguration) -> Self { - let path = transform_files_config_to_path(&Some(value.base), &value.split_on_addr_index, &Some(value.append_node_name), &Some(value.filename)).expect("Failed to convert old Files driver configuration"); + let path = transform_files_config_to_path( + &Some(value.base), + &value.split_on_addr_index, + &Some(value.append_node_name), + &Some(value.filename), + ) + .expect("Failed to convert old Files driver configuration"); crate::subscription::FilesConfiguration::new(path) } } @@ -111,9 +130,11 @@ mod v1 { UnixDatagram(UnixDatagramConfiguration), } - impl From for crate::subscription::SubscriptionOutputDriver { - fn from(value: SubscriptionOutputDriver) -> Self { - match value { + impl TryFrom for crate::subscription::SubscriptionOutputDriver { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutputDriver) -> Result { + Ok(match value { SubscriptionOutputDriver::Files(config) => { crate::subscription::SubscriptionOutputDriver::Files(config.into()) } @@ -121,7 +142,7 @@ mod v1 { crate::subscription::SubscriptionOutputDriver::Kafka(config.into()) } SubscriptionOutputDriver::Tcp(config) => { - crate::subscription::SubscriptionOutputDriver::Tcp(config.into()) + crate::subscription::SubscriptionOutputDriver::Tcp(config.try_into()?) } SubscriptionOutputDriver::Redis(config) => { crate::subscription::SubscriptionOutputDriver::Redis(config.into()) @@ -129,7 +150,7 @@ mod v1 { SubscriptionOutputDriver::UnixDatagram(config) => { crate::subscription::SubscriptionOutputDriver::UnixDatagram(config.into()) } - } + }) } } @@ -165,13 +186,15 @@ mod v1 { pub enabled: bool, } - impl From for crate::subscription::SubscriptionOutput { - fn from(value: SubscriptionOutput) -> Self { - crate::subscription::SubscriptionOutput::new( + impl TryFrom for crate::subscription::SubscriptionOutput { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutput) -> Result { + Ok(crate::subscription::SubscriptionOutput::new( value.format.into(), - value.driver.into(), + value.driver.try_into()?, value.enabled, - ) + )) } } @@ -239,9 +262,13 @@ mod v1 { pub outputs: Vec, } - impl From for crate::subscription::SubscriptionData { - fn from(value: SubscriptionData) -> Self { + impl TryFrom for crate::subscription::SubscriptionData { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionData) -> Result { let mut data = crate::subscription::SubscriptionData::new(&value.name, &value.query); + let outputs: Result, _> = + value.outputs.iter().map(|s| s.clone().try_into()).collect(); data.set_uuid(crate::subscription::SubscriptionUuid(value.uuid)) .set_uri(value.uri) .set_heartbeat_interval(value.heartbeat_interval) @@ -256,10 +283,10 @@ mod v1 { .set_princs_filter(value.filter.into()) .set_locale(value.locale) .set_data_locale(value.data_locale) - .set_outputs(value.outputs.iter().map(|s| s.clone().into()).collect()) + .set_outputs(outputs?) .set_revision(value.revision); // Note: internal version is not exported nor set - data + Ok(data) } } @@ -268,13 +295,16 @@ mod v1 { pub subscriptions: Vec, } - impl From for Vec { - fn from(value: Subscriptions) -> Self { - value + impl TryFrom for Vec { + type Error = anyhow::Error; + + fn try_from(value: Subscriptions) -> Result { + let subscriptions: Result, _> = value .subscriptions .iter() - .map(|s| s.clone().into()) - .collect() + .map(|s| s.clone().try_into()) + .collect(); + Ok(subscriptions?) } } } @@ -332,26 +362,44 @@ pub mod v2 { pub(super) struct TcpConfiguration { pub addr: String, pub port: u16, - } - - impl From for crate::subscription::TcpConfiguration { - fn from(value: TcpConfiguration) -> Self { - crate::subscription::TcpConfiguration::new(value.addr, value.port) + pub tls_enabled: Option, + #[serde(default)] + pub tls_certificate_authorities: Vec, + pub tls_certificate: Option, + pub tls_key: Option, + } + + impl TryFrom for crate::subscription::TcpConfiguration { + type Error = anyhow::Error; + + fn try_from(value: TcpConfiguration) -> Result { + crate::subscription::TcpConfiguration::new( + value.addr, + value.port, + value.tls_enabled.unwrap_or(false), + value.tls_certificate_authorities, + value.tls_certificate, + value.tls_key, + ) } } impl From for TcpConfiguration { fn from(value: crate::subscription::TcpConfiguration) -> Self { Self { - addr: value.addr().to_string(), + addr: value.host().to_string(), port: value.port(), + tls_enabled: Some(value.tls_enabled()), + tls_certificate_authorities: value.tls_certificate_authorities().to_owned(), + tls_certificate: value.tls_certificate().cloned(), + tls_key: value.tls_key().cloned(), } } } #[derive(Debug, Clone, Deserialize, Eq, PartialEq, Serialize)] pub(super) struct FilesConfiguration { - pub path: String + pub path: String, } impl From for crate::subscription::FilesConfiguration { @@ -363,7 +411,7 @@ pub mod v2 { impl From for FilesConfiguration { fn from(value: crate::subscription::FilesConfiguration) -> Self { Self { - path: value.path().to_owned() + path: value.path().to_owned(), } } } @@ -396,9 +444,11 @@ pub mod v2 { UnixDatagram(UnixDatagramConfiguration), } - impl From for crate::subscription::SubscriptionOutputDriver { - fn from(value: SubscriptionOutputDriver) -> Self { - match value { + impl TryFrom for crate::subscription::SubscriptionOutputDriver { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutputDriver) -> Result { + Ok(match value { SubscriptionOutputDriver::Files(config) => { crate::subscription::SubscriptionOutputDriver::Files(config.into()) } @@ -406,7 +456,7 @@ pub mod v2 { crate::subscription::SubscriptionOutputDriver::Kafka(config.into()) } SubscriptionOutputDriver::Tcp(config) => { - crate::subscription::SubscriptionOutputDriver::Tcp(config.into()) + crate::subscription::SubscriptionOutputDriver::Tcp(config.try_into()?) } SubscriptionOutputDriver::Redis(config) => { crate::subscription::SubscriptionOutputDriver::Redis(config.into()) @@ -414,7 +464,7 @@ pub mod v2 { SubscriptionOutputDriver::UnixDatagram(config) => { crate::subscription::SubscriptionOutputDriver::UnixDatagram(config.into()) } - } + }) } } @@ -489,13 +539,15 @@ pub mod v2 { pub enabled: bool, } - impl From for crate::subscription::SubscriptionOutput { - fn from(value: SubscriptionOutput) -> Self { - crate::subscription::SubscriptionOutput::new( + impl TryFrom for crate::subscription::SubscriptionOutput { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionOutput) -> Result { + Ok(crate::subscription::SubscriptionOutput::new( value.format.into(), - value.driver.into(), + value.driver.try_into()?, value.enabled, - ) + )) } } @@ -601,9 +653,13 @@ pub mod v2 { pub outputs: Vec, } - impl From for crate::subscription::SubscriptionData { - fn from(value: SubscriptionData) -> Self { + impl TryFrom for crate::subscription::SubscriptionData { + type Error = anyhow::Error; + + fn try_from(value: SubscriptionData) -> Result { let mut data = crate::subscription::SubscriptionData::new(&value.name, &value.query); + let outputs: Result, _> = + value.outputs.iter().map(|s| s.clone().try_into()).collect(); data.set_uuid(crate::subscription::SubscriptionUuid(value.uuid)) .set_uri(value.uri) .set_heartbeat_interval(value.heartbeat_interval) @@ -619,10 +675,10 @@ pub mod v2 { .set_princs_filter(value.filter.into()) .set_locale(value.locale) .set_data_locale(value.data_locale) - .set_outputs(value.outputs.iter().map(|s| s.clone().into()).collect()) + .set_outputs(outputs?) .set_revision(value.revision); // Note: internal version is not exported nor set - data + Ok(data) } } @@ -658,13 +714,16 @@ pub mod v2 { pub subscriptions: Vec, } - impl From for Vec { - fn from(value: Subscriptions) -> Self { - value + impl TryFrom for Vec { + type Error = anyhow::Error; + + fn try_from(value: Subscriptions) -> Result { + let subscriptions: Result, _> = value .subscriptions .iter() - .map(|s| s.clone().into()) - .collect() + .map(|s| s.clone().try_into()) + .collect(); + subscriptions } } @@ -711,7 +770,14 @@ mod tests { .set_outputs(vec![crate::subscription::SubscriptionOutput::new( crate::subscription::SubscriptionOutputFormat::Json, crate::subscription::SubscriptionOutputDriver::Tcp( - crate::subscription::TcpConfiguration::new("127.0.0.1".to_string(), 5000), + crate::subscription::TcpConfiguration::new( + "127.0.0.1".to_string(), + 5000, + false, + vec![], + None, + None, + )?, ), true, )]) diff --git a/common/src/subscription.rs b/common/src/subscription.rs index 81d00ad..4cc813b 100644 --- a/common/src/subscription.rs +++ b/common/src/subscription.rs @@ -2,6 +2,7 @@ use std::{ collections::{HashMap, HashSet}, fmt::{Display, Formatter}, hash::{Hash, Hasher}, + net::IpAddr, str::FromStr, }; @@ -70,24 +71,77 @@ impl RedisConfiguration { } } -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] +#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Default)] pub struct TcpConfiguration { - addr: String, + // Stay compatible with old 'addr' attribute + #[serde(alias = "addr")] + host: String, port: u16, + tls_enabled: bool, + tls_certificate_authorities: Vec, + tls_certificate: Option, + tls_key: Option, } impl TcpConfiguration { - pub fn new(addr: String, port: u16) -> Self { - TcpConfiguration { addr, port } + pub fn new( + host: String, + port: u16, + tls_enabled: bool, + tls_certificate_authorities: Vec, + tls_certificate: Option, + tls_key: Option, + ) -> Result { + // Check that addr is a hostname if tls is enabled + if tls_enabled { + if IpAddr::from_str(&host).is_ok() { + bail!("host must be a hostname if tls is enabled, found {}", &host); + } + + if tls_certificate_authorities.is_empty() { + bail!("tls_certificate_authorities must be not empty if tls is enabled") + } + + match (&tls_certificate, &tls_key) { + (Some(_), Some(_)) => (), + (None, None) => (), + (cert, key) => bail!("tls_certificate and tls_key must both be defined, found tls_certificate={:?} and tls_key={:?}", cert, key) + } + } + + Ok(TcpConfiguration { + host, + port, + tls_enabled, + tls_certificate_authorities, + tls_certificate, + tls_key, + }) } - pub fn addr(&self) -> &str { - self.addr.as_ref() + pub fn host(&self) -> &str { + self.host.as_ref() } pub fn port(&self) -> u16 { self.port } + + pub fn tls_enabled(&self) -> bool { + self.tls_enabled + } + + pub fn tls_certificate_authorities(&self) -> &[String] { + self.tls_certificate_authorities.as_ref() + } + + pub fn tls_certificate(&self) -> Option<&String> { + self.tls_certificate.as_ref() + } + + pub fn tls_key(&self) -> Option<&String> { + self.tls_key.as_ref() + } } #[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] @@ -96,12 +150,8 @@ pub struct FilesConfiguration { } impl FilesConfiguration { - pub fn new( - path: String, - ) -> Self { - Self { - path - } + pub fn new(path: String) -> Self { + Self { path } } pub fn path(&self) -> &str { @@ -683,8 +733,8 @@ impl SubscriptionData { self } - /// Set the subscription's max elements. - pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { + /// Set the subscription's max elements. + pub fn set_max_elements(&mut self, max_elements: Option) -> &mut Self { self.parameters.max_elements = max_elements; self.update_internal_version(); self diff --git a/doc/outputs.md b/doc/outputs.md index a60fae8..f8567d0 100644 --- a/doc/outputs.md +++ b/doc/outputs.md @@ -102,7 +102,9 @@ The TCP driver send events in a "raw" TCP connection. The TCP connection is established when the first event has to be sent. It is kept opened as long as possible, and re-established if required. There is one TCP connection per output using TCP driver. -You must provide an IP address or a hostname and a port to connect to. +You must provide an IP address or a hostname (`host`) and a port to connect to. + +The TCP connection can optionally be secured using TLS (`tls_enabled`). The TCP driver verifies the server certificate against the specified certificate authorities (`tls_certificate_authorities`). The TCP driver can optionally use a client certificate `tls_certificate` (and its associated key `tls_key`) if the server requires client authentication. #### Configuration @@ -110,7 +112,18 @@ You must provide an IP address or a hostname and a port to connect to. [[outputs]] driver = "Tcp" format = "" # To replace -config = { addr = "", port = } # To replace +# - host (required): Hostname or IP Address to send events to +# - port (required): Tcp port to send events to +# - tls_enabled (optional, defaults to false): wrap the TCP stream in a TLS channel. +# Must be set for other tls_ options to take effect +# - tls_certificate_authorities (optional, defaults to undefined): Validate server certificate +# chain against these authorities. You can define multiple files or paths. +# All the certificates will be read and added to the trust store. +# - tls_certificate (optional, defaults to undefined): Path to certificate in PEM format. +# This certificate will be presented to the server. +# - tls_key (optional, defaults to undefined): Path to the private key corresponding to the +# specified certificate (PEM format). +config = { host = "", port = } # To replace ``` #### Command diff --git a/server/Cargo.toml b/server/Cargo.toml index 532022a..16954fb 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -36,7 +36,7 @@ xmlparser = "0.13.5" itertools = "0.13.0" futures = "0.3.28" bitreader = "0.3.7" -rustls-pemfile = "2.1.1" +rustls-pemfile = "2.2.0" x509-parser = "0.16.0" sha1 = "0.10.5" hex = "0.4.3" diff --git a/server/src/drivers/tcp.rs b/server/src/drivers/tcp.rs index 909ab4c..8f2b31e 100644 --- a/server/src/drivers/tcp.rs +++ b/server/src/drivers/tcp.rs @@ -1,19 +1,25 @@ -use std::sync::Arc; +use std::{pin::Pin, sync::Arc}; use crate::{ event::EventMetadata, output::OutputDriver, + tls::{load_certs, load_priv_key}, }; -use anyhow::{anyhow, Result}; +use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; use common::subscription::TcpConfiguration; use log::{debug, info, warn}; use tokio::{ + io::AsyncWrite, net::TcpStream, sync::{mpsc, oneshot}, }; use tokio::io::AsyncWriteExt; +use tokio_rustls::{ + rustls::{pki_types::ServerName, ClientConfig, RootCertStore}, + TlsConnector, +}; use tokio_util::sync::CancellationToken; #[derive(Debug)] @@ -31,24 +37,62 @@ fn send_response(sender: oneshot::Sender>, msg: Result<()>) { } } +pub async fn connect( + config: &TcpConfiguration, +) -> Result>> { + if config.tls_enabled() { + let mut certificate_authorities = Vec::new(); + for certificate_authority_file in config.tls_certificate_authorities() { + certificate_authorities.extend(load_certs(&certificate_authority_file)?); + } + let mut root_cert_store = RootCertStore::empty(); + root_cert_store.add_parsable_certificates(certificate_authorities.clone()); + + let tls_config_builder = ClientConfig::builder().with_root_certificates(root_cert_store); + + let tls_config = if let Some(tls_certificate_file) = config.tls_certificate() { + let tls_certificate = load_certs(tls_certificate_file)?; + let tls_key_file = config.tls_key().ok_or_else(|| anyhow!("Missing tls_key"))?; + let tls_private_key = load_priv_key(&tls_key_file)?; + tls_config_builder.with_client_auth_cert(tls_certificate, tls_private_key)? + } else { + tls_config_builder.with_no_client_auth() + }; + let connector = TlsConnector::from(Arc::new(tls_config)); + let dnsname = ServerName::try_from(config.host().to_owned())?; + + let stream = TcpStream::connect((config.host(), config.port())) + .await + .context("Failed to establish TCP connection")?; + Ok(Box::pin(connector.connect(dnsname, stream).await?)) + } else { + Ok(Box::pin( + TcpStream::connect((config.host(), config.port())) + .await + .context("Failed to establish TCP connection")?, + )) + } +} + pub async fn run( config: TcpConfiguration, mut task_rx: mpsc::Receiver, cancellation_token: CancellationToken, ) { - let mut stream_opt: Option = None; + let mut stream_opt: Option>> = None; + loop { tokio::select! { Some(message) = task_rx.recv() => { // Establish TCP connection if not already done if stream_opt.is_none() { - match TcpStream::connect((config.addr(), config.port())).await { + match connect(&config).await { Ok(stream) => { stream_opt = Some(stream); }, Err(e) => { - warn!("Failed to connect to {}:{}: {}", config.addr(), config.port(), e); - send_response(message.resp, Err(anyhow!(format!("Failed to connect to {}:{}: {}", config.addr(), config.port(), e)))); + warn!("Failed to connect to {}:{}: {}", config.host(), config.port(), e); + send_response(message.resp, Err(anyhow!(format!("Failed to connect to {}:{}: {}", config.host(), config.port(), e)))); continue; } }; @@ -58,7 +102,7 @@ pub async fn run( Some(stream) => stream, None => { warn!("TCP stream is unset !"); - send_response(message.resp, Err(anyhow!(format!("TCP stream of {}:{} is unset!", config.addr(), config.port())))); + send_response(message.resp, Err(anyhow!(format!("TCP stream of {}:{} is unset!", config.host(), config.port())))); continue; } }; @@ -66,7 +110,7 @@ pub async fn run( // Write data to stream if let Err(e) = stream.write_all(message.content.as_bytes()).await { stream_opt = None; - send_response(message.resp, Err(anyhow!(format!("Failed to write in TCP connection ({}:{}): {}", config.addr(), config.port(), e)))); + send_response(message.resp, Err(anyhow!(format!("Failed to write in TCP connection ({}:{}): {}", config.host(), config.port(), e)))); continue; } @@ -77,11 +121,7 @@ pub async fn run( } }; } - info!( - "Exiting TCP output task ({}:{})", - config.addr(), - config.port() - ); + info!("Exiting TCP output task ({:?})", config); } pub struct OutputTcp { diff --git a/server/src/tls.rs b/server/src/tls.rs index c91f06c..390d7a5 100644 --- a/server/src/tls.rs +++ b/server/src/tls.rs @@ -16,7 +16,7 @@ use x509_parser::prelude::{FromDer, X509Certificate}; use crate::sldc; /// Load certificates contained inside a PEM file -fn load_certs(filename: &str) -> Result>> { +pub fn load_certs(filename: &str) -> Result>> { let certfile = fs::File::open(filename)?; let mut reader = BufReader::new(certfile); @@ -33,7 +33,7 @@ fn load_certs(filename: &str) -> Result>> { } /// Load private key contained inside a file -fn load_priv_key(filename: &str) -> Result> { +pub fn load_priv_key(filename: &str) -> Result> { let keyfile = fs::File::open(filename).context("Cannot open private key file")?; let mut reader = BufReader::new(keyfile); diff --git a/subscription.sample.toml b/subscription.sample.toml index d23298e..cfa42c2 100644 --- a/subscription.sample.toml +++ b/subscription.sample.toml @@ -1,10 +1,10 @@ # autogenerated by openwec 0.3.0 -# Wed, 20 Nov 2024 00:25:58 +0100 +# Mon, 2 Dec 2024 19:26:21 +0100 # Unique identifier of the subscription -uuid = "1a196390-cc5d-4b42-bfd1-a3481eb2302b" +uuid = "d9e046ec-92ad-4a18-899f-f5da94108fc8" # Unique name of the subscription -name = "subscription-1a196390-cc5d-4b42-bfd1-a3481eb2302b" +name = "subscription-d9e046ec-92ad-4a18-899f-f5da94108fc8" # Subscription query query = """ @@ -130,10 +130,19 @@ query = """ # driver = "Tcp" # format = "Raw" -# Tcp driver has the following paramters: -# - addr (required): Hostname or IP Address to send events to +# Tcp driver has the following parameters: +# - host (required): Hostname or IP Address to send events to # - port (required): Tcp port to send events to -# config = { addr = "localhost", port = 5000 } +# - tls_enabled (optional, defaults to false): wrap the TCP stream in a TLS session. +# Must be set for other tls_ options to take effect +# - tls_certificate_authorities (optional, defaults to undefined): Validate server certificate +# chain against these authorities. You can define multiple files or paths. +# All the certificates will be read and added to the trust store. +# - tls_certificate (optional, defaults to undefined): Path to certificate in PEM format. +# This certificate will be presented to the server. +# - tls_key (optional, defaults to undefined): Path to the private key corresponding to the +# specified certificate (PEM format). +# config = { host = "localhost", port = 5000 } # Configure a Redis output