From aa4eb82cb87f3d6abfdd6dc397a0483502f14d84 Mon Sep 17 00:00:00 2001 From: Flavian Desverne Date: Mon, 22 Jul 2024 18:12:08 +0200 Subject: [PATCH 1/2] chore: run relation_link_test concurrently --- Cargo.lock | 29 ++++++------ .../qe-setup/src/cockroachdb.rs | 47 ------------------- .../query-tests-setup/Cargo.toml | 1 + .../query-tests-setup/src/lib.rs | 45 +++++++++--------- 4 files changed, 40 insertions(+), 82 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fbe4a8ffb542..50e2f3e3f8e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1468,9 +1468,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -1478,9 +1478,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" @@ -1495,15 +1495,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -1512,15 +1512,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -1530,9 +1530,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -3974,6 +3974,7 @@ dependencies = [ "async-trait", "colored", "enumflags2", + "futures-util", "hyper", "indexmap 2.2.2", "indoc 2.0.3", diff --git a/query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs b/query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs index 0d876d6b4dcf..aade65d4806b 100644 --- a/query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs +++ b/query-engine/connector-test-kit-rs/qe-setup/src/cockroachdb.rs @@ -1,4 +1,3 @@ -use once_cell::sync::OnceCell; use quaint::{connector::PostgresFlavour, prelude::*, single::Quaint}; use schema_core::schema_connector::{ConnectorError, ConnectorResult}; use url::Url; @@ -20,7 +19,6 @@ pub(crate) async fn cockroach_setup(url: String, prisma_schema: &str) -> Connect conn.raw_cmd(&query).await.unwrap(); - drop_db_when_thread_exits(parsed_url, db_name); let mut connector = sql_schema_connector::SqlSchemaConnector::new_cockroach(); crate::diff_and_apply(prisma_schema, url, &mut connector).await } @@ -29,48 +27,3 @@ async fn create_admin_conn(url: &mut Url) -> ConnectorResult { url.set_path("/postgres"); Ok(Quaint::new(url.as_ref()).await.unwrap()) } - -fn drop_db_when_thread_exits(admin_url: Url, db_name: &str) { - use std::{cell::RefCell, sync::mpsc, thread}; - use test_setup::runtime::run_with_thread_local_runtime as tok; - - // === Dramatis Personæ === - - // DB_DROP_THREAD: A thread that drops databases. - static DB_DROP_THREAD: OnceCell> = OnceCell::new(); - - let sender = DB_DROP_THREAD.get_or_init(|| { - let (sender, receiver) = mpsc::sync_channel::(4096); - - thread::spawn(move || { - let mut admin_url = admin_url; - let conn = tok(create_admin_conn(&mut admin_url)).unwrap(); - - // Receive new databases to drop. - for msg in receiver.iter() { - tok(conn.raw_cmd(&msg)).unwrap(); - } - }); - - sender - }); - - // NOTIFIER: a thread local that notifies DB_DROP_THREAD when dropped. - struct Notifier(String, mpsc::SyncSender); - - impl Drop for Notifier { - fn drop(&mut self) { - let message = std::mem::take(&mut self.0); - - self.1.send(message).unwrap(); - } - } - - thread_local! { - static NOTIFIER: RefCell> = const { RefCell::new(None) }; - } - - NOTIFIER.with(move |cell| { - *cell.borrow_mut() = Some(Notifier(format!("DROP DATABASE \"{db_name}\""), sender.clone())); - }); -} diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml index cd8abc07331c..06e4af2fd519 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml +++ b/query-engine/connector-test-kit-rs/query-tests-setup/Cargo.toml @@ -34,6 +34,7 @@ query-engine-metrics = { path = "../../metrics" } quaint.workspace = true jsonrpc-core = "17" insta = "1.7.1" +futures-util = "0.3.30" # Only this version is vetted, upgrade only after going through the code, # as this is a small crate with little user base. diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs index e94c14c6c574..110e6e2cebf2 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs @@ -144,6 +144,8 @@ fn run_relation_link_test_impl( let (dms, capabilities) = schema_with_relation(on_parent, on_child, id_only); + let mut futs = Vec::new(); + insta::allow_duplicates! { for (i, (dm, caps)) in dms.into_iter().zip(capabilities.into_iter()).enumerate() { if RELATION_TEST_IDX.map(|idx| idx != i).unwrap_or(false) { @@ -165,28 +167,29 @@ fn run_relation_link_test_impl( let metrics_for_subscriber = metrics.clone(); let (log_capture, log_tx) = TestLogCapture::new(); - run_with_tokio( - async move { - println!("Used datamodel:\n {}", datamodel.yellow()); - let override_local_max_bind_values = None; - let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture) - .await - .unwrap(); - - - test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber( - ENV_LOG_LEVEL.to_string(), - metrics_for_subscriber, - log_tx, - )) - .await.unwrap(); - - teardown_project(&datamodel, Default::default(), runner.schema_id()) - .await - .unwrap(); - } - ); + futs.push(async move { + println!("Used datamodel:\n {}", datamodel.yellow()); + let override_local_max_bind_values = None; + let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture) + .await + .unwrap(); + + test_fn(&runner, &dm).with_subscriber(test_tracing_subscriber( + ENV_LOG_LEVEL.to_string(), + metrics_for_subscriber, + log_tx, + )) + .await.unwrap(); + + teardown_project(&datamodel, Default::default(), runner.schema_id()) + .await + .unwrap(); + }); } + + run_with_tokio(async move { + futures_util::future::join_all(futs).await + }); } } From e6d572f234eba43b32e558650bc92280d007886a Mon Sep 17 00:00:00 2001 From: Flavian Desverne Date: Wed, 24 Jul 2024 15:46:40 +0200 Subject: [PATCH 2/2] limit concurrency --- .../connector-test-kit-rs/query-tests-setup/src/lib.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs index 110e6e2cebf2..5e23aca30e37 100644 --- a/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs +++ b/query-engine/connector-test-kit-rs/query-tests-setup/src/lib.rs @@ -12,6 +12,7 @@ pub use config::*; pub use connector_tag::*; pub use datamodel_rendering::*; pub use error::*; +use futures_util::StreamExt; pub use logging::*; pub use query_core; pub use query_result::*; @@ -168,7 +169,6 @@ fn run_relation_link_test_impl( let (log_capture, log_tx) = TestLogCapture::new(); futs.push(async move { - println!("Used datamodel:\n {}", datamodel.yellow()); let override_local_max_bind_values = None; let runner = Runner::load(datamodel.clone(), &[], version, connector_tag, override_local_max_bind_values, metrics, log_capture) .await @@ -188,7 +188,7 @@ fn run_relation_link_test_impl( } run_with_tokio(async move { - futures_util::future::join_all(futs).await + futures_util::stream::iter(futs).buffer_unordered(30).collect::>().await; }); } }