diff --git a/Cargo.lock b/Cargo.lock index bc5f53f759..d3370fe34a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7502,6 +7502,7 @@ dependencies = [ "chrono-tz", "clap", "clickward", + "const_format", "criterion", "crossterm", "debug-ignore", @@ -7523,8 +7524,10 @@ dependencies = [ "oximeter", "oximeter-test-utils", "oxql-types", + "parse-display", "peg", "qorb", + "quote", "reedline", "regex", "reqwest 0.12.8", @@ -7669,6 +7672,7 @@ dependencies = [ "omicron-common", "omicron-workspace-hack", "oximeter-macro-impl", + "parse-display", "rand", "rand_distr", "regex", diff --git a/nexus-config/src/nexus_config.rs b/nexus-config/src/nexus_config.rs index a0af65b6ec..82362f2f0d 100644 --- a/nexus-config/src/nexus_config.rs +++ b/nexus-config/src/nexus_config.rs @@ -250,8 +250,12 @@ pub struct SchemaConfig { /// Optional configuration for the timeseries database. #[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)] pub struct TimeseriesDbConfig { + /// The HTTP address of the ClickHouse server. #[serde(default, skip_serializing_if = "Option::is_none")] pub address: Option, + /// The native TCP address of the ClickHouse server. + #[serde(default, skip_serializing_if = "Option::is_none")] + pub native_address: Option, } /// Configuration for the `Dendrite` dataplane daemon. 
@@ -774,7 +778,9 @@ impl std::fmt::Display for SchemeName { mod test { use super::*; - use omicron_common::address::{Ipv6Subnet, RACK_PREFIX}; + use omicron_common::address::{ + Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX, + }; use omicron_common::api::internal::shared::SwitchLocation; use camino::{Utf8Path, Utf8PathBuf}; @@ -784,7 +790,7 @@ mod test { use dropshot::ConfigLoggingLevel; use std::collections::HashMap; use std::fs; - use std::net::{Ipv6Addr, SocketAddr}; + use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::str::FromStr; use std::time::Duration; @@ -889,6 +895,7 @@ mod test { if_exists = "fail" [timeseries_db] address = "[::1]:8123" + native_address = "[::1]:9000" [updates] trusted_root = "/path/to/root.json" [tunables] @@ -1007,7 +1014,20 @@ mod test { path: "/nonexistent/path".into() }, timeseries_db: TimeseriesDbConfig { - address: Some("[::1]:8123".parse().unwrap()) + address: Some(SocketAddr::V6(SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_HTTP_PORT, + 0, + 0, + ))), + native_address: Some(SocketAddr::V6( + SocketAddrV6::new( + Ipv6Addr::LOCALHOST, + CLICKHOUSE_TCP_PORT, + 0, + 0, + ) + )), }, updates: Some(UpdatesConfig { trusted_root: Utf8PathBuf::from("/path/to/root.json"), diff --git a/nexus/src/app/metrics.rs b/nexus/src/app/metrics.rs index ba76f87392..40f7882281 100644 --- a/nexus/src/app/metrics.rs +++ b/nexus/src/app/metrics.rs @@ -115,7 +115,8 @@ impl super::Nexus { .timeseries_schema_list(&pagination.page, limit) .await .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: e.to_string(), } @@ -150,7 +151,8 @@ impl super::Nexus { result.tables }) .map_err(|e| match e { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { 
internal_message: e.to_string(), } diff --git a/nexus/src/app/mod.rs b/nexus/src/app/mod.rs index fa9c2c69cf..e451119bfc 100644 --- a/nexus/src/app/mod.rs +++ b/nexus/src/app/mod.rs @@ -24,6 +24,7 @@ use nexus_db_queries::authn; use nexus_db_queries::authz; use nexus_db_queries::context::OpContext; use nexus_db_queries::db; +use omicron_common::address::CLICKHOUSE_HTTP_PORT; use omicron_common::address::CLICKHOUSE_TCP_PORT; use omicron_common::address::DENDRITE_PORT; use omicron_common::address::MGD_PORT; @@ -411,13 +412,12 @@ impl Nexus { .map_err(|e| e.to_string())?; // Client to the ClickHouse database. - let timeseries_client = - if let Some(http_address) = &config.pkg.timeseries_db.address { - let native_address = - SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT); - oximeter_db::Client::new(*http_address, native_address, &log) - } else { - // TODO-cleanup: Remove this when we remove the HTTP client. + // TODO-cleanup: Simplify this when we remove the HTTP client. + let timeseries_client = match ( + &config.pkg.timeseries_db.address, + &config.pkg.timeseries_db.native_address, + ) { + (None, None) => { let http_resolver = qorb_resolver.for_service(ServiceName::Clickhouse); let native_resolver = @@ -427,7 +427,24 @@ impl Nexus { native_resolver, &log, ) - }; + } + (maybe_http, maybe_native) => { + let (http_address, native_address) = + match (maybe_http, maybe_native) { + (None, None) => unreachable!("handled above"), + (None, Some(native)) => ( + SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT), + *native, + ), + (Some(http), None) => ( + *http, + SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT), + ), + (Some(http), Some(native)) => (*http, *native), + }; + oximeter_db::Client::new(http_address, native_address, &log) + } + }; // TODO-cleanup We may want to make the populator a first-class // background task. 
diff --git a/nexus/src/app/oximeter.rs b/nexus/src/app/oximeter.rs index 6a4a81a47a..708b0828c9 100644 --- a/nexus/src/app/oximeter.rs +++ b/nexus/src/app/oximeter.rs @@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer( fn map_oximeter_err(error: oximeter_db::Error) -> Error { match error { - oximeter_db::Error::DatabaseUnavailable(_) => { + oximeter_db::Error::DatabaseUnavailable(_) + | oximeter_db::Error::Connection(_) => { Error::ServiceUnavailable { internal_message: error.to_string() } } _ => Error::InternalError { internal_message: error.to_string() }, diff --git a/nexus/test-utils/src/lib.rs b/nexus/test-utils/src/lib.rs index 631450db20..c74e2faf6b 100644 --- a/nexus/test-utils/src/lib.rs +++ b/nexus/test-utils/src/lib.rs @@ -485,6 +485,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { let dataset_id = Uuid::new_v4(); let http_address = clickhouse.http_address(); let http_port = http_address.port(); + let native_address = clickhouse.native_address(); self.rack_init_builder.add_clickhouse_dataset( zpool_id, dataset_id, @@ -503,6 +504,8 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> { .as_mut() .expect("Tests expect to set a port of Clickhouse") .set_port(http_port); + self.config.pkg.timeseries_db.native_address = + Some(native_address.into()); let pool_name = illumos_utils::zpool::ZpoolName::new_external(zpool_id) .to_string() diff --git a/oximeter/db/Cargo.toml b/oximeter/db/Cargo.toml index b0afdfbb07..df4bc7d703 100644 --- a/oximeter/db/Cargo.toml +++ b/oximeter/db/Cargo.toml @@ -17,6 +17,7 @@ camino.workspace = true chrono.workspace = true chrono-tz.workspace = true clap.workspace = true +const_format.workspace = true clickward.workspace = true debug-ignore.workspace = true dropshot.workspace = true @@ -32,6 +33,7 @@ omicron-common.workspace = true omicron-workspace-hack.workspace = true oximeter.workspace = true oxql-types.workspace = true +parse-display.workspace = true qorb.workspace = true regex.workspace 
= true serde.workspace = true @@ -93,6 +95,11 @@ optional = true workspace = true features = [ "rt-multi-thread", "macros" ] +[build-dependencies] +anyhow.workspace = true +nom.workspace = true +quote.workspace = true + [dev-dependencies] camino-tempfile.workspace = true criterion = { workspace = true, features = [ "async_tokio" ] } diff --git a/oximeter/db/build.rs b/oximeter/db/build.rs new file mode 100644 index 0000000000..b166c52897 --- /dev/null +++ b/oximeter/db/build.rs @@ -0,0 +1,100 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +use anyhow::Context as _; +use nom::IResult; + +/// Build script for generating native type representations from the +/// ground-truth SQL definitions. +fn main() -> anyhow::Result<()> { + const INIT_FILE: &str = + concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql"); + let contents = std::fs::read_to_string(INIT_FILE) + .with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?; + let field_type_enum = + find_enum(&contents, "type").context("failed to find column 'type'")?; + let field_source_enum = find_enum(&contents, "source") + .context("failed to find column 'source'")?; + let datum_type_enum = find_enum(&contents, "datum_type") + .context("failed to find column 'datum_type'")?; + std::fs::write( + format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?), + [field_type_enum, field_source_enum, datum_type_enum].join("\n"), + ) + .context("writing output file")?; + Ok(()) +} + +// Find an enum in the `timeseries_schema` table definition for the named +// column, and return the corresponding `DataType::Enum8()` definition for it. +fn find_enum(contents: &str, column: &str) -> Option { + let needle = format!("{column} Enum(\n"); + let start = contents.find(&needle)? 
+ needle.len(); + let s = &contents[start..].trim(); + let (variants, names): (Vec, Vec) = + variant_list(s).ok()?.1.into_iter().unzip(); + let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase()); + let enum_rev_map = + quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase()); + let enum_type = + quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase()); + let parsed_type = if column == "type" { + quote::quote! { ::oximeter::FieldType } + } else if column == "source" { + quote::quote! { ::oximeter::FieldSource } + } else if column == "datum_type" { + quote::quote! { ::oximeter::DatumType } + } else { + unreachable!(); + }; + Some(quote::quote! { + /// Mapping from the variant index to the string form. + #[allow(dead_code)] + static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + }); + /// Reverse mapping, from the _parsed_ form to the variant index. + #[allow(dead_code)] + static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| { + ::indexmap::IndexMap::from([ + #((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),* + ]) + }); + /// Actual DataType::Enum8(_) with the contained variant-to-name mapping. 
+ #[allow(dead_code)] + static #enum_type: ::std::sync::LazyLock = ::std::sync::LazyLock::new(|| { + crate::native::block::DataType::Enum8( + ::indexmap::IndexMap::from([ + #((#variants, String::from(#names))),* + ]) + ) + }); + }.to_string()) +} + +fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> { + nom::multi::separated_list1( + nom::bytes::complete::is_a(" ,\n"), + single_variant, + )(s) +} + +fn single_variant(s: &str) -> IResult<&str, (i8, String)> { + nom::combinator::map( + nom::sequence::separated_pair( + nom::sequence::delimited( + nom::bytes::complete::tag("'"), + nom::character::complete::alphanumeric1, + nom::bytes::complete::tag("'"), + ), + nom::bytes::complete::tag(" = "), + nom::character::complete::i8, + ), + |(name, variant): (&str, i8)| (variant, name.to_string()), + )(s) +} diff --git a/oximeter/db/src/client/dbwrite.rs b/oximeter/db/src/client/dbwrite.rs index 3559374e0d..d37d95deb9 100644 --- a/oximeter/db/src/client/dbwrite.rs +++ b/oximeter/db/src/client/dbwrite.rs @@ -8,18 +8,19 @@ use crate::client::Client; use crate::model; +use crate::model::to_block::ToBlock as _; use crate::Error; use camino::Utf8PathBuf; use oximeter::Sample; -use oximeter::TimeseriesName; +use oximeter::TimeseriesSchema; use slog::debug; use std::collections::BTreeMap; use std::collections::BTreeSet; #[derive(Debug)] pub(super) struct UnrolledSampleRows { - /// The timeseries schema rows, keyed by timeseries name. - pub new_schema: BTreeMap, + /// The timeseries schema rows. + pub new_schema: Vec, /// The rows to insert in all the other tables, keyed by the table name. 
pub rows: BTreeMap>, } @@ -182,14 +183,14 @@ impl Client { continue; } Ok(None) => {} - Ok(Some((name, schema))) => { + Ok(Some(schema)) => { debug!( self.log, "new timeseries schema"; - "timeseries_name" => %name, - "schema" => %schema + "timeseries_name" => %schema.timeseries_name, + "schema" => ?schema, ); - new_schema.insert(name, schema); + new_schema.insert(schema.timeseries_name.clone(), schema); } } @@ -217,6 +218,7 @@ impl Client { seen_timeseries.insert(key); } + let new_schema = new_schema.into_values().collect(); UnrolledSampleRows { new_schema, rows } } @@ -268,7 +270,7 @@ impl Client { // receive a sample with a new schema, and both would then try to insert that schema. pub(super) async fn save_new_schema_or_remove( &self, - new_schema: BTreeMap, + new_schema: Vec, ) -> Result<(), Error> { if !new_schema.is_empty() { debug!( @@ -276,17 +278,11 @@ impl Client { "inserting {} new timeseries schema", new_schema.len() ); - const APPROX_ROW_SIZE: usize = 64; - let mut body = String::with_capacity( - APPROX_ROW_SIZE + APPROX_ROW_SIZE * new_schema.len(), + let body = const_format::formatcp!( + "INSERT INTO {}.timeseries_schema FORMAT Native", + crate::DATABASE_NAME ); - body.push_str("INSERT INTO "); - body.push_str(crate::DATABASE_NAME); - body.push_str(".timeseries_schema FORMAT JSONEachRow\n"); - for row_data in new_schema.values() { - body.push_str(row_data); - body.push('\n'); - } + let block = TimeseriesSchema::to_block(&new_schema)?; // Try to insert the schema. // @@ -294,16 +290,16 @@ impl Client { // internal cache. Since we check the internal cache first for // schema, if we fail here but _don't_ remove the schema, we'll // never end up inserting the schema, but we will insert samples. 
- if let Err(e) = self.execute(body).await { + if let Err(e) = self.insert_native(&body, block).await { debug!( self.log, "failed to insert new schema, removing from cache"; "error" => ?e, ); let mut schema = self.schema.lock().await; - for name in new_schema.keys() { + for schema_to_remove in new_schema.iter() { schema - .remove(name) + .remove(&schema_to_remove.timeseries_name) .expect("New schema should have been cached"); } return Err(e); diff --git a/oximeter/db/src/client/mod.rs b/oximeter/db/src/client/mod.rs index 625ed7fafb..39051e70c8 100644 --- a/oximeter/db/src/client/mod.rs +++ b/oximeter/db/src/client/mod.rs @@ -17,7 +17,9 @@ pub use self::dbwrite::DbWrite; pub use self::dbwrite::TestDbWrite; use crate::client::query_summary::QuerySummary; use crate::model; +use crate::model::from_block::FromBlock; use crate::native; +use crate::native::block::Block; use crate::native::block::ValueArray; use crate::native::QueryResult; use crate::query; @@ -428,7 +430,7 @@ impl Client { "FROM {}.timeseries_schema ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, limit.get(), @@ -441,7 +443,7 @@ impl Client { "WHERE timeseries_name > '{}' ", "ORDER BY timeseries_name ", "LIMIT {} ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), crate::DATABASE_NAME, last_timeseries, @@ -449,18 +451,19 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - let schema = body - .lines() - .map(|line| { - TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ) - }) - .collect::>(); + let result = self.execute_with_block(&sql).await?; + let Some(block) = result.data.as_ref() else { + error!( + self.log, + "query listing timeseries schema did not contain \ + a data block" + ); + return Err(Error::Database(String::from( + "Query listing timeseries schema did not contain \ + any data from the database", + ))); + }; + let schema = 
TimeseriesSchema::from_block(block)?; ResultsPage::new(schema, &dropshot::EmptyScanParams {}, |schema, _| { schema.timeseries_name.clone() }) @@ -649,7 +652,7 @@ impl Client { "path" => path.display(), "filename" => &name, ); - match self.execute_native(sql).await { + match self.execute_native(&sql).await { Ok(_) => debug!( self.log, "successfully applied schema upgrade file"; @@ -857,11 +860,11 @@ impl Client { /// Read the latest version applied in the database. pub async fn read_latest_version(&self) -> Result { const ALIAS: &str = "max_version"; - let sql = format!( + const QUERY: &str = const_format::formatcp!( "SELECT MAX(value) AS {ALIAS} FROM {db_name}.version;", db_name = crate::DATABASE_NAME, ); - match self.execute_with_result_native(sql).await { + match self.execute_with_block(QUERY).await { Ok(result) => { let Some(data) = &result.data else { error!( @@ -957,13 +960,14 @@ impl Client { "INSERT INTO {db_name}.version (*) VALUES ({version}, now());", db_name = crate::DATABASE_NAME, ); - self.execute_native(sql).await + self.execute_native(&sql).await } /// Verifies if instance is part of oximeter_cluster pub async fn is_oximeter_cluster(&self) -> Result { - let sql = format!("SHOW CLUSTER {}", crate::CLUSTER_NAME); - self.execute_with_result_native(sql).await.and_then(|result| { + const QUERY: &str = + const_format::formatcp!("SHOW CLUSTER {}", crate::CLUSTER_NAME); + self.execute_with_block(QUERY).await.and_then(|result| { result .data .ok_or_else(|| { @@ -989,7 +993,7 @@ impl Client { async fn verify_or_cache_sample_schema( &self, sample: &Sample, - ) -> Result, Error> { + ) -> Result, Error> { let sample_schema = TimeseriesSchema::from(sample); let name = sample_schema.timeseries_name.clone(); let mut schema = self.schema.lock().await; @@ -1023,15 +1027,8 @@ impl Client { } } Entry::Vacant(entry) => { - let name = entry.key().clone(); entry.insert(sample_schema.clone()); - Ok(Some(( - name, - serde_json::to_string(&model::DbTimeseriesSchema::from( - 
sample_schema, - )) - .expect("Failed to convert schema to DB model"), - ))) + Ok(Some(sample_schema)) } } } @@ -1090,36 +1087,48 @@ impl Client { Ok(timeseries_by_key.into_values().collect()) } + // Insert data using the native TCP interface. + async fn insert_native( + &self, + sql: &str, + block: Block, + ) -> Result { + trace!( + self.log, + "executing SQL query"; + "sql" => sql, + ); + let mut handle = self.native_pool.claim().await?; + let id = usdt::UniqueId::new(); + probes::sql__query__start!(|| (&id, &sql)); + let result = handle.insert(sql, block).await.map_err(Error::from); + probes::sql__query__done!(|| (&id)); + result + } + // Execute a generic SQL statement, using the native TCP interface. - async fn execute_native(&self, sql: S) -> Result<(), Error> - where - S: Into, - { - self.execute_with_result_native(sql).await.map(|_| ()) + async fn execute_native(&self, sql: &str) -> Result<(), Error> { + self.execute_with_block(sql).await.map(|_| ()) } // Execute a generic SQL statement, returning the query result as a data // block. // // TODO-robustness This currently does no validation of the statement. 
- async fn execute_with_result_native( + async fn execute_with_block( &self, - sql: S, - ) -> Result - where - S: Into, - { - let sql = sql.into(); + sql: &str, + ) -> Result { trace!( self.log, "executing SQL query"; - "sql" => &sql, + "sql" => sql, ); let mut handle = self.native_pool.claim().await?; let id = usdt::UniqueId::new(); probes::sql__query__start!(|| (&id, &sql)); - let result = handle.query(sql.as_str()).await.map_err(Error::from); + let result = handle.query(sql).await.map_err(Error::from); probes::sql__query__done!(|| (&id)); result } @@ -1218,7 +1227,7 @@ impl Client { let sql = { if schema.is_empty() { format!( - "SELECT * FROM {db_name}.timeseries_schema FORMAT JSONEachRow;", + "SELECT * FROM {db_name}.timeseries_schema FORMAT Native;", db_name = crate::DATABASE_NAME, ) } else { @@ -1229,7 +1238,7 @@ impl Client { "FROM {db_name}.timeseries_schema ", "WHERE timeseries_name NOT IN ", "({current_keys}) ", - "FORMAT JSONEachRow;", + "FORMAT Native;", ), db_name = crate::DATABASE_NAME, current_keys = schema @@ -1240,21 +1249,22 @@ impl Client { ) } }; - let body = self.execute_with_body(sql).await?.1; - if body.is_empty() { + let body = self.execute_with_block(&sql).await?; + let Some(data) = body.data.as_ref() else { trace!(self.log, "no new timeseries schema in database"); - } else { - trace!(self.log, "extracting new timeseries schema"); - let new = body.lines().map(|line| { - let schema = TimeseriesSchema::from( - serde_json::from_str::(line) - .expect( - "Failed to deserialize TimeseriesSchema from database", - ), - ); - (schema.timeseries_name.clone(), schema) - }); - schema.extend(new); + return Ok(()); + }; + if data.n_rows == 0 { + trace!(self.log, "no new timeseries schema in database"); + return Ok(()); + } + trace!( + self.log, + "retrieved new timeseries schema"; + "n_schema" => data.n_rows, + ); + for new_schema in TimeseriesSchema::from_block(data)?.into_iter() { + schema.insert(new_schema.timeseries_name.clone(), new_schema); } Ok(()) 
} @@ -1348,7 +1358,7 @@ impl Client { "table_name" => table, "n_timeseries" => chunk.len(), ); - self.execute_native(sql).await?; + self.execute_native(&sql).await?; } } Ok(()) @@ -3126,7 +3136,7 @@ mod tests { "INSERT INTO oximeter.{field_table} FORMAT JSONEachRow {row}" ); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert field row"); @@ -3137,7 +3147,7 @@ mod tests { crate::DATABASE_SELECT_FORMAT, ); let body = client - .execute_with_body(select_sql) + .execute_with_body(&select_sql) .await .expect("Failed to select field row") .1; @@ -3466,7 +3476,7 @@ mod tests { ); println!("Inserted row: {}", inserted_row); client - .execute_native(insert_sql) + .execute_native(&insert_sql) .await .expect("Failed to insert measurement row"); @@ -4037,11 +4047,11 @@ mod tests { // table, to version 2, which adds two columns to that table in // different SQL files. client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ @@ -4086,7 +4096,7 @@ mod tests { // Check that it actually worked! let body = client - .execute_with_body(format!( + .execute_with_body(&format!( "\ SELECT name, type FROM system.columns \ WHERE database = '{test_name}' AND table = 'tbl' \ @@ -4249,11 +4259,11 @@ mod tests { // modifications over two versions, rather than as multiple schema // upgrades in one version bump. 
client - .execute_native(format!("CREATE DATABASE {test_name};")) + .execute_native(&format!("CREATE DATABASE {test_name};")) .await .unwrap(); client - .execute_native(format!( + .execute_native(&format!( "\ CREATE TABLE {test_name}.tbl (\ `col0` UInt8 \ diff --git a/oximeter/db/src/lib.rs b/oximeter/db/src/lib.rs index 36fff6056f..3712771595 100644 --- a/oximeter/db/src/lib.rs +++ b/oximeter/db/src/lib.rs @@ -20,6 +20,8 @@ pub use oximeter::Field; pub use oximeter::FieldType; pub use oximeter::Measurement; pub use oximeter::Sample; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -224,7 +226,17 @@ pub struct Timeseries { } #[derive( - Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Deserialize, Serialize, + Clone, + Copy, + Debug, + PartialEq, + Eq, + PartialOrd, + Ord, + Deserialize, + Serialize, + FromStr, + Display, )] pub enum DbFieldSource { Target, diff --git a/oximeter/db/src/model/from_block.rs b/oximeter/db/src/model/from_block.rs new file mode 100644 index 0000000000..83e0f0a0e7 --- /dev/null +++ b/oximeter/db/src/model/from_block.rs @@ -0,0 +1,142 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for deserializing an array of values from a `Block`. + +use crate::native::block::Block; +use crate::native::block::DataType; +use crate::native::block::ValueArray; +use crate::native::Error; +use oximeter::AuthzScope; +use oximeter::FieldSchema; +use oximeter::TimeseriesDescription; +use oximeter::TimeseriesSchema; +use oximeter::Units; +use std::collections::BTreeSet; +use std::num::NonZeroU8; + +/// Trait for deserializing an array of items from a ClickHouse data block. +pub trait FromBlock: Sized { + /// Deserialize an array of `Self`s from a block. 
+ fn from_block(block: &Block) -> Result, Error>; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. +impl FromBlock for TimeseriesSchema { + fn from_block(block: &Block) -> Result, Error> { + if block.is_empty() { + return Ok(vec![]); + } + let n_rows = + usize::try_from(block.n_rows).map_err(|_| Error::BlockTooLarge)?; + let mut out = Vec::with_capacity(n_rows); + let ValueArray::String(timeseries_names) = + block.column_values("timeseries_name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_names, + inner_type: DataType::String, + } = block.column_values("fields.name")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_types, + inner_type: DataType::Enum8(field_type_variants), + } = block.column_values("fields.type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Array { + values: field_sources, + inner_type: DataType::Enum8(field_source_variants), + } = block.column_values("fields.source")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::Enum8 { + variants: datum_type_variants, + values: datum_types, + } = block.column_values("datum_type")? + else { + return Err(Error::UnexpectedColumnType); + }; + let ValueArray::DateTime64 { values: created, .. } = + block.column_values("created")? + else { + return Err(Error::UnexpectedColumnType); + }; + + for row in 0..n_rows { + let ValueArray::String(names) = &field_names[row] else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_types, .. } = + &field_types[row] + else { + unreachable!(); + }; + let ValueArray::Enum8 { values: row_field_sources, .. 
} = + &field_sources[row] + else { + unreachable!(); + }; + let mut field_schema = BTreeSet::new(); + let n_fields = names.len(); + for field in 0..n_fields { + let schema = FieldSchema { + name: names[field].clone(), + field_type: field_type_variants[&row_field_types[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field type from database: {:?}", + field_type_variants[&row_field_types[field]] + )) + })?, + source: field_source_variants[&row_field_sources[field]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize field source from database: {:?}", + field_source_variants[&row_field_sources[field]])) + })?, + description: String::new(), + }; + field_schema.insert(schema); + } + let schema = TimeseriesSchema { + timeseries_name: + timeseries_names[row].clone().parse().map_err(|_| { + Error::Serde(format!( + "Failed to deserialize timeseries name from database: {:?}", + &timeseries_names[row] + )) + })?, + description: TimeseriesDescription::default(), + field_schema, + datum_type: datum_type_variants[&datum_types[row]] + .parse() + .map_err(|_| { + Error::Serde(format!( + "Failed to deserialize datum type from database: {:?}", + &datum_type_variants[&datum_types[row]] + )) + })?, + version: unsafe { NonZeroU8::new_unchecked(1) }, + authz_scope: AuthzScope::Fleet, + units: Units::None, + created: created[row].to_utc(), + }; + out.push(schema); + } + Ok(out) + } +} diff --git a/oximeter/db/src/model.rs b/oximeter/db/src/model/mod.rs similarity index 99% rename from oximeter/db/src/model.rs rename to oximeter/db/src/model/mod.rs index d57819b0d0..eba2333ae5 100644 --- a/oximeter/db/src/model.rs +++ b/oximeter/db/src/model/mod.rs @@ -29,6 +29,8 @@ use oximeter::types::Measurement; use oximeter::types::MissingDatum; use oximeter::types::Sample; use oximeter::Quantile; +use parse_display::Display; +use parse_display::FromStr; use serde::Deserialize; use serde::Serialize; use std::collections::BTreeMap; @@ -38,6 
+40,9 @@ use std::net::IpAddr; use std::net::Ipv6Addr; use uuid::Uuid; +pub mod from_block; +pub mod to_block; + /// Describes the version of the Oximeter database. /// /// For usage and details see: @@ -170,7 +175,9 @@ impl From for DbTimeseriesSchema { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbFieldType { String, I8, @@ -223,7 +230,9 @@ impl From for DbFieldType { } } -#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +#[derive( + Clone, Copy, Debug, Serialize, Deserialize, PartialEq, FromStr, Display, +)] pub enum DbDatumType { Bool, I8, diff --git a/oximeter/db/src/model/to_block.rs b/oximeter/db/src/model/to_block.rs new file mode 100644 index 0000000000..5a6bb91766 --- /dev/null +++ b/oximeter/db/src/model/to_block.rs @@ -0,0 +1,115 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Trait for serializing an array of values into a `Block`. + +use crate::native::block::Block; +use crate::native::block::Column; +use crate::native::block::DataType; +use crate::native::block::Precision; +use crate::native::block::ValueArray; +use crate::native::Error; +use chrono::TimeZone as _; +use chrono_tz::Tz; +use indexmap::IndexMap; +use oximeter::TimeseriesSchema; + +include!(concat!(env!("OUT_DIR"), "/enum_defs.rs")); + +/// Trait for serializing an array of items to a ClickHouse data block. +pub trait ToBlock: Sized { + /// Serialize an array of `Self`s to a block. + fn to_block(items: &[Self]) -> Result; +} + +// TODO-cleanup: This is probably a good candidate for a derive-macro, which +// expands to the code that checks that names / types in the block match those +// of the fields in the struct itself. 
+impl ToBlock for TimeseriesSchema { + fn to_block(items: &[Self]) -> Result { + let n_items = items.len(); + let mut timeseries_names = Vec::with_capacity(n_items); + let mut field_names = Vec::with_capacity(n_items); + let mut field_types = Vec::with_capacity(n_items); + let mut field_sources = Vec::with_capacity(n_items); + let mut datum_types = Vec::with_capacity(n_items); + let mut created = Vec::with_capacity(n_items); + for item in items.iter() { + timeseries_names.push(item.timeseries_name.to_string()); + let n_fields = item.field_schema.len(); + let mut row_field_names = Vec::with_capacity(n_fields); + let mut row_field_types = Vec::with_capacity(n_fields); + let mut row_field_sources = Vec::with_capacity(n_fields); + for field in item.field_schema.iter() { + row_field_names.push(field.name.clone()); + let ty = TYPE_ENUM_REV_MAP.get(&field.field_type).unwrap(); + row_field_types.push(*ty); + let src = SOURCE_ENUM_REV_MAP.get(&field.source).unwrap(); + row_field_sources.push(*src); + } + field_names.push(ValueArray::String(row_field_names)); + field_types.push(ValueArray::Enum8 { + variants: TYPE_ENUM_MAP.clone(), + values: row_field_types, + }); + field_sources.push(ValueArray::Enum8 { + variants: SOURCE_ENUM_MAP.clone(), + values: row_field_sources, + }); + datum_types + .push(*DATUM_TYPE_ENUM_REV_MAP.get(&item.datum_type).unwrap()); + created.push(Tz::UTC.from_utc_datetime(&item.created.naive_utc())); + } + Ok(Block { + name: String::new(), + info: Default::default(), + n_columns: 6, + n_rows: u64::try_from(n_items).map_err(|_| Error::BlockTooLarge)?, + columns: IndexMap::from([ + ( + String::from("timeseries_name"), + Column::from(ValueArray::String(timeseries_names)), + ), + ( + String::from("fields.name"), + Column::from(ValueArray::Array { + inner_type: DataType::String, + values: field_names, + }), + ), + ( + String::from("fields.type"), + Column::from(ValueArray::Array { + inner_type: TYPE_ENUM_DATA_TYPE.clone(), + values: field_types, + }), + ), + 
( + String::from("fields.source"), + Column::from(ValueArray::Array { + inner_type: SOURCE_ENUM_DATA_TYPE.clone(), + values: field_sources, + }), + ), + ( + String::from("datum_type"), + Column::from(ValueArray::Enum8 { + variants: DATUM_TYPE_ENUM_MAP.clone(), + values: datum_types, + }), + ), + ( + String::from("created"), + Column::from(ValueArray::DateTime64 { + values: created, + precision: Precision::new(9).unwrap(), + tz: Tz::UTC, + }), + ), + ]), + }) + } +} diff --git a/oximeter/db/src/native/block.rs b/oximeter/db/src/native/block.rs index 10727b6532..4b0405e023 100644 --- a/oximeter/db/src/native/block.rs +++ b/oximeter/db/src/native/block.rs @@ -6,20 +6,28 @@ //! Types for working with actual blocks and columns of data. +use super::packets::server::ColumnDescription; use super::Error; use chrono::DateTime; use chrono::NaiveDate; use chrono_tz::Tz; use indexmap::IndexMap; +use nom::branch::alt; use nom::bytes::complete::tag; use nom::bytes::complete::take_while1; +use nom::character::complete::alphanumeric1; +use nom::character::complete::i8 as nom_i8; use nom::character::complete::u8 as nom_u8; +use nom::combinator::all_consuming; use nom::combinator::eof; use nom::combinator::map; use nom::combinator::map_opt; use nom::combinator::opt; +use nom::combinator::value; +use nom::multi::separated_list1; use nom::sequence::delimited; use nom::sequence::preceded; +use nom::sequence::separated_pair; use nom::sequence::tuple; use nom::IResult; use std::fmt; @@ -67,26 +75,15 @@ impl Block { self.n_columns == 0 && self.n_rows == 0 } - /// Create an empty block with the provided column names and types - pub fn empty<'a>( - types: impl IntoIterator, - ) -> Result { - let mut columns = IndexMap::new(); - let mut n_columns = 0; - for (name, type_) in types.into_iter() { - if !type_.is_supported() { - return Err(Error::UnsupportedDataType(type_.to_string())); - } - n_columns += 1; - columns.insert(name.to_string(), Column::empty(type_)); - } - Ok(Self { + /// Create an 
empty block. + pub fn empty() -> Self { + Self { name: String::new(), info: BlockInfo::default(), - n_columns, + n_columns: 0, n_rows: 0, - columns, - }) + columns: IndexMap::new(), + } } /// Concatenate this data block with another. @@ -106,7 +103,10 @@ impl Block { Ok(()) } - fn matches_structure(&self, block: &Block) -> bool { + /// Return true if this block matches the structure of the other. + /// + /// This means it has the same column names and types. + pub fn matches_structure(&self, block: &Block) -> bool { if self.n_columns != block.n_columns { return false; } @@ -120,6 +120,32 @@ impl Block { } true } + + /// Return the values of the named column, if it exists. + pub fn column_values(&self, name: &str) -> Result<&ValueArray, Error> { + self.columns + .get(name) + .map(|col| &col.values) + .ok_or_else(|| Error::NoSuchColumn(name.to_string())) + } + + pub(crate) fn matches_table_description( + &self, + columns: &[ColumnDescription], + ) -> bool { + if self.n_columns != columns.len() as u64 { + return false; + } + for (our_col, their_col) in self.columns.iter().zip(columns.iter()) { + if our_col.0 != &their_col.name { + return false; + } + if our_col.1.data_type != their_col.data_type { + return false; + } + } + true + } } /// Details about the block. @@ -177,6 +203,13 @@ pub struct Column { pub data_type: DataType, } +impl From for Column { + fn from(values: ValueArray) -> Self { + let data_type = values.data_type(); + Self { values, data_type } + } +} + impl Column { /// Create an empty column of the provided type. pub fn empty(data_type: DataType) -> Self { @@ -194,6 +227,16 @@ impl Column { self.values.concat(rhs.values); Ok(()) } + + /// Return true if the column is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } + + /// Return the number of elements in the column. + pub fn len(&self) -> usize { + self.values.len() + } } /// An array of singly-typed data values from the server. 
@@ -252,7 +295,7 @@ impl ValueArray { } /// Return an empty value array of the provided type. - fn empty(data_type: &DataType) -> ValueArray { + pub fn empty(data_type: &DataType) -> ValueArray { match data_type { DataType::UInt8 => ValueArray::UInt8(vec![]), DataType::UInt16 => ValueArray::UInt16(vec![]), @@ -589,6 +632,39 @@ impl DataType { pub(crate) fn is_nullable(&self) -> bool { matches!(self, DataType::Nullable(_)) } + + /// Parse out a data type from a string. + /// + /// This is a `nom`-based function, so that the method can be used in other + /// contexts. The `DataType::from_str()` implementation is a thin wrapper + /// around this. + pub(super) fn nom_parse(s: &str) -> IResult<&str, Self> { + alt(( + value(DataType::UInt8, tag("UInt8")), + value(DataType::UInt16, tag("UInt16")), + value(DataType::UInt32, tag("UInt32")), + value(DataType::UInt64, tag("UInt64")), + value(DataType::UInt128, tag("UInt128")), + value(DataType::Int8, tag("Int8")), + value(DataType::Int16, tag("Int16")), + value(DataType::Int32, tag("Int32")), + value(DataType::Int64, tag("Int64")), + value(DataType::Int128, tag("Int128")), + value(DataType::Float32, tag("Float32")), + value(DataType::Float64, tag("Float64")), + value(DataType::String, tag("String")), + value(DataType::Uuid, tag("UUID")), + value(DataType::Ipv4, tag("IPv4")), + value(DataType::Ipv6, tag("IPv6")), + // IMPORTANT: This needs to consume all its input, otherwise we may + // parse something like `DateTime(UTC)` as `Date`, which is + // incorrect. + value(DataType::Date, all_consuming(tag("Date"))), + // These need to be nested because `alt` supports a max of 21 + // parsers, and we have 22 data types. 
+ alt((datetime, datetime64, enum8, nullable, array)), + ))(s) + } } impl fmt::Display for DataType { @@ -632,9 +708,23 @@ impl fmt::Display for DataType { } // Parse a quoted timezone, like `'UTC'` or `'America/Los_Angeles'` +// +// Note that the quotes may optionally be escaped, like `\'UTC\'`, which is +// needed to support deserializing table descriptions, where the types for each +// column are serialized as an escaped string. fn quoted_timezone(s: &str) -> IResult<&str, Tz> { map( - delimited(tag("'"), take_while1(|c| c != '\''), tag("'")), + delimited( + preceded(opt(tag("\\")), tag("'")), + take_while1(|c: char| { + c.is_ascii_alphanumeric() + || c == '/' + || c == '+' + || c == '-' + || c == '_' + }), + preceded(opt(tag("\\")), tag("'")), + ), parse_timezone, )(s) } @@ -687,104 +777,68 @@ fn parse_timezone(s: &str) -> Tz { s.parse().unwrap_or_else(|_| *DEFAULT_TIMEZONE) } -impl std::str::FromStr for DataType { - type Err = Error; - - fn from_str(s: &str) -> Result { - // Simple scalar types. 
- if s == "UInt8" { - return Ok(DataType::UInt8); - } else if s == "UInt16" { - return Ok(DataType::UInt16); - } else if s == "UInt32" { - return Ok(DataType::UInt32); - } else if s == "UInt64" { - return Ok(DataType::UInt64); - } else if s == "UInt128" { - return Ok(DataType::UInt128); - } else if s == "Int8" { - return Ok(DataType::Int8); - } else if s == "Int16" { - return Ok(DataType::Int16); - } else if s == "Int32" { - return Ok(DataType::Int32); - } else if s == "Int64" { - return Ok(DataType::Int64); - } else if s == "Int128" { - return Ok(DataType::Int128); - } else if s == "Float32" { - return Ok(DataType::Float32); - } else if s == "Float64" { - return Ok(DataType::Float64); - } else if s == "String" { - return Ok(DataType::String); - } else if s == "UUID" { - return Ok(DataType::Uuid); - } else if s == "IPv4" { - return Ok(DataType::Ipv4); - } else if s == "IPv6" { - return Ok(DataType::Ipv6); - } else if s == "Date" { - return Ok(DataType::Date); - } - - // Check for datetime, possibly with a timezone. - if let Ok((_, dt)) = datetime(s) { - return Ok(dt); - }; +/// Parse an enum variant name. +fn variant_name(s: &str) -> IResult<&str, &str> { + delimited( + preceded(opt(tag("\\")), tag("'")), + alphanumeric1, + preceded(opt(tag("\\")), tag("'")), + )(s) +} - // Check for DateTime64 with precision, and possibly a timezone. - if let Ok((_, dt)) = datetime64(s) { - return Ok(dt); - }; +/// Parse a single enum variant, like `'Foo' = 1`. +/// +/// Note that the single-quotes may be escaped, which is required for parsing +/// the `ColumnDescription` type from a `TableColumns` server packet. +fn enum_variant(s: &str) -> IResult<&str, (i8, &str)> { + map(separated_pair(variant_name, tag(" = "), nom_i8), |(name, variant)| { + (variant, name) + })(s) +} - // Check for Enum8s. 
- // - // These are written like "Enum8('foo' = 1, 'bar' = 2)" - if let Some(suffix) = s.strip_prefix("Enum8(") { - let Some(inner) = suffix.strip_suffix(")") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; +/// Parse an `Enum8` data type from a string. +pub(super) fn enum8(s: &str) -> IResult<&str, DataType> { + map( + delimited( + tag("Enum8("), + separated_list1(tag(", "), enum_variant), + tag(")"), + ), + |variants| { let mut map = IndexMap::new(); - for each in inner.split(',') { - let Some((name, value)) = each.split_once(" = ") else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - let Ok(value) = value.parse() else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - // Trim whitespace from the name and strip any single-quotes. - let name = name.trim().trim_matches('\'').to_string(); - map.insert(value, name.to_string()); - } - return Ok(DataType::Enum8(map)); - } + for (variant, name) in variants.into_iter() { + map.insert(variant, name.to_string()); + } + DataType::Enum8(map) + }, + )(s) +} - // Recurse for nullable types. - if let Some(suffix) = s.strip_prefix("Nullable(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner - .parse() - .map(|inner| DataType::Nullable(Box::new(inner))); - } +fn nullable(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Nullable("), DataType::nom_parse, tag(")")), |inner| { + DataType::Nullable(Box::new(inner)) + })(s) +} - // And for arrays. 
- if let Some(suffix) = s.strip_prefix("Array(") { - let Some(inner) = suffix.strip_suffix(')') else { - return Err(Error::UnsupportedDataType(s.to_string())); - }; - return inner.parse().map(|inner| DataType::Array(Box::new(inner))); - } +fn array(s: &str) -> IResult<&str, DataType> { + map(delimited(tag("Array("), DataType::nom_parse, tag(")")), |inner| { + DataType::Array(Box::new(inner)) + })(s) +} + +impl std::str::FromStr for DataType { + type Err = Error; - // Anything else is unsupported for now. - Err(Error::UnsupportedDataType(s.to_string())) + fn from_str(s: &str) -> Result { + Self::nom_parse(s) + .map(|(_, parsed)| parsed) + .map_err(|_| Error::UnsupportedDataType(s.to_string())) } } #[cfg(test)] mod tests { + use super::enum8; use super::Block; use super::BlockInfo; use super::Column; @@ -794,6 +848,8 @@ mod tests { use super::DEFAULT_TIMEZONE; use crate::native::block::datetime; use crate::native::block::datetime64; + use crate::native::block::enum_variant; + use crate::native::block::quoted_timezone; use chrono::SubsecRound as _; use chrono::Utc; use chrono_tz::Tz; @@ -933,6 +989,14 @@ mod tests { assert!(datetime64("DateTime64(1,'UTC')").is_err()); } + #[test] + fn parse_escaped_date_time64() { + assert_eq!( + DataType::DateTime64(Precision(1), Tz::UTC), + datetime64(r#"DateTime64(1, \'UTC\')"#).unwrap().1 + ); + } + #[test] fn concat_blocks() { let data = vec![0, 1]; @@ -955,4 +1019,63 @@ mod tests { ValueArray::UInt64([data.as_slice(), data.as_slice()].concat()) ); } + + #[test] + fn test_parse_enum_variant() { + assert_eq!(enum_variant("'Foo' = 1'").unwrap().1, (1, "Foo"),); + assert_eq!(enum_variant("\\'Foo\\' = 1'").unwrap().1, (1, "Foo"),); + + enum_variant("'Foo'").unwrap_err(); + enum_variant("'Foo' = ").unwrap_err(); + enum_variant("'Foo' = x").unwrap_err(); + enum_variant("\"Foo\" = 1").unwrap_err(); + } + + #[test] + fn test_parse_enum8() { + let parsed = enum8("Enum8('Foo' = 1, 'Bar' = 2)").unwrap().1; + let DataType::Enum8(map) = 
parsed else { + panic!("Expected DataType::Enum8, found {parsed:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Foo"); + assert_eq!(map.get(&2).unwrap(), "Bar"); + } + + #[test] + fn test_parse_array_enum8_with_escapes() { + const INPUT: &str = r#"Array(Enum8(\'Bool\' = 1, \'I64\' = 2))"#; + let parsed = DataType::nom_parse(INPUT).unwrap().1; + let DataType::Array(inner) = parsed else { + panic!("Expected a `DataType::Array(_)`, found {parsed:#?}"); + }; + let DataType::Enum8(map) = &*inner else { + panic!("Expected a `DataType::Enum8(_)`, found {inner:#?}"); + }; + assert_eq!(map.len(), 2); + assert_eq!(map.get(&1).unwrap(), "Bool"); + assert_eq!(map.get(&2).unwrap(), "I64"); + } + + #[test] + fn test_parse_all_known_timezones() { + for tz in chrono_tz::TZ_VARIANTS.iter() { + let quoted = format!("'{}'", tz); + let Ok(out) = quoted_timezone(&quoted) else { + panic!("Failed to parse quoted timezone: {quoted}"); + }; + assert_eq!(&out.1, tz, "Failed to parse quoted timezone: {quoted}"); + + let escape_quoted = format!("\\'{}\\'", tz); + let Ok(out) = quoted_timezone(&escape_quoted) else { + panic!( + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + }; + assert_eq!( + &out.1, tz, + "Failed to parse escaped quoted timezone: {escape_quoted}" + ); + } + } } diff --git a/oximeter/db/src/native/connection.rs b/oximeter/db/src/native/connection.rs index 911788a91f..706dbf3331 100644 --- a/oximeter/db/src/native/connection.rs +++ b/oximeter/db/src/native/connection.rs @@ -6,6 +6,7 @@ //! A connection and pool for talking to the ClickHouse server. +use super::block::Block; use super::io::packet::client::Encoder; use super::io::packet::server::Decoder; use super::packets::client::Packet as ClientPacket; @@ -208,8 +209,34 @@ impl Connection { Ok(false) } - /// Send a SQL query, possibly with data. + /// Send a SQL query that inserts data.
+ pub async fn insert( + &mut self, + query: &str, + block: Block, + ) -> Result { + self.query_inner(query, Some(block)).await + } + + /// Send a SQL query, without any data. pub async fn query(&mut self, query: &str) -> Result { + self.query_inner(query, None).await + } + + // Send a SQL query, possibly with data. + // + // If data is present, it is sent after the SQL query itself. An error is + // returned if the server indicates that the block structure required by the + // insert query doesn't match that of the provided block. + // + // IMPORTANT: We do not currently validate that data is provided iff the + // query is an INSERT statement! Callers are required to ensure that they + // provide data if and only if the query requires it. + async fn query_inner( + &mut self, + query: &str, + maybe_data: Option, + ) -> Result { let mut query_result = QueryResult { id: Uuid::new_v4(), progress: Progress::default(), @@ -221,12 +248,109 @@ impl Connection { self.writer.send(ClientPacket::Query(query)).await?; probes::packet__sent!(|| "Query"); self.outstanding_query = true; + + // If we have data to send, wait for the server to send an empty block + // that describes its structure. + if let Some(block_to_insert) = maybe_data { + let res: Result<(), Error> = loop { + match self.reader.next().await { + Some(Ok(packet)) => match packet { + ServerPacket::Hello(_) + | ServerPacket::Pong + // The server should only send this after we've + // inserted our data. 
+ | ServerPacket::EndOfStream => + { + let kind = packet.kind(); + probes::unexpected__server__packet!(|| kind); + break Err(Error::UnexpectedPacket(kind)); + } + ServerPacket::Data(block) => { + probes::data__packet__received!(|| { + ( + block.n_columns, + block.n_rows, + block + .columns + .iter() + .map(|(name, col)| { + ( + name.clone(), + col.data_type.to_string(), + ) + }) + .collect::>(), + ) + }); + + // Similar to when selecting data, the server sends + // a block with zero rows that describes the table + // structure, so any block with a non-zero number of + // rows is an error here. + if block.n_rows != 0 { + break Err(Error::ExpectedEmptyDataBlock); + } + + // Don't concatenate the block, but check that its + // structure matches what we're about to insert. + if !block.matches_structure(&block_to_insert) { + break Err(Error::MismatchedBlockStructure); + } + + // Finally, send the actual data block and an empty + // block to tell the server we're finished. + if let Err(e) = self + .writer + .send(ClientPacket::Data(block_to_insert)) + .await + { + break Err(e); + } + break self + .writer + .send(ClientPacket::Data(Block::empty())) + .await; + } + ServerPacket::Exception(exceptions) => { + break Err(Error::Exception { exceptions }) + } + ServerPacket::Progress(progress) => { + query_result.progress += progress + } + ServerPacket::ProfileInfo(info) => { + let _ = query_result.profile_info.replace(info); + } + ServerPacket::TableColumns(columns) => { + if !block_to_insert + .matches_table_description(&columns) + { + break Err(Error::MismatchedBlockStructure); + } + } + ServerPacket::ProfileEvents(block) => { + let _ = query_result.profile_events.replace(block); + } + }, + Some(Err(e)) => break Err(e), + None => break Err(Error::Disconnected), + } + }; + if let Err(e) = res { + self.outstanding_query = false; + return Err(e); + } + } + + // Now wait for the remainder of the query to execute. 
let res = loop { match self.reader.next().await { Some(Ok(packet)) => match packet { - ServerPacket::Hello(_) => { - probes::unexpected__server__packet!(|| "Hello"); - break Err(Error::UnexpectedPacket("Hello")); + ServerPacket::Hello(_) + | ServerPacket::Pong + | ServerPacket::TableColumns(_) => { + let kind = packet.kind(); + probes::unexpected__server__packet!(|| kind); + break Err(Error::UnexpectedPacket(kind)); } ServerPacket::Data(block) => { probes::data__packet__received!(|| { @@ -263,10 +387,6 @@ impl Connection { ServerPacket::Progress(progress) => { query_result.progress += progress } - ServerPacket::Pong => { - probes::unexpected__server__packet!(|| "Hello"); - break Err(Error::UnexpectedPacket("Pong")); - } ServerPacket::EndOfStream => break Ok(query_result), ServerPacket::ProfileInfo(info) => { let _ = query_result.profile_info.replace(info); diff --git a/oximeter/db/src/native/io/mod.rs b/oximeter/db/src/native/io/mod.rs index 999e90f4f6..1d54b13969 100644 --- a/oximeter/db/src/native/io/mod.rs +++ b/oximeter/db/src/native/io/mod.rs @@ -13,4 +13,5 @@ pub mod packet; pub mod profile_info; pub mod progress; pub mod string; +pub mod table_columns; pub mod varuint; diff --git a/oximeter/db/src/native/io/packet/client.rs b/oximeter/db/src/native/io/packet/client.rs index c8397d68a2..78e9fd09df 100644 --- a/oximeter/db/src/native/io/packet/client.rs +++ b/oximeter/db/src/native/io/packet/client.rs @@ -67,8 +67,7 @@ impl Encoder { io::string::encode("", &mut dst); // Send an empty block to signal the end of data transfer. - self.encode_block(Block::empty(std::iter::empty()).unwrap(), &mut dst) - .unwrap(); + self.encode_block(Block::empty(), &mut dst).unwrap(); } /// Encode a ClientInfo into the buffer. 
diff --git a/oximeter/db/src/native/io/packet/server.rs b/oximeter/db/src/native/io/packet/server.rs index 0ef6d96d4b..1fd9e2cac4 100644 --- a/oximeter/db/src/native/io/packet/server.rs +++ b/oximeter/db/src/native/io/packet/server.rs @@ -161,6 +161,20 @@ impl tokio_util::codec::Decoder for Decoder { }; Packet::ProfileInfo(info) } + Packet::TABLE_COLUMNS => { + match io::table_columns::decode(&mut buf) { + Ok(Some(columns)) => Packet::TableColumns(columns), + Ok(None) => return Ok(None), + Err(e) => { + probes::invalid__packet!(|| ( + "TableColumns", + src.len() + )); + src.clear(); + return Err(e); + } + } + } Packet::PROFILE_EVENTS => match io::block::decode(&mut buf) { // Profile events are encoded as a data block. Ok(Some(block)) => Packet::ProfileEvents(block), diff --git a/oximeter/db/src/native/io/table_columns.rs b/oximeter/db/src/native/io/table_columns.rs new file mode 100644 index 0000000000..7fb8ad55eb --- /dev/null +++ b/oximeter/db/src/native/io/table_columns.rs @@ -0,0 +1,202 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. +// +// Copyright 2024 Oxide Computer Company + +//! Decoding table column descriptions from the database. +//! +//! As is helpfully noted in the ClickHouse sources at +//! , +//! +//! > NOTE: Serialization format is insane. +//! +//! The server serializes an array of `ColumnDescription`s to help the client +//! validate data before inserting it. This is a pure-text format that includes +//! column names and types, but also comments, codecs, and TTL values. The +//! format is indeed pretty wonky, so we're using `nom` to help parse it +//! robustly. For now, we only care about the names and data types; the +//! remainder is collected as-is, without interpretation, into a generic +//! `details` field. 
+ +use super::string; +use crate::native::block::DataType; +use crate::native::packets::server::ColumnDescription; +use crate::native::Error; +use nom::bytes::streaming::is_not; +use nom::bytes::streaming::tag; +use nom::character::streaming::u64 as nom_u64; +use nom::error::ErrorKind; +use nom::sequence::delimited; +use nom::sequence::separated_pair; +use nom::sequence::terminated; +use nom::Err as NomErr; +use nom::IResult; + +/// Decode an array of `ColumnDescription`s from a buffer. +pub fn decode( + src: &mut &[u8], +) -> Result>, Error> { + // See `src/Storages/ColumnDescription.cpp` for details of the encoding + // here, but briefly: + // + // - The packet starts with a "header" describing the table name (usually + // empty), the version, the number of columns. + // - Each column is serialized as a string, with its name; type; comment; + // compression codec; settings; statistics; and TTL + // - These are all newline delimited, with tabs prefixing each of the items + // listed above. + // + // See https://github.com/ClickHouse/ClickHouse/blob/c82cf25b3e5864bcc153cbe45adb8c6527e1ec6e/src/Server/TCPHandler.cpp#L2433 + // for more details, which is where the encoding of these values starts. + let Some(_table_name) = string::decode(src)? else { + return Ok(None); + }; + + // The column description itself is serialized as a ClickHouse + // varuint-prefixed string. Deserialize this, and then parse out the pieces + // of that parsed string. + let Some(text) = string::decode(src)? else { + return Ok(None); + }; + column_descriptions(&text) +} + +fn column_descriptions( + text: &str, +) -> Result>, Error> { + // Try to match the version "header" + let text = match tag::<_, _, (_, ErrorKind)>("columns format version: 1\n")( + text, + ) { + Ok((text, _match)) => text, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => return Err(Error::InvalidPacket("TableColumns (header)")), + }; + + // Match the number of columns. 
+ let (text, n_columns) = match column_count(text) { + Ok(input) => input, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => { + return Err(Error::InvalidPacket("TableColumns (column count)")) + } + }; + + // Extract each column, each of which is on a separate line. + let mut out = Vec::with_capacity( + usize::try_from(n_columns).map_err(|_| Error::BlockTooLarge)?, + ); + for line in text.lines() { + let col = match column_description(line) { + Ok(out) => out.1, + Err(NomErr::Incomplete(_)) => return Ok(None), + Err(_) => { + return Err(Error::InvalidPacket("TableColumns (description)")) + } + }; + out.push(col); + } + assert_eq!(out.len(), n_columns as usize); + Ok(Some(out)) +} + +// Parse out the column count from a header line like: "3 columns:\n". +fn column_count(s: &str) -> IResult<&str, u64> { + terminated(nom_u64, tag(" columns:\n"))(s) +} + +// Parse a backtick-quoted column name like: "`foo.bar`". +fn backtick_quoted_column_name(s: &str) -> IResult<&str, &str> { + delimited(tag("`"), is_not("`"), tag("`"))(s) +} + +// Parse a full column description from one line. +// +// Note that this must _not_ end in a newline, so one should use something like +// `str::lines()` and pass the result to this method. +fn column_description(s: &str) -> IResult<&str, ColumnDescription> { + let (s, (name, data_type)) = separated_pair( + backtick_quoted_column_name, + tag(" "), + DataType::nom_parse, + )(s)?; + + // At this point, we take any remaining output as the details, which may be + // empty. 
+ Ok(( + "", + ColumnDescription { + name: name.to_string(), + data_type, + details: s.to_string(), + }, + )) +} + +#[cfg(test)] +mod tests { + use super::backtick_quoted_column_name; + use super::column_count; + use super::column_description; + use super::column_descriptions; + use super::NomErr; + use crate::native::block::DataType; + + #[test] + fn test_backtick_quoted_column_name() { + assert_eq!(backtick_quoted_column_name("`foo`").unwrap().1, "foo"); + assert_eq!( + backtick_quoted_column_name("`foo.bar`").unwrap().1, + "foo.bar" + ); + assert!(matches!( + backtick_quoted_column_name("`foo").unwrap_err(), + NomErr::Incomplete(_) + )); + } + + #[test] + fn test_column_count() { + assert_eq!(column_count("4 columns:\n").unwrap().1, 4,); + } + + #[test] + fn test_column_description_only_required_parts() { + let desc = column_description("`timeseries_name` String").unwrap().1; + assert_eq!(desc.name, "timeseries_name"); + assert_eq!(desc.data_type, DataType::String); + assert!(desc.details.is_empty()); + } + + #[test] + fn test_column_descriptions() { + static INPUT: &str = r#"columns format version: 1 +2 columns: +`timeseries_name` String +`fields.name` Array(String) +"#; + let columns = column_descriptions(&mut &*INPUT) + .expect("failed to decode column descriptions") + .expect("expected Some(_) column description"); + assert_eq!(columns.len(), 2); + assert_eq!(columns[0].name, "timeseries_name"); + assert_eq!(columns[0].data_type, DataType::String); + assert_eq!(columns[1].name, "fields.name"); + assert_eq!( + columns[1].data_type, + DataType::Array(Box::new(DataType::String)) + ); + } + + #[test] + fn test_column_description_with_default() { + static INPUT: &str = r#"`timeseries_name` String\tDEFAULT "foo""#; + let column = column_description(&mut &*INPUT) + .expect("failed to decode column description") + .1; + assert_eq!(column.name, "timeseries_name"); + assert_eq!(column.data_type, DataType::String); + assert_eq!(column.details, "\\tDEFAULT \"foo\""); + 
} +} diff --git a/oximeter/db/src/native/mod.rs b/oximeter/db/src/native/mod.rs index 02cd485569..77cbd934fe 100644 --- a/oximeter/db/src/native/mod.rs +++ b/oximeter/db/src/native/mod.rs @@ -171,6 +171,9 @@ pub enum Error { #[error("Unrecognized server packet, kind = {0}")] UnrecognizedServerPacket(u8), + #[error("Invalid data packet of kind '{0}'")] + InvalidPacket(&'static str), + #[error("Encountered non-UTF8 string")] NonUtf8String, @@ -205,11 +208,32 @@ pub enum Error { )] Exception { exceptions: Vec }, - #[error("Cannot concatenate blocks with mismatched structure")] + #[error( + "Mismatched data block structure when concatenating blocks or \ + inserting data blocks into the database" + )] MismatchedBlockStructure, #[error("Value out of range for corresponding ClickHouse type")] OutOfRange { type_name: String, min: String, max: String, value: String }, + + #[error("Failed to serialize / deserialize value from the database")] + Serde(String), + + #[error("No column with name '{0}'")] + NoSuchColumn(String), + + #[error("Too many rows to create block")] + TooManyRows, + + #[error("Column has unexpected type")] + UnexpectedColumnType, + + #[error("Data block is too large")] + BlockTooLarge, + + #[error("Expected an empty data block")] + ExpectedEmptyDataBlock, } /// Error codes and related constants. diff --git a/oximeter/db/src/native/packets/server.rs b/oximeter/db/src/native/packets/server.rs index 2e6be64a9d..0acf614496 100644 --- a/oximeter/db/src/native/packets/server.rs +++ b/oximeter/db/src/native/packets/server.rs @@ -6,10 +6,30 @@ //! Packets sent from the server. +use crate::native::block::Block; +use crate::native::block::DataType; use std::fmt; use std::time::Duration; -use crate::native::block::Block; +/// Description of a single column in a table. +/// +/// This is used during insertion queries. 
When the client sends a request to +/// insert data, the server responds with a description of all the columns in +/// the table, which the client is supposed to use to verify the data block +/// being inserted. +#[derive(Clone, Debug, PartialEq)] +pub struct ColumnDescription { + /// The name of the column. + pub name: String, + /// The type of the column. + pub data_type: DataType, + /// Other details for the column. + /// + /// This is collected as a string, but otherwise unparsed or processed. We + /// don't care about these details at this point, and do nothing with them + /// for now. + pub details: String, +} /// A packet sent from the ClickHouse server to the client. #[derive(Clone, Debug, PartialEq)] @@ -30,6 +50,8 @@ pub enum Packet { EndOfStream, /// Profiling data for a query. ProfileInfo(ProfileInfo), + /// Metadata about the columns in a table. + TableColumns(Vec), /// A data block containing profiling events during a query. ProfileEvents(Block), } @@ -42,6 +64,7 @@ impl Packet { pub const PONG: u8 = 4; pub const END_OF_STREAM: u8 = 5; pub const PROFILE_INFO: u8 = 6; + pub const TABLE_COLUMNS: u8 = 11; pub const PROFILE_EVENTS: u8 = 14; /// Return the kind of the packet as a string. 
@@ -54,6 +77,7 @@ impl Packet { Packet::Pong => "Pong", Packet::EndOfStream => "EndOfStream", Packet::ProfileInfo(_) => "ProfileInfo", + Packet::TableColumns(_) => "TableColumns", Packet::ProfileEvents(_) => "ProfileEvents", } } diff --git a/oximeter/types/Cargo.toml b/oximeter/types/Cargo.toml index 6d6bbc07e6..3f4ab66c2e 100644 --- a/oximeter/types/Cargo.toml +++ b/oximeter/types/Cargo.toml @@ -14,6 +14,7 @@ float-ord.workspace = true num.workspace = true omicron-common.workspace = true omicron-workspace-hack.workspace = true +parse-display.workspace = true regex.workspace = true schemars = { workspace = true, features = [ "uuid1", "bytes", "chrono" ] } serde.workspace = true diff --git a/oximeter/types/src/schema.rs b/oximeter/types/src/schema.rs index 135c77462a..3f0e8beb5c 100644 --- a/oximeter/types/src/schema.rs +++ b/oximeter/types/src/schema.rs @@ -14,6 +14,8 @@ use crate::Metric; use crate::Target; use chrono::DateTime; use chrono::Utc; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -63,11 +65,14 @@ impl FieldSchema { Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, Deserialize, Serialize, JsonSchema, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldSource { diff --git a/oximeter/types/src/types.rs b/oximeter/types/src/types.rs index 60260e3649..cea48f4477 100644 --- a/oximeter/types/src/types.rs +++ b/oximeter/types/src/types.rs @@ -15,6 +15,8 @@ use chrono::DateTime; use chrono::Utc; use num::traits::One; use num::traits::Zero; +use parse_display::Display; +use parse_display::FromStr; use schemars::JsonSchema; use serde::Deserialize; use serde::Serialize; @@ -40,12 +42,15 @@ use uuid::Uuid; Debug, PartialEq, Eq, + Hash, PartialOrd, Ord, JsonSchema, Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum FieldType { @@ -86,12 +91,6 @@ impl FieldType { } } -impl std::fmt::Display for FieldType 
{ - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - macro_rules! impl_field_type_from { ($ty:ty, $variant:path) => { impl From<&$ty> for FieldType { @@ -313,6 +312,8 @@ pub struct Field { Serialize, Deserialize, strum::EnumIter, + FromStr, + Display, )] #[serde(rename_all = "snake_case")] pub enum DatumType { @@ -390,12 +391,6 @@ impl DatumType { } } -impl std::fmt::Display for DatumType { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - write!(f, "{:?}", self) - } -} - /// A `Datum` is a single sampled data point from a metric. #[derive(Clone, Debug, PartialEq, JsonSchema, Serialize, Deserialize)] #[serde(tag = "type", content = "datum", rename_all = "snake_case")]