Skip to content

Commit

Permalink
Initialize timeseries database on qorb acquire
Browse files Browse the repository at this point in the history
  • Loading branch information
plotnick committed Oct 15, 2024
1 parent 834ea85 commit ded7d75
Show file tree
Hide file tree
Showing 8 changed files with 211 additions and 27 deletions.
2 changes: 1 addition & 1 deletion nexus/tests/integration_tests/oximeter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::integration_tests::metrics::wait_for_producer;
use nexus_test_interface::NexusServer;
use nexus_test_utils_macros::nexus_test;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
use oximeter_db::DbWrite;
use oximeter_db::DbInit;
use std::time::Duration;
use uuid::Uuid;

Expand Down
2 changes: 1 addition & 1 deletion oximeter/collector/src/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use omicron_common::backoff::BackoffError;
use oximeter::types::ProducerResults;
use oximeter::types::ProducerResultsItem;
use oximeter_db::Client;
use oximeter_db::DbWrite;
use oximeter_db::{DbInit, DbWrite};
use qorb::claim::Handle;
use qorb::pool::Pool;
use qorb::resolver::BoxedResolver;
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/bin/oxdb/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ use oximeter::{
types::{Cumulative, Sample},
Metric, Target,
};
use oximeter_db::{make_client, query, Client, DbWrite};
use oximeter_db::{make_client, query, Client, DbInit, DbWrite};
use slog::{debug, info, o, Drain, Level, Logger};
use std::net::IpAddr;
use uuid::Uuid;
Expand Down
47 changes: 27 additions & 20 deletions oximeter/db/src/client/dbwrite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,17 +24,9 @@ pub(super) struct UnrolledSampleRows {
pub rows: BTreeMap<String, Vec<String>>,
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
/// A trait allowing a client to initialize the timeseries database.
#[async_trait::async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;

pub trait DbInit {
/// Initialize the replicated telemetry database, creating tables as needed.
async fn init_replicated_db(&self) -> Result<(), Error>;

Expand All @@ -49,16 +41,7 @@ pub trait DbWrite {
}

#[async_trait::async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
}

impl DbInit for Client {
/// Initialize the replicated telemetry database, creating tables as needed.
///
/// We run both db-init files since we want all tables in production.
Expand Down Expand Up @@ -109,6 +92,30 @@ impl DbWrite for Client {
}
}

/// A trait allowing a [`Client`] to write data into the timeseries database.
///
/// The vanilla [`Client`] object allows users to query the timeseries database, returning
/// timeseries samples corresponding to various filtering criteria. This trait segregates the
/// methods required for _writing_ new data into the database, and is intended only for use by the
/// `oximeter-collector` crate.
#[async_trait::async_trait]
pub trait DbWrite {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error>;
}

#[async_trait::async_trait]
impl DbWrite for Client {
/// Insert the given samples into the database.
async fn insert_samples(&self, samples: &[Sample]) -> Result<(), Error> {
debug!(self.log, "unrolling {} total samples", samples.len());
let UnrolledSampleRows { new_schema, rows } =
self.unroll_samples(samples).await;
self.save_new_schema_or_remove(new_schema).await?;
self.insert_unrolled_samples(rows).await
}
}

/// Allow initializing a minimal subset of db tables for replicated cluster
/// testing
#[async_trait::async_trait]
Expand Down
179 changes: 178 additions & 1 deletion oximeter/db/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub(crate) mod query_summary;
#[cfg(any(feature = "sql", test))]
mod sql;

pub use self::dbwrite::DbInit;
pub use self::dbwrite::DbWrite;
pub use self::dbwrite::TestDbWrite;
use crate::client::query_summary::QuerySummary;
Expand All @@ -26,6 +27,7 @@ use crate::TimeseriesPageSelector;
use crate::TimeseriesScanParams;
use crate::TimeseriesSchema;
use anyhow::anyhow;
use anyhow::Context;
use debug_ignore::DebugIgnore;
use dropshot::EmptyScanParams;
use dropshot::PaginationOrder;
Expand Down Expand Up @@ -97,6 +99,7 @@ impl backend::Connector for ReqwestConnector {
client: reqwest::Client::builder()
.pool_max_idle_per_host(1)
.build()
.context("can't connect to database")
.map_err(|e| QorbError::Other(anyhow!(e)))?,
url: format!("http://{}", backend.address),
})
Expand All @@ -111,12 +114,25 @@ impl backend::Connector for ReqwestConnector {
.get(format!("{}/ping", conn.url))
.send()
.await
.map_err(|err| QorbError::Other(anyhow!(err.to_string())))?,
.context("can't ping database")
.map_err(|e| QorbError::Other(anyhow!(e)))?,
)
.await
.map_err(|e| QorbError::Other(anyhow!(e)))?;
Ok(())
}

async fn on_acquire(
&self,
conn: &mut Self::Connection,
) -> Result<(), backend::Error> {
let replicated = false; // TODO
conn.init_db(replicated, model::OXIMETER_VERSION)
.await
.context("can't initialize database")
.map_err(|e| QorbError::Other(anyhow!(e)))?;
Ok(())
}
}

#[derive(Clone, Debug)]
Expand All @@ -125,6 +141,167 @@ pub(crate) struct ReqwestClient {
client: reqwest::Client,
}

/// If we acquire a connection to a fresh database, we must ensure that
/// it is initialized (i.e., that the database exists and has the current
/// schema) before we can insert samples into it. These methods currently
/// duplicate the work of the [Client] database initialization, but with
/// somewhat simpler machinery.
///
/// TODO: De-duplicate with [Client] `impl`.
impl ReqwestClient {
/// Ensure that the acquired database exists and is up-to-date.
async fn init_db(
&self,
replicated: bool,
expected_version: u64,
) -> Result<(), Error> {
// Read the version from the DB
let version = self.read_latest_version().await?;

// Decide how to conform the on-disk version with this version of
// Oximeter.
if version < expected_version {
// If the on-storage version is less than the constant embedded into
// this binary, the DB is out-of-date. Drop it, and re-populate it
// later.
if !replicated {
self.wipe_single_node_db().await?;
self.init_single_node_db().await?;
} else {
self.wipe_replicated_db().await?;
self.init_replicated_db().await?;
}
} else if version > expected_version {
// If the on-storage version is greater than the constant embedded
// into this binary, we may have downgraded.
return Err(Error::DatabaseVersionMismatch {
expected: model::OXIMETER_VERSION,
found: version,
});
} else {
// If the version matches, we don't need to update the DB
return Ok(());
}

self.insert_version(expected_version).await?;
Ok(())
}

/// Read the latest version applied in the database.
pub async fn read_latest_version(&self) -> Result<u64, Error> {
let sql = format!(
"SELECT MAX(value) FROM {db_name}.version;",
db_name = crate::DATABASE_NAME,
);

let version = match self.execute(sql).await {
Ok(body) if body.is_empty() => 0,
Ok(body) => body.trim().parse::<u64>().map_err(|err| {
Error::Database(format!("Cannot read version: {err}"))
})?,
Err(Error::Database(err))
// Case 1: The database has not been created.
if err.contains(CLICKHOUSE_DB_MISSING) ||
// Case 2: The database has been created, but it's old (exists
// prior to the version table).
err.contains(CLICKHOUSE_DB_VERSION_MISSING) => 0,
Err(err) => return Err(err),
};
Ok(version)
}

async fn insert_version(&self, version: u64) -> Result<(), Error> {
let sql = format!(
"INSERT INTO {db_name}.version (*) VALUES ({version}, now());",
db_name = crate::DATABASE_NAME,
);
self.execute(sql).await?;
Ok(())
}

/// Execute a SQL statement, awaiting the response as text.
///
/// This is intended only to be used for initializing the database.
/// It does not implement the logging, probing, or query summarization
/// of [`Client::execute_with_body`].
async fn execute(&self, sql: String) -> Result<String, Error> {
let response = self
.client
.post(&self.url)
.timeout(DEFAULT_REQUEST_TIMEOUT)
.query(&[("output_format_json_quote_64bit_integers", "0")])
.body(sql)
.send()
.await
.map_err(|err| Error::DatabaseUnavailable(err.to_string()))?;

// Convert the HTTP response into a database response.
let response = handle_db_response(response).await?;
response.text().await.map_err(|err| Error::Database(err.to_string()))
}

/// Run one or more SQL statements.
///
/// This is intended only to be used for initializing the database.
async fn run_many_sql_statements(
&self,
sql: impl AsRef<str>,
) -> Result<(), Error> {
for stmt in sql.as_ref().split(';').filter(|s| !s.trim().is_empty()) {
self.execute(stmt.to_string()).await?;
}
Ok(())
}
}

#[async_trait::async_trait]
impl DbInit for ReqwestClient {
/// Initialize the replicated telemetry database, creating tables as needed.
///
/// We run both db-init files since we want all tables in production.
/// These files are intentionally disjoint so that we don't have to
/// duplicate any setup.
async fn init_replicated_db(&self) -> Result<(), Error> {
self.run_many_sql_statements(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schema/replicated/db-init-1.sql"
)))
.await?;
self.run_many_sql_statements(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schema/replicated/db-init-2.sql"
)))
.await
}

/// Wipe the ClickHouse database entirely from a replicated set up.
async fn wipe_replicated_db(&self) -> Result<(), Error> {
self.run_many_sql_statements(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schema/replicated/db-wipe.sql"
)))
.await
}

/// Initialize a single node telemetry database, creating tables as needed.
async fn init_single_node_db(&self) -> Result<(), Error> {
self.run_many_sql_statements(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schema/single-node/db-init.sql"
)))
.await
}

/// Wipe the ClickHouse database entirely from a single node set up.
async fn wipe_single_node_db(&self) -> Result<(), Error> {
self.run_many_sql_statements(include_str!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/schema/single-node/db-wipe.sql"
)))
.await
}
}

#[derive(Debug)]
pub(crate) enum ClientSource {
Static(ReqwestClient),
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/client/oxql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1110,7 +1110,7 @@ fn update_total_rows_and_check(
mod tests {
use super::ConsistentKeyGroup;
use crate::client::oxql::chunk_consistent_key_groups_impl;
use crate::{Client, DbWrite};
use crate::{Client, DbInit, DbWrite};
use crate::{Metric, Target};
use chrono::{DateTime, Utc};
use dropshot::test_util::LogContext;
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ pub mod sql;
pub use client::oxql::OxqlResult;
pub use client::query_summary::QuerySummary;
pub use client::Client;
pub use client::DbWrite;
pub use client::TestDbWrite;
pub use client::{DbInit, DbWrite};
pub use model::OXIMETER_VERSION;

#[derive(Debug, Error)]
Expand Down
2 changes: 1 addition & 1 deletion oximeter/db/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use clickward::{BasePorts, Deployment, DeploymentConfig, KeeperId};
use dropshot::test_util::log_prefix_for_test;
use omicron_test_utils::dev::poll;
use omicron_test_utils::dev::test_setup_log;
use oximeter_db::{Client, DbWrite, OxqlResult, Sample, TestDbWrite};
use oximeter_db::{Client, DbInit, DbWrite, OxqlResult, Sample, TestDbWrite};
use oximeter_test_utils::wait_for_keepers;
use slog::{info, Logger};
use std::collections::BTreeSet;
Expand Down

0 comments on commit ded7d75

Please sign in to comment.