Skip to content

Commit

Permalink
Talk to ClickHouse over the native protocol (#6584)
Browse files Browse the repository at this point in the history
Adds a module in `oximeter_db` that speaks the native ClickHouse
protocol, a message-oriented protocol directly over TCP. This includes
the core messaging and serde needed for making a handshake with the
server, and then running most standard SQL queries.

This uses the codecs from `tokio_util` to turn the TCP stream into a
sink / stream of messges, which is much more convenient to operate on.
The client and server exchange a handshake, after which the client may
run SQL queries to select data.

There are a number of tests in the module, including low-level
serialization checks and asserting the results of actual SQL queries. In
addition, this adds a barebones SQL shell -- this is not intended to
replace the feature-rich official CLI, but is very useful for testing
the protocol implementation by running arbitrary queries.

This module is not consumed anywhere beyond the shell itself. A
follow-up commit will integrate it into the existing
`oximeter_db::Client` object, with the primary goal of making OxQL
queries much more flexible and efficient.
  • Loading branch information
bnaecker authored Sep 20, 2024
1 parent b99552c commit ade19ee
Show file tree
Hide file tree
Showing 25 changed files with 3,933 additions and 2 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ nexus-test-interface = { path = "nexus/test-interface" }
nexus-test-utils-macros = { path = "nexus/test-utils-macros" }
nexus-test-utils = { path = "nexus/test-utils" }
nexus-types = { path = "nexus/types" }
nom = "7.1.3"
num-integer = "0.1.46"
num = { version = "0.4.3", default-features = false, features = [ "libm" ] }
omicron-clickhouse-admin = { path = "clickhouse-admin" }
Expand Down
24 changes: 23 additions & 1 deletion oximeter/db/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ clap.workspace = true
clickward.workspace = true
dropshot.workspace = true
futures.workspace = true
gethostname.workspace = true
highway.workspace = true
libc.workspace = true
num.workspace = true
omicron-common.workspace = true
omicron-workspace-hack.workspace = true
Expand All @@ -33,6 +35,7 @@ slog-async.workspace = true
slog-dtrace.workspace = true
slog-term.workspace = true
thiserror.workspace = true
tokio-util.workspace = true
usdt.workspace = true
uuid.workspace = true

Expand All @@ -44,6 +47,10 @@ features = [ "serde" ]
workspace = true
optional = true

[dependencies.display-error-chain]
workspace = true
optional = true

[dependencies.indexmap]
workspace = true
optional = true
Expand Down Expand Up @@ -86,6 +93,7 @@ features = [ "rt-multi-thread", "macros" ]

[dev-dependencies]
camino-tempfile.workspace = true
criterion = { workspace = true, features = [ "async_tokio" ] }
expectorate.workspace = true
indexmap.workspace = true
itertools.workspace = true
Expand All @@ -98,7 +106,7 @@ strum.workspace = true
tempfile.workspace = true

[features]
default = [ "oxql", "sql" ]
default = [ "native-sql", "oxql", "sql" ]
sql = [
"dep:indexmap",
"dep:reedline",
Expand All @@ -113,7 +121,21 @@ oxql = [
"dep:reedline",
"dep:tabled",
]
native-sql = [
"dep:crossterm",
"dep:display-error-chain",
"dep:indexmap",
"dep:reedline",
"dep:rustyline",
"dep:sqlformat",
"dep:sqlparser",
"dep:tabled"
]

[[bin]]
name = "oxdb"
doc = false

[[bench]]
name = "protocol"
harness = false
116 changes: 116 additions & 0 deletions oximeter/db/benches/protocol.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at https://mozilla.org/MPL/2.0/.

//! Benchmark comparing the native protocol to JSON-over-HTTP.
// Copyright 2024 Oxide Computer Company

use criterion::BenchmarkId;
use criterion::Criterion;
use criterion::{criterion_group, criterion_main};
use omicron_common::address::CLICKHOUSE_TCP_PORT;
use oximeter_db::native::Connection;
use std::net::Ipv6Addr;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

/// List of queries to run.
///
/// These are static so we're not benchmarking the creation of the query string.
const QUERIES: &[&str] = &[
"SELECT number FROM system.numbers LIMIT 100",
"SELECT number FROM system.numbers LIMIT 1000",
"SELECT number FROM system.numbers LIMIT 10000",
"SELECT number FROM system.numbers LIMIT 100000",
"SELECT number FROM system.numbers LIMIT 1000000",
];

/// Run the provided query on the connection.
///
/// We need to be passed the runtime here to keep it alive, and avoid errors
/// complaining that the runtime is shutting down.
async fn native_impl(
_rt: &Runtime,
conn: &Arc<Mutex<Connection>>,
query: &str,
) {
conn.lock().await.query(query).await.expect("Expected to run query");
}

/// Setup the native query benchmark.
fn native(c: &mut Criterion) {
let mut group = c.benchmark_group("native");

// Create a client.
//
// It's unfortunate that we need to wrap this in an Arc+Mutex, but it's
// required since the query method takes `&mut self`. Otherwise we'd need to
// create the client on each iteration, and thus be timing the handshake
// too. This way, we're just adding the overhead of the lock, which should
// be pretty small since it's uncontended.
let addr = SocketAddr::new(Ipv6Addr::LOCALHOST.into(), CLICKHOUSE_TCP_PORT);
let rt = Runtime::new().unwrap();
let conn = Arc::new(Mutex::new(
rt.block_on(async { Connection::new(addr).await.unwrap() }),
));
for query in QUERIES {
group.bench_with_input(
BenchmarkId::from_parameter(*query),
&(&rt, &conn, query),
|b, (rt, conn, query)| {
b.to_async(*rt).iter(|| native_impl(rt, conn, query))
},
);
}
group.finish();
}

/// Run the provided query using the HTTP interface.
async fn json_over_http_impl(client: &reqwest::Client, sql: &str) {
let _: Vec<_> = client
.post("http://[::1]:8123")
.query(&[
("output_format_json_quote_64bit_integers", "0"),
("wait_end_of_query", "1"),
])
.body(format!("{sql} FORMAT JSONEachRow"))
.send()
.await
.expect("Expected to send query")
.text()
.await
.expect("Expected query to return text")
.lines()
.map(|line| {
serde_json::from_str::<serde_json::Value>(line)
.expect("Expected JSON lines")
})
.collect();
}

/// Setup the JSON-over-HTTP benchmark.
fn json_over_http(c: &mut Criterion) {
let mut group = c.benchmark_group("json-over-http");
let client = reqwest::Client::new();
for query in QUERIES {
group.bench_with_input(
BenchmarkId::from_parameter(*query),
&(&client, query),
|b, &(client, query)| {
b.to_async(Runtime::new().unwrap())
.iter(|| json_over_http_impl(client, query))
},
);
}
group.finish();
}

criterion_group!(
name = benches;
config = Criterion::default().sample_size(100).noise_threshold(0.05);
targets = json_over_http, native
);
criterion_main!(benches);
6 changes: 6 additions & 0 deletions oximeter/db/src/bin/oxdb/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,10 @@ enum Subcommand {
#[clap(flatten)]
opts: oximeter_db::shells::oxql::ShellOptions,
},

/// Start a native SQL shell to a ClickHouse server.
#[cfg(feature = "native-sql")]
NativeSql,
}

fn describe_data() {
Expand Down Expand Up @@ -356,6 +360,8 @@ async fn main() -> anyhow::Result<()> {
oximeter_db::shells::oxql::shell(args.address, args.port, log, opts)
.await?
}
#[cfg(feature = "native-sql")]
Subcommand::NativeSql => oximeter_db::shells::native::shell().await?,
}
Ok(())
}
4 changes: 3 additions & 1 deletion oximeter/db/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ use thiserror::Error;

mod client;
pub mod model;
#[cfg(feature = "native-sql")]
pub mod native;
#[cfg(any(feature = "oxql", test))]
pub mod oxql;
pub mod query;
#[cfg(any(feature = "oxql", feature = "sql", test))]
#[cfg(any(feature = "oxql", feature = "sql", feature = "native-sql", test))]
pub mod shells;
#[cfg(any(feature = "sql", test))]
pub mod sql;
Expand Down
Loading

0 comments on commit ade19ee

Please sign in to comment.