Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Read / write timeseries schema with the native client #6943

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 23 additions & 3 deletions nexus-config/src/nexus_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,12 @@ pub struct SchemaConfig {
/// Optional configuration for the timeseries database.
#[derive(Clone, Debug, Default, Deserialize, PartialEq, Serialize)]
pub struct TimeseriesDbConfig {
/// The HTTP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub address: Option<SocketAddr>,
/// The native TCP address of the ClickHouse server.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub native_address: Option<SocketAddr>,
}

/// Configuration for the `Dendrite` dataplane daemon.
Expand Down Expand Up @@ -774,7 +778,9 @@ impl std::fmt::Display for SchemeName {
mod test {
use super::*;

use omicron_common::address::{Ipv6Subnet, RACK_PREFIX};
use omicron_common::address::{
Ipv6Subnet, CLICKHOUSE_HTTP_PORT, CLICKHOUSE_TCP_PORT, RACK_PREFIX,
};
use omicron_common::api::internal::shared::SwitchLocation;

use camino::{Utf8Path, Utf8PathBuf};
Expand All @@ -784,7 +790,7 @@ mod test {
use dropshot::ConfigLoggingLevel;
use std::collections::HashMap;
use std::fs;
use std::net::{Ipv6Addr, SocketAddr};
use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6};
use std::str::FromStr;
use std::time::Duration;

Expand Down Expand Up @@ -889,6 +895,7 @@ mod test {
if_exists = "fail"
[timeseries_db]
address = "[::1]:8123"
native_address = "[::1]:9000"
[updates]
trusted_root = "/path/to/root.json"
[tunables]
Expand Down Expand Up @@ -1007,7 +1014,20 @@ mod test {
path: "/nonexistent/path".into()
},
timeseries_db: TimeseriesDbConfig {
address: Some("[::1]:8123".parse().unwrap())
address: Some(SocketAddr::V6(SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_HTTP_PORT,
0,
0,
))),
native_address: Some(SocketAddr::V6(
SocketAddrV6::new(
Ipv6Addr::LOCALHOST,
CLICKHOUSE_TCP_PORT,
0,
0,
)
)),
},
updates: Some(UpdatesConfig {
trusted_root: Utf8PathBuf::from("/path/to/root.json"),
Expand Down
6 changes: 4 additions & 2 deletions nexus/src/app/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ impl super::Nexus {
.timeseries_schema_list(&pagination.page, limit)
.await
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
Expand Down Expand Up @@ -150,7 +151,8 @@ impl super::Nexus {
result.tables
})
.map_err(|e| match e {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable {
internal_message: e.to_string(),
}
Expand Down
33 changes: 25 additions & 8 deletions nexus/src/app/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use nexus_db_queries::authn;
use nexus_db_queries::authz;
use nexus_db_queries::context::OpContext;
use nexus_db_queries::db;
use omicron_common::address::CLICKHOUSE_HTTP_PORT;
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use omicron_common::address::DENDRITE_PORT;
use omicron_common::address::MGD_PORT;
Expand Down Expand Up @@ -411,13 +412,12 @@ impl Nexus {
.map_err(|e| e.to_string())?;

// Client to the ClickHouse database.
let timeseries_client =
if let Some(http_address) = &config.pkg.timeseries_db.address {
let native_address =
SocketAddr::new(http_address.ip(), CLICKHOUSE_TCP_PORT);
oximeter_db::Client::new(*http_address, native_address, &log)
} else {
// TODO-cleanup: Remove this when we remove the HTTP client.
// TODO-cleanup: Simplify this when we remove the HTTP client.
let timeseries_client = match (
&config.pkg.timeseries_db.address,
&config.pkg.timeseries_db.native_address,
) {
(None, None) => {
let http_resolver =
qorb_resolver.for_service(ServiceName::Clickhouse);
let native_resolver =
Expand All @@ -427,7 +427,24 @@ impl Nexus {
native_resolver,
&log,
)
};
}
(maybe_http, maybe_native) => {
let (http_address, native_address) =
match (maybe_http, maybe_native) {
(None, None) => unreachable!("handled above"),
(None, Some(native)) => (
SocketAddr::new(native.ip(), CLICKHOUSE_HTTP_PORT),
*native,
),
(Some(http), None) => (
*http,
SocketAddr::new(http.ip(), CLICKHOUSE_TCP_PORT),
),
(Some(http), Some(native)) => (*http, *native),
};
oximeter_db::Client::new(http_address, native_address, &log)
}
};

// TODO-cleanup We may want to make the populator a first-class
// background task.
Expand Down
3 changes: 2 additions & 1 deletion nexus/src/app/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,8 @@ pub(crate) async fn unassign_producer(

fn map_oximeter_err(error: oximeter_db::Error) -> Error {
match error {
oximeter_db::Error::DatabaseUnavailable(_) => {
oximeter_db::Error::DatabaseUnavailable(_)
| oximeter_db::Error::Connection(_) => {
Error::ServiceUnavailable { internal_message: error.to_string() }
}
_ => Error::InternalError { internal_message: error.to_string() },
Expand Down
3 changes: 3 additions & 0 deletions nexus/test-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,6 +485,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
let dataset_id = Uuid::new_v4();
let http_address = clickhouse.http_address();
let http_port = http_address.port();
let native_address = clickhouse.native_address();
self.rack_init_builder.add_clickhouse_dataset(
zpool_id,
dataset_id,
Expand All @@ -503,6 +504,8 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
.as_mut()
.expect("Tests expect to set a port of Clickhouse")
.set_port(http_port);
self.config.pkg.timeseries_db.native_address =
Some(native_address.into());

let pool_name = illumos_utils::zpool::ZpoolName::new_external(zpool_id)
.to_string()
Expand Down
7 changes: 7 additions & 0 deletions oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ camino.workspace = true
chrono.workspace = true
chrono-tz.workspace = true
clap.workspace = true
const_format.workspace = true
clickward.workspace = true
debug-ignore.workspace = true
dropshot.workspace = true
Expand All @@ -32,6 +33,7 @@ omicron-common.workspace = true
omicron-workspace-hack.workspace = true
oximeter.workspace = true
oxql-types.workspace = true
parse-display.workspace = true
qorb.workspace = true
regex.workspace = true
serde.workspace = true
Expand Down Expand Up @@ -93,6 +95,11 @@ optional = true
workspace = true
features = [ "rt-multi-thread", "macros" ]

[build-dependencies]
anyhow.workspace = true
nom.workspace = true
quote.workspace = true

[dev-dependencies]
camino-tempfile.workspace = true
criterion = { workspace = true, features = [ "async_tokio" ] }
Expand Down
100 changes: 100 additions & 0 deletions oximeter/db/build.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.
//
// Copyright 2024 Oxide Computer Company

use anyhow::Context as _;
use nom::IResult;

/// Build script for generating native type representations from the
/// ground-truth SQL definitions.
fn main() -> anyhow::Result<()> {
const INIT_FILE: &str =
concat!(env!("CARGO_MANIFEST_DIR"), "/schema/single-node/db-init.sql");
let contents = std::fs::read_to_string(INIT_FILE)
.with_context(|| format!("Failed to read SQL file: '{INIT_FILE}'"))?;
let field_type_enum =
find_enum(&contents, "type").context("failed to find column 'type'")?;
let field_source_enum = find_enum(&contents, "source")
.context("failed to find column 'source'")?;
let datum_type_enum = find_enum(&contents, "datum_type")
.context("failed to find column 'datum_type'")?;
std::fs::write(
format!("{}/enum_defs.rs", std::env::var("OUT_DIR")?),
[field_type_enum, field_source_enum, datum_type_enum].join("\n"),
)
.context("writing output file")?;
Ok(())
}

// Find an enum in the `timeseries_schema` table definition for the named
// column, and return the corresponding `DataType::Enum8()` definition for it.
fn find_enum(contents: &str, column: &str) -> Option<String> {
let needle = format!("{column} Enum(\n");
let start = contents.find(&needle)? + needle.len();
let s = &contents[start..].trim();
let (variants, names): (Vec<i8>, Vec<String>) =
variant_list(s).ok()?.1.into_iter().unzip();
let enum_map = quote::format_ident!("{}_ENUM_MAP", column.to_uppercase());
let enum_rev_map =
quote::format_ident!("{}_ENUM_REV_MAP", column.to_uppercase());
let enum_type =
quote::format_ident!("{}_ENUM_DATA_TYPE", column.to_uppercase());
let parsed_type = if column == "type" {
quote::quote! { ::oximeter::FieldType }
} else if column == "source" {
quote::quote! { ::oximeter::FieldSource }
} else if column == "datum_type" {
quote::quote! { ::oximeter::DatumType }
} else {
unreachable!();
};
Some(quote::quote! {
/// Mapping from the variant index to the string form.
#[allow(dead_code)]
static #enum_map: ::std::sync::LazyLock<::indexmap::IndexMap<i8, String>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
});
/// Reverse mapping, from the _parsed_ form to the variant index.
#[allow(dead_code)]
static #enum_rev_map: ::std::sync::LazyLock<::indexmap::IndexMap<#parsed_type, i8>> = ::std::sync::LazyLock::new(|| {
::indexmap::IndexMap::from([
#((<#parsed_type as ::std::str::FromStr>::from_str(#names).unwrap(), #variants)),*
])
});
/// Actual DataType::Enum8(_) with the contained variant-to-name mapping.
#[allow(dead_code)]
static #enum_type: ::std::sync::LazyLock<crate::native::block::DataType> = ::std::sync::LazyLock::new(|| {
crate::native::block::DataType::Enum8(
::indexmap::IndexMap::from([
#((#variants, String::from(#names))),*
])
)
});
}.to_string())
}

fn variant_list(s: &str) -> IResult<&str, Vec<(i8, String)>> {
nom::multi::separated_list1(
nom::bytes::complete::is_a(" ,\n"),
single_variant,
)(s)
}

fn single_variant(s: &str) -> IResult<&str, (i8, String)> {
nom::combinator::map(
nom::sequence::separated_pair(
nom::sequence::delimited(
nom::bytes::complete::tag("'"),
nom::character::complete::alphanumeric1,
nom::bytes::complete::tag("'"),
),
nom::bytes::complete::tag(" = "),
nom::character::complete::i8,
),
|(name, variant): (&str, i8)| (variant, name.to_string()),
)(s)
}
Loading
Loading