chore: run relation_link_test concurrently #4964

Draft
wants to merge 2 commits into base: main

Changes from all commits
29 changes: 15 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default.

47 changes: 0 additions & 47 deletions query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs
@@ -1,4 +1,3 @@
use once_cell::sync::OnceCell;
use quaint::{connector::PostgresFlavour, prelude::*, single::Quaint};
use schema_core::schema_connector::{ConnectorError, ConnectorResult};
use url::Url;
@@ -20,7 +19,6 @@ pub(crate) async fn cockroach_setup(url: String, prisma_schema: &str) -> Connect

conn.raw_cmd(&query).await.unwrap();

drop_db_when_thread_exits(parsed_url, db_name);
let mut connector = sql_schema_connector::SqlSchemaConnector::new_cockroach();
crate::diff_and_apply(prisma_schema, url, &mut connector).await
}
@@ -29,48 +27,3 @@ async fn create_admin_conn(url: &mut Url) -> ConnectorResult<Quaint> {
url.set_path("/postgres");
Ok(Quaint::new(url.as_ref()).await.unwrap())
}

fn drop_db_when_thread_exits(admin_url: Url, db_name: &str) {
use std::{cell::RefCell, sync::mpsc, thread};
use test_setup::runtime::run_with_thread_local_runtime as tok;

// === Dramatis Personæ ===

// DB_DROP_THREAD: A thread that drops databases.
static DB_DROP_THREAD: OnceCell<mpsc::SyncSender<String>> = OnceCell::new();

let sender = DB_DROP_THREAD.get_or_init(|| {
let (sender, receiver) = mpsc::sync_channel::<String>(4096);

thread::spawn(move || {
let mut admin_url = admin_url;
let conn = tok(create_admin_conn(&mut admin_url)).unwrap();

// Receive new databases to drop.
for msg in receiver.iter() {
tok(conn.raw_cmd(&msg)).unwrap();
}
});

sender
});

// NOTIFIER: a thread local that notifies DB_DROP_THREAD when dropped.
struct Notifier(String, mpsc::SyncSender<String>);

impl Drop for Notifier {
fn drop(&mut self) {
let message = std::mem::take(&mut self.0);

self.1.send(message).unwrap();
}
}

thread_local! {
static NOTIFIER: RefCell<Option<Notifier>> = const { RefCell::new(None) };
}

NOTIFIER.with(move |cell| {
*cell.borrow_mut() = Some(Notifier(format!("DROP DATABASE \"{db_name}\""), sender.clone()));
});
}
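
For context, the deleted helper ties database cleanup to thread exit: a thread-local value whose Drop impl sends a DROP DATABASE command over a channel to a dedicated cleanup thread. Below is a minimal, self-contained sketch of that pattern under assumed names (Notifier, NOTIFIER, "example" are illustrative, not the crate's actual identifiers):

use std::{cell::RefCell, sync::mpsc, thread};

struct Notifier(String, mpsc::SyncSender<String>);

impl Drop for Notifier {
    fn drop(&mut self) {
        // Runs when the owning thread exits; hands the cleanup command
        // to whoever holds the receiving end of the channel.
        let _ = self.1.send(std::mem::take(&mut self.0));
    }
}

thread_local! {
    static NOTIFIER: RefCell<Option<Notifier>> = const { RefCell::new(None) };
}

fn main() {
    let (tx, rx) = mpsc::sync_channel::<String>(16);

    thread::spawn(move || {
        // Register the notifier for this thread; it drops when the thread ends.
        NOTIFIER.with(|cell| {
            *cell.borrow_mut() = Some(Notifier("DROP DATABASE \"example\"".into(), tx));
        });
        // ... the test body would run here.
    })
    .join()
    .unwrap();

    // The cleanup command arrives only after the worker thread has exited.
    assert_eq!(rx.recv().unwrap(), "DROP DATABASE \"example\"");
}
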
query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml
@@ -34,6 +34,7 @@ query-engine-metrics = { path = "../../metrics" }
quaint.workspace = true
jsonrpc-core = "17"
insta = "1.7.1"
futures-util = "0.3.30"

# Only this version is vetted, upgrade only after going through the code,
# as this is a small crate with little user base.
45 changes: 24 additions & 21 deletions query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs
@@ -12,6 +12,7 @@ pub use config::*;
pub use connector_tag::*;
pub use datamodel_rendering::*;
pub use error::*;
use futures_util::StreamExt;
pub use logging::*;
pub use query_core;
pub use query_result::*;
@@ -144,6 +145,8 @@ fn run_relation_link_test_impl(

let (dms, capabilities) = schema_with_relation(on_parent, on_child, id_only);

let mut futs = Vec::new();

insta::allow_duplicates! {
for (i, (dm, caps)) in dms.into_iter().zip(capabilities.into_iter()).enumerate() {
if RELATION_TEST_IDX.map(|idx| idx != i).unwrap_or(false) {
@@ -165,28 +168,28 @@ let metrics_for_subscriber = metrics.clone();
let metrics_for_subscriber = metrics.clone();
let (log_capture, log_tx) = TestLogCapture::new();

run_with_tokio(
async move {
println!("Used datamodel:\n {}", datamodel.yellow());
let override_local_max_bind_values = None;
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture)
.await
.unwrap();


test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
.await.unwrap();

teardown_project(&datamodel, Default::default(), runner.schema_id())
.await
.unwrap();
}
);
futs.push(async move {
let override_local_max_bind_values = None;
let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture)
.await
.unwrap();

test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber(
ENV_LOG_LEVEL.to_string(),
metrics_for_subscriber,
log_tx,
))
.await.unwrap();

teardown_project(&datamodel, Default::default(), runner.schema_id())
.await
.unwrap();
});
}

run_with_tokio(async move {
futures_util::stream::iter(futs).buffer_unordered(30).collect::<Vec<_>>().await;
});
}
}
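
The replacement collects each schema variant's test run into a future and then drives up to 30 of them at once with futures_util's buffer_unordered, instead of blocking on one run at a time. A minimal sketch of that pattern in isolation, assuming the tokio and futures-util crates (run_one and the counts are illustrative, not the test-kit's API):

use futures_util::StreamExt;

async fn run_one(i: usize) -> usize {
    // Stand-in for one relation-link test (load runner, run test, tear down).
    tokio::time::sleep(std::time::Duration::from_millis(10)).await;
    i
}

#[tokio::main]
async fn main() {
    // Build the futures lazily, then poll at most 30 of them concurrently,
    // collecting results as they complete (in no particular order).
    let futs = (0..100).map(run_one);
    let results: Vec<usize> = futures_util::stream::iter(futs)
        .buffer_unordered(30)
        .collect()
        .await;
    assert_eq!(results.len(), 100);
}

The argument to buffer_unordered caps how many futures are polled at once; 30 matches the value used in the diff above.
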
