Abstract over kinds of ClickHouse deployments in tests (#6593)
- Add the `ClickHouseDeployment` enum, which manages an entire
ClickHouse deployment in test code: either a single node for most
tests, or a cluster where relevant (see the sketch below the file
summary). For the cluster variant, this adds a way to wait for either
the first child or all children to shut down. This fixes a bug in the
logic for managing child processes, where the failure of one process
could turn all the others into zombies. It also collects the nodes
into arrays, so we can easily resize the cluster if we want, which
fixes #4460.
- Use the new enum in the `ControlPlaneTestContext` for all Nexus
integration tests.
- Rework the `ch-dev` binary to use the new enum, and print much more
verbose information about what it's doing when starting ClickHouse.
This fixes #3011.
bnaecker authored Sep 19, 2024
1 parent 1b43a0a commit a692ea5
Showing 10 changed files with 794 additions and 941 deletions.
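
As a rough orientation before the diff, here is a minimal sketch of the shape of the new abstraction, inferred from the commit message and the call sites visible below (`new_single_node`, `new_cluster`, `instances`, `keepers`, `http_address`, `wait_for_shutdown`, `cleanup`). The type and field names are assumptions for illustration only, not the actual `omicron-test-utils` code.

use std::net::SocketAddr;

/// Hypothetical stand-in for a single managed `clickhouse` child process.
struct ClickHouseInstance {
    http_address: SocketAddr,
}

/// Sketch of the deployment abstraction: one enum that owns either a single
/// node or a whole replicated cluster, so test code and `ch-dev` can treat
/// both uniformly.
enum ClickHouseDeployment {
    /// One `clickhouse server` process; sufficient for most tests.
    SingleNode(ClickHouseInstance),
    /// Replicated cluster: replicas plus keepers, collected into `Vec`s so
    /// the cluster can be resized without adding new struct fields (#4460).
    Cluster {
        replicas: Vec<ClickHouseInstance>,
        keepers: Vec<ClickHouseInstance>,
    },
}

impl ClickHouseDeployment {
    /// The HTTP address test clients should talk to: the single node, or
    /// the first replica in a cluster.
    fn http_address(&self) -> SocketAddr {
        match self {
            ClickHouseDeployment::SingleNode(node) => node.http_address,
            ClickHouseDeployment::Cluster { replicas, .. } => {
                replicas[0].http_address
            }
        }
    }
}

fn main() {
    let deployment = ClickHouseDeployment::SingleNode(ClickHouseInstance {
        http_address: "[::1]:8123".parse().unwrap(),
    });
    println!("ClickHouse HTTP address: {}", deployment.http_address());
}
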
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

163 changes: 89 additions & 74 deletions dev-tools/ch-dev/src/main.rs
@@ -76,13 +76,21 @@ async fn start_single_node(
let mut signal_stream = signals.fuse();

// Start the database server process, possibly on a specific port
let mut db_instance =
dev::clickhouse::ClickHouseInstance::new_single_node(logctx, port)
let mut deployment =
dev::clickhouse::ClickHouseDeployment::new_single_node(logctx, port)
.await?;
let db_instance = deployment
.instances()
.next()
.expect("Should have launched a ClickHouse instance");
println!(
"ch-dev: running ClickHouse with full command:\n\"clickhouse {}\"",
db_instance.cmdline().join(" ")
);
println!("ch-dev: ClickHouse environment:");
for (k, v) in db_instance.environment() {
println!("\t{k}={v}");
}
println!(
"ch-dev: ClickHouse is running with PID {}",
db_instance
@@ -94,14 +102,14 @@
db_instance.port()
);
println!(
"ch-dev: using {} for ClickHouse data storage",
"ch-dev: ClickHouse data stored in: [{}]",
db_instance.data_path()
);

// Wait for the DB to exit itself (an error), or for SIGINT
tokio::select! {
_ = db_instance.wait_for_shutdown() => {
db_instance.cleanup().await.context("clean up after shutdown")?;
_ = deployment.wait_for_shutdown() => {
deployment.cleanup().await.context("clean up after shutdown")?;
bail!("ch-dev: ClickHouse shutdown unexpectedly");
}
caught_signal = signal_stream.next() => {
@@ -115,7 +123,7 @@
);

// Remove the data directory.
db_instance
deployment
.wait_for_shutdown()
.await
.context("clean up after SIGINT shutdown")?;
@@ -135,12 +143,16 @@ async fn start_replicated_cluster(
let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
let replica_config = manifest_dir
.as_path()
.join("../../oximeter/db/src/configs/replica_config.xml");
.join("../../oximeter/db/src/configs/replica_config.xml")
.canonicalize()
.context("Failed to canonicalize replica config path")?;
let keeper_config = manifest_dir
.as_path()
.join("../../oximeter/db/src/configs/keeper_config.xml");
.join("../../oximeter/db/src/configs/keeper_config.xml")
.canonicalize()
.context("Failed to canonicalize keeper config path")?;

let mut cluster = dev::clickhouse::ClickHouseCluster::new(
let mut cluster = dev::clickhouse::ClickHouseDeployment::new_cluster(
logctx,
replica_config,
keeper_config,
@@ -149,83 +161,86 @@
println!(
"ch-dev: running ClickHouse cluster with configuration files:\n \
replicas: {}\n keepers: {}",
cluster.replica_config_path().display(),
cluster.keeper_config_path().display()
);
let pid_error_msg = "Failed to get process PID, it may not have started";
println!(
"ch-dev: ClickHouse cluster is running with: server PIDs = [{}, {}] \
and keeper PIDs = [{}, {}, {}]",
cluster.replica_1.pid().expect(pid_error_msg),
cluster.replica_2.pid().expect(pid_error_msg),
cluster.keeper_1.pid().expect(pid_error_msg),
cluster.keeper_2.pid().expect(pid_error_msg),
cluster.keeper_3.pid().expect(pid_error_msg),
);
println!(
"ch-dev: ClickHouse HTTP servers listening on ports: {}, {}",
cluster.replica_1.port(),
cluster.replica_2.port()
);
println!(
"ch-dev: using {} and {} for ClickHouse data storage",
cluster.replica_1.data_path(),
cluster.replica_2.data_path()
cluster.replica_config_path().unwrap().display(),
cluster.keeper_config_path().unwrap().display()
);
for instance in cluster.instances() {
println!(
"ch-dev: running ClickHouse replica with full command:\
\n\"clickhouse {}\"",
instance.cmdline().join(" ")
);
println!("ch-dev: ClickHouse replica environment:");
for (k, v) in instance.environment() {
println!("\t{k}={v}");
}
println!(
"ch-dev: ClickHouse replica PID is {}",
instance.pid().context("Failed to get instance PID")?
);
println!(
"ch-dev: ClickHouse replica data path is {}",
instance.data_path(),
);
println!(
"ch-dev: ClickHouse replica HTTP server is listening on port {}",
instance.address.port(),
);
}
for keeper in cluster.keepers() {
println!(
"ch-dev: running ClickHouse Keeper with full command:\
\n\"clickhouse {}\"",
keeper.cmdline().join(" ")
);
println!("ch-dev: ClickHouse Keeper environment:");
for (k, v) in keeper.environment() {
println!("\t{k}={v}");
}
println!(
"ch-dev: ClickHouse Keeper PID is {}",
keeper.pid().context("Failed to get Keeper PID")?
);
println!(
"ch-dev: ClickHouse Keeper data path is {}",
keeper.data_path(),
);
println!(
"ch-dev: ClickHouse Keeper HTTP server is listening on port {}",
keeper.address.port(),
);
}

// Wait for the replicas and keepers to exit themselves (an error), or for SIGINT
tokio::select! {
_ = cluster.replica_1.wait_for_shutdown() => {
cluster.replica_1.cleanup().await.context(
format!("clean up {} after shutdown", cluster.replica_1.data_path())
)?;
bail!("ch-dev: ClickHouse replica 1 shutdown unexpectedly");
}
_ = cluster.replica_2.wait_for_shutdown() => {
cluster.replica_2.cleanup().await.context(
format!("clean up {} after shutdown", cluster.replica_2.data_path())
)?;
bail!("ch-dev: ClickHouse replica 2 shutdown unexpectedly");
}
_ = cluster.keeper_1.wait_for_shutdown() => {
cluster.keeper_1.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_1.data_path())
)?;
bail!("ch-dev: ClickHouse keeper 1 shutdown unexpectedly");
}
_ = cluster.keeper_2.wait_for_shutdown() => {
cluster.keeper_2.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_2.data_path())
)?;
bail!("ch-dev: ClickHouse keeper 2 shutdown unexpectedly");
}
_ = cluster.keeper_3.wait_for_shutdown() => {
cluster.keeper_3.cleanup().await.context(
format!("clean up {} after shutdown", cluster.keeper_3.data_path())
)?;
bail!("ch-dev: ClickHouse keeper 3 shutdown unexpectedly");
res = cluster.wait_for_shutdown() => {
cluster.cleanup().await.context("cleaning up after shutdown")?;
match res {
Ok(node) => {
bail!(
"ch-dev: ClickHouse cluster {:?} node {} shutdown unexpectedly",
node.kind,
node.index,
);
}
Err(e) => {
bail!(
"ch-dev: Failed to wait for cluster node: {}",
e,
);
}
}
}
caught_signal = signal_stream.next() => {
assert_eq!(caught_signal.unwrap(), SIGINT);
eprintln!(
"ch-dev: caught signal, shutting down and removing \
temporary directories"
);

// Remove the data directories.
let mut instances = vec![
cluster.replica_1,
cluster.replica_2,
cluster.keeper_1,
cluster.keeper_2,
cluster.keeper_3,
];
for instance in instances.iter_mut() {
instance
.wait_for_shutdown()
cluster
.cleanup()
.await
.context(format!("clean up {} after SIGINT shutdown", instance.data_path()))?;
};
.context("clean up after SIGINT shutdown")?;
}
}
Ok(())
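
The hunk above only shows the call site of `cluster.wait_for_shutdown()`. One way to get the "whichever child exits first" behavior described in the commit message is `futures::future::select_all`; the standalone sketch below illustrates that pattern under that assumption. It is not the actual `omicron-test-utils` implementation; it assumes the `tokio` (with the `process` feature) and `futures` crates, and uses `sleep` commands as stand-ins for the ClickHouse children.

use futures::future::select_all;
use tokio::process::{Child, Command};

/// Hypothetical helper: wait for whichever child process exits first and
/// return its index, so the caller can report which node died and then kill
/// the survivors instead of leaving them behind as zombies.
async fn wait_for_first_exit(children: &mut [Child]) -> std::io::Result<usize> {
    // `Child::wait` borrows each child mutably; box the futures so they all
    // have the same type (and are `Unpin`) for `select_all`.
    let waits = children.iter_mut().map(|child| Box::pin(child.wait()));
    let (result, index, _remaining) = select_all(waits).await;
    result.map(|_status| index)
}

#[tokio::main]
async fn main() -> std::io::Result<()> {
    // Stand-ins for the ClickHouse replicas and keepers in this sketch.
    let mut children = vec![
        Command::new("sleep").arg("1").spawn()?,
        Command::new("sleep").arg("60").spawn()?,
    ];
    let index = wait_for_first_exit(&mut children).await?;
    println!("child {index} exited first; shutting down the rest");
    for child in children.iter_mut() {
        let _ = child.kill().await;
    }
    Ok(())
}
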
4 changes: 2 additions & 2 deletions dev-tools/omdb/tests/test_all_output.rs
@@ -130,7 +130,7 @@ async fn test_omdb_success_cases(cptestctx: &ControlPlaneTestContext) {
let mgs_url = format!("http://{}/", gwtestctx.client.bind_address);
let ox_url = format!("http://{}/", cptestctx.oximeter.server_address());
let ox_test_producer = cptestctx.producer.address().ip();
let ch_url = format!("http://{}/", cptestctx.clickhouse.address);
let ch_url = format!("http://{}/", cptestctx.clickhouse.http_address());

let tmpdir = camino_tempfile::tempdir()
.expect("failed to create temporary directory");
@@ -308,7 +308,7 @@ async fn test_omdb_env_settings(cptestctx: &ControlPlaneTestContext) {
format!("http://{}", cptestctx.internal_client.bind_address);
let ox_url = format!("http://{}/", cptestctx.oximeter.server_address());
let ox_test_producer = cptestctx.producer.address().ip();
let ch_url = format!("http://{}/", cptestctx.clickhouse.address);
let ch_url = format!("http://{}/", cptestctx.clickhouse.http_address());
let dns_sockaddr = cptestctx.internal_dns.dns_server.local_address();
let mut output = String::new();

2 changes: 1 addition & 1 deletion nexus/benches/setup_benchmark.rs
@@ -31,7 +31,7 @@ async fn do_clickhouse_setup() {
let cfg = nexus_test_utils::load_test_config();
let logctx = LogContext::new("clickhouse_setup", &cfg.pkg.log);
let mut clickhouse =
dev::clickhouse::ClickHouseInstance::new_single_node(&logctx, 0)
dev::clickhouse::ClickHouseDeployment::new_single_node(&logctx, 0)
.await
.unwrap();
clickhouse.cleanup().await.unwrap();
21 changes: 11 additions & 10 deletions nexus/test-utils/src/lib.rs
@@ -112,7 +112,7 @@ pub struct ControlPlaneTestContext<N> {
pub internal_client: ClientTestContext,
pub server: N,
pub database: dev::db::CockroachInstance,
pub clickhouse: dev::clickhouse::ClickHouseInstance,
pub clickhouse: dev::clickhouse::ClickHouseDeployment,
pub logctx: LogContext,
pub sled_agent_storage: camino_tempfile::Utf8TempDir,
pub sled_agent: sim::Server,
@@ -275,7 +275,7 @@ pub struct ControlPlaneTestContextBuilder<'a, N: NexusServer> {

pub server: Option<N>,
pub database: Option<dev::db::CockroachInstance>,
pub clickhouse: Option<dev::clickhouse::ClickHouseInstance>,
pub clickhouse: Option<dev::clickhouse::ClickHouseDeployment>,
pub sled_agent_storage: Option<camino_tempfile::Utf8TempDir>,
pub sled_agent: Option<sim::Server>,
pub sled_agent2_storage: Option<camino_tempfile::Utf8TempDir>,
@@ -447,13 +447,14 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
pub async fn start_clickhouse(&mut self) {
let log = &self.logctx.log;
debug!(log, "Starting Clickhouse");
let clickhouse = dev::clickhouse::ClickHouseInstance::new_single_node(
&self.logctx,
0,
)
.await
.unwrap();
let port = clickhouse.port();
let clickhouse =
dev::clickhouse::ClickHouseDeployment::new_single_node(
&self.logctx,
0,
)
.await
.unwrap();
let port = clickhouse.http_address().port();

let zpool_id = ZpoolUuid::new_v4();
let dataset_id = Uuid::new_v4();
@@ -594,7 +595,7 @@ impl<'a, N: NexusServer> ControlPlaneTestContextBuilder<'a, N> {
let oximeter = start_oximeter(
log.new(o!("component" => "oximeter")),
nexus_internal_addr,
clickhouse.port(),
clickhouse.http_address().port(),
collector_id,
)
.await
13 changes: 3 additions & 10 deletions nexus/tests/integration_tests/oximeter.rs
@@ -9,7 +9,6 @@ use nexus_test_interface::NexusServer;
use nexus_test_utils_macros::nexus_test;
use omicron_test_utils::dev::poll::{wait_for_condition, CondCheckError};
use oximeter_db::DbWrite;
use std::net;
use std::time::Duration;
use uuid::Uuid;

@@ -118,14 +117,8 @@ async fn test_oximeter_reregistration() {
row.get::<&str, chrono::DateTime<chrono::Utc>>("time_modified");

// ClickHouse client for verifying collection.
let ch_address = net::SocketAddrV6::new(
"::1".parse().unwrap(),
context.clickhouse.port(),
0,
0,
);
let client =
oximeter_db::Client::new(ch_address.into(), &context.logctx.log);
let ch_address = context.clickhouse.http_address().into();
let client = oximeter_db::Client::new(ch_address, &context.logctx.log);
client
.init_single_node_db()
.await
@@ -308,7 +301,7 @@ async fn test_oximeter_reregistration() {
context.oximeter = nexus_test_utils::start_oximeter(
context.logctx.log.new(o!("component" => "oximeter")),
context.server.get_http_server_internal_address().await,
context.clickhouse.port(),
context.clickhouse.http_address().port(),
oximeter_id,
)
.await
(Diffs for the remaining changed files are not shown here.)
