From 899a744c7ace2ca3e0b5c32fd34e236ae0217fd4 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sun, 22 Sep 2024 11:15:00 -0700 Subject: [PATCH 01/12] Parametrize e2e IPA test with number of inputs --- ipa-core/tests/common/mod.rs | 7 +++---- ipa-core/tests/helper_networks.rs | 6 +++--- ipa-core/tests/ipa_with_relaxed_dp.rs | 6 +++--- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/ipa-core/tests/common/mod.rs b/ipa-core/tests/common/mod.rs index ca1d5e08a..b21af5cfc 100644 --- a/ipa-core/tests/common/mod.rs +++ b/ipa-core/tests/common/mod.rs @@ -217,8 +217,8 @@ pub fn test_network(https: bool) { T::execute(path, https); } -pub fn test_ipa(mode: IpaSecurityModel, https: bool, encrypted_inputs: bool) { - test_ipa_with_config( +pub fn test_ipa(mode: IpaSecurityModel, https: bool, encrypted_inputs: bool) { + test_ipa_with_config::( mode, https, IpaQueryConfig { @@ -228,7 +228,7 @@ pub fn test_ipa(mode: IpaSecurityModel, https: bool, encrypted_inputs: bool) { ); } -pub fn test_ipa_with_config( +pub fn test_ipa_with_config( mode: IpaSecurityModel, https: bool, config: IpaQueryConfig, @@ -238,7 +238,6 @@ pub fn test_ipa_with_config( panic!("encrypted_input requires https") }; - const INPUT_SIZE: usize = 100; // set to true to always keep the temp dir after test finishes let dir = TempDir::new_delete_on_drop(); let path = dir.path(); diff --git a/ipa-core/tests/helper_networks.rs b/ipa-core/tests/helper_networks.rs index 7775ffba4..caa10335e 100644 --- a/ipa-core/tests/helper_networks.rs +++ b/ipa-core/tests/helper_networks.rs @@ -45,20 +45,20 @@ fn http_network_large_input() { #[test] #[cfg(all(test, web_test))] fn http_semi_honest_ipa() { - test_ipa(IpaSecurityModel::SemiHonest, false, false); + test_ipa::<100>(IpaSecurityModel::SemiHonest, false, false); } #[test] #[cfg(all(test, web_test))] fn https_semi_honest_ipa() { - test_ipa(IpaSecurityModel::SemiHonest, true, true); + test_ipa::<100>(IpaSecurityModel::SemiHonest, true, true); } #[test] #[cfg(all(test, web_test))] #[ignore] fn https_malicious_ipa() { - test_ipa(IpaSecurityModel::Malicious, true, true); + test_ipa::<100>(IpaSecurityModel::Malicious, true, true); } /// Similar to [`network`] tests, but it uses keygen + confgen CLIs to generate helper client config diff --git a/ipa-core/tests/ipa_with_relaxed_dp.rs b/ipa-core/tests/ipa_with_relaxed_dp.rs index 84c4c2a7b..d5bbe98b6 100644 --- a/ipa-core/tests/ipa_with_relaxed_dp.rs +++ b/ipa-core/tests/ipa_with_relaxed_dp.rs @@ -20,7 +20,7 @@ fn relaxed_dp_semi_honest() { let encrypted_input = false; let config = build_config(); - test_ipa_with_config( + test_ipa_with_config::<100>( IpaSecurityModel::SemiHonest, encrypted_input, config, @@ -33,7 +33,7 @@ fn relaxed_dp_malicious() { let encrypted_input = false; let config = build_config(); - test_ipa_with_config( + test_ipa_with_config::<100>( IpaSecurityModel::Malicious, encrypted_input, config, @@ -44,5 +44,5 @@ fn relaxed_dp_malicious() { #[test] #[cfg(all(test, web_test))] fn relaxed_dp_https_malicious_ipa() { - test_ipa(IpaSecurityModel::Malicious, true, true); + test_ipa::<100>(IpaSecurityModel::Malicious, true, true); } From c871ac768e04f52875159ea229a0ae02321fa141 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sun, 22 Sep 2024 11:24:19 -0700 Subject: [PATCH 02/12] Add a test that reproduces #1300 --- ipa-core/tests/ipa_with_relaxed_dp.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/ipa-core/tests/ipa_with_relaxed_dp.rs b/ipa-core/tests/ipa_with_relaxed_dp.rs index d5bbe98b6..f4d15e5e1 100644 --- 
a/ipa-core/tests/ipa_with_relaxed_dp.rs +++ b/ipa-core/tests/ipa_with_relaxed_dp.rs @@ -46,3 +46,9 @@ fn relaxed_dp_malicious() { fn relaxed_dp_https_malicious_ipa() { test_ipa::<100>(IpaSecurityModel::Malicious, true, true); } + +#[test] +#[cfg(all(test, web_test))] +fn relaxed_dp_https_malicious_ipa_10_rows() { + test_ipa::<10>(IpaSecurityModel::Malicious, true, true); +} From 1844723ae92ba1833f58bbb6c610ba093f4ea3de Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sun, 22 Sep 2024 14:11:43 -0700 Subject: [PATCH 03/12] Fix send buffer alignment issue The issue is described in #1300 --- ipa-core/src/helpers/gateway/send.rs | 146 ++++++++++++++++++++++----- ipa-core/tests/common/mod.rs | 6 +- 2 files changed, 128 insertions(+), 24 deletions(-) diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index fc73caf5d..b3b28dcdb 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -1,5 +1,6 @@ use std::{ borrow::Borrow, + cmp::min, fmt::Debug, marker::PhantomData, num::NonZeroUsize, @@ -203,10 +204,7 @@ impl GatewaySenders { match self.inner.entry(channel_id.clone()) { Entry::Occupied(entry) => Arc::clone(entry.get()), Entry::Vacant(entry) => { - let sender = Self::new_sender( - &SendChannelConfig::new::<M>(config, total_records), - channel_id.clone(), - ); + let sender = Self::new_sender(&SendChannelConfig::new::<M>(config, total_records), channel_id.clone()); entry.insert(Arc::clone(&sender)); tokio::spawn({ @@ -249,28 +247,71 @@ impl Stream for GatewaySendStream { impl SendChannelConfig { fn new<M: Message>(gateway_config: GatewayConfig, total_records: TotalRecords) -> Self { - debug_assert!(M::Size::USIZE > 0, "Message size cannot be 0"); + Self::new_with(gateway_config, total_records, M::Size::USIZE) + } - let record_size = M::Size::USIZE; - let total_capacity = gateway_config.active.get() * record_size; - Self { - total_capacity: total_capacity.try_into().unwrap(), - record_size: record_size.try_into().unwrap(), - read_size: if total_records.is_indeterminate() - || gateway_config.read_size.get() <= record_size - { + fn new_with( + gateway_config: GatewayConfig, + total_records: TotalRecords, + record_size: usize, + ) -> Self { + debug_assert!(record_size > 0, "Message size cannot be 0"); + // The absolute minimum of capacity we reserve for this channel. We can't go + // below that number, otherwise a deadlock is almost guaranteed. + let min_capacity = gateway_config.active.get() * record_size; + + // first, compute the read size. It must be a multiple of `record_size` to prevent + // misaligned reads and deadlocks. For indeterminate channels, read size must be + // set to the size of one record, to trigger buffer flush on every write + let read_size = + if total_records.is_indeterminate() || gateway_config.read_size.get() <= record_size { record_size } else { - std::cmp::min( - total_capacity, - // closest multiple of record_size to read_size + // closest multiple of record_size to read_size + let proposed_read_size = min( gateway_config.read_size.get() / record_size * record_size, - ) - } - .try_into() - .unwrap(), + min_capacity, + ); + // if min capacity is not a multiple of read size, + // we must adjust read size. Adjusting total capacity is not possible due to + // possible deadlocks - it must be strictly aligned with active work. + // read size goes in `record_size` increments to keep it aligned.
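As a standalone illustration of this alignment rule (not part of the patch; names are illustrative and the sketch only covers determinate channels whose configured read size is at least one record), the computation reduces to the following, mirroring the worked example in the comments that follow:

fn aligned_sizes(active: usize, record_size: usize, requested_read_size: usize) -> (usize, usize) {
    // the smallest buffer that still covers the whole active-work window
    let min_capacity = active * record_size;
    // largest record-aligned read size that does not exceed that capacity
    let proposed = std::cmp::min(requested_read_size / record_size * record_size, min_capacity);
    // shrink the read size by the remainder; the remainder is a multiple of
    // record_size, so the adjusted read size stays record-aligned
    let read_size = proposed - min_capacity % proposed;
    // round the capacity down to a multiple of the read size
    let total_capacity = min_capacity / read_size * read_size;
    (read_size, total_capacity)
}

#[test]
fn alignment_example() {
    // active work = 10, record size = 512 bytes, configured read size = 2048 bytes:
    // the read size shrinks to 1024 and the capacity stays at 5120.
    assert_eq!((1024, 5120), aligned_sizes(10, 512, 2048));
}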
+ // rem is aligned with both capacity and read_size, so subtracting + // it will keep read_size and capacity aligned + // Here is an example of how it may work: + // let's say the active work is set to 10, record size is 512 bytes + // and read size in gateway config is set to 2048 bytes (default value). + // the math above will compute total_capacity as 5120 bytes and + // proposed_read_size as 2048 because it is aligned with the 512-byte record size. + // Now, if we don't adjust, we have an issue because 5120 % 2048 = 1024 != 0. + // Keeping read size like this will cause a deadlock, so we adjust it to + // 1024. + proposed_read_size - min_capacity % proposed_read_size + }; + + // total capacity must be a multiple of both read size and record size. + // Record size is easy to justify: misalignment here leads to either waste of memory + // or deadlock on the last write. Aligning read size and total capacity + // has the same reasoning behind it: reading less than total capacity + // can leave the last chunk half-written, with backpressure from active work + // preventing the protocols from making further progress. + let total_capacity = min_capacity / read_size * read_size; + + let this = Self { + total_capacity: total_capacity.try_into().unwrap(), + record_size: record_size.try_into().unwrap(), + read_size: read_size.try_into().unwrap(), total_records, - } + }; + + // make sure we've set these values correctly. + debug_assert_eq!(0, this.total_capacity.get() % this.read_size.get()); + debug_assert_eq!(0, this.total_capacity.get() % this.record_size.get()); + debug_assert!(this.total_capacity.get() >= this.read_size.get()); + debug_assert!(this.total_capacity.get() >= this.record_size.get()); + debug_assert!(this.read_size.get() >= this.record_size.get()); + + this } } @@ -278,6 +319,7 @@ impl SendChannelConfig { mod test { use std::num::NonZeroUsize; + use proptest::proptest; use typenum::Unsigned; use crate::{ @@ -286,7 +328,7 @@ mod test { Serializable, }, helpers::{gateway::send::SendChannelConfig, GatewayConfig, TotalRecords}, - secret_sharing::SharedValue, + secret_sharing::{Sendable, StdArray}, }; impl Default for SendChannelConfig { @@ -301,7 +343,7 @@ mod test { } #[allow(clippy::needless_update)] // to allow progress_check_interval to be conditionally compiled - fn send_config( + fn send_config( total_records: TotalRecords, ) -> SendChannelConfig { let gateway_config = GatewayConfig { @@ -391,4 +433,62 @@ mod test { .get() ); } + + /// This test reproduces ipa/#1300. PRF evaluation sent 32*16 = 512 (record_size * vectorization) + /// chunks through a channel with total capacity 5120 (active work = 10 records) and read size + /// of 2048 bytes. + /// The problem was that read size of 2048 does not divide 5120, so the last chunk was not sent.
+ #[test] + fn total_capacity_is_a_multiple_of_read_size() { + let config = + send_config::, 10, 2048>(TotalRecords::specified(43).unwrap()); + + assert_eq!(0, config.total_capacity.get() % config.read_size.get()); + assert_eq!(config.total_capacity.get(), 10 * config.record_size.get()); + } + + fn ensure_config( + total_records: Option, + active: usize, + read_size: usize, + record_size: usize, + ) { + let gateway_config = GatewayConfig { + active: active.try_into().unwrap(), + read_size: read_size.try_into().unwrap(), + ..Default::default() + }; + let config = SendChannelConfig::new_with( + gateway_config, + total_records + .map(|v| TotalRecords::specified(v).unwrap()) + .unwrap_or(TotalRecords::Indeterminate), + record_size, + ); + + // total capacity checks + assert!(config.total_capacity.get() > 0); + assert!(config.total_capacity.get() >= record_size); + assert!(config.total_capacity.get() <= record_size * active); + assert!(config.total_capacity.get() >= config.read_size.get()); + assert_eq!(0, config.total_capacity.get() % config.record_size.get()); + + // read size checks + assert!(config.read_size.get() > 0); + assert!(config.read_size.get() >= config.record_size.get()); + assert_eq!(0, config.total_capacity.get() % config.read_size.get()); + assert_eq!(0, config.read_size.get() % config.record_size.get()); + } + + proptest! { + #[test] + fn config_prop( + total_records in proptest::option::of(1_usize..1 << 32), + active in 1_usize..100_000, + read_size in 1_usize..32768, + record_size in 1_usize..4096, + ) { + ensure_config(total_records, active, read_size, record_size); + } + } } diff --git a/ipa-core/tests/common/mod.rs b/ipa-core/tests/common/mod.rs index b21af5cfc..6fa3139ac 100644 --- a/ipa-core/tests/common/mod.rs +++ b/ipa-core/tests/common/mod.rs @@ -217,7 +217,11 @@ pub fn test_network(https: bool) { T::execute(path, https); } -pub fn test_ipa(mode: IpaSecurityModel, https: bool, encrypted_inputs: bool) { +pub fn test_ipa( + mode: IpaSecurityModel, + https: bool, + encrypted_inputs: bool, +) { test_ipa_with_config::( mode, https, From bc5e662a2ae1246a4affe52c352267bd6960276d Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sun, 22 Sep 2024 18:52:20 -0700 Subject: [PATCH 04/12] Use active_work instead of chunk size for attribution --- ipa-core/src/protocol/context/batcher.rs | 4 +++- ipa-core/src/protocol/context/dzkp_validator.rs | 7 ++++--- ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs | 3 ++- 3 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ipa-core/src/protocol/context/batcher.rs b/ipa-core/src/protocol/context/batcher.rs index f7021d988..841a4f733 100644 --- a/ipa-core/src/protocol/context/batcher.rs +++ b/ipa-core/src/protocol/context/batcher.rs @@ -99,6 +99,7 @@ impl<'a, B> Batcher<'a, B> { fn batch_offset(&self, record_id: RecordId) -> usize { let batch_index = usize::from(record_id) / self.records_per_batch; + tracing::warn!("for {record_id}, batch is {batch_index} because {} and {}", self.records_per_batch, self.total_records); batch_index .checked_sub(self.first_batch) .expect_not_yet_validated(batch_index) @@ -160,13 +161,14 @@ impl<'a, B> Batcher<'a, B> { ); batch.pending_records.set(record_offset_in_batch, true); batch.pending_count += 1; + tracing::warn!("batcher evaluates {batch_index} batch {record_id}, {remaining_records} for validation readiness: {}/{total_count}", batch.pending_count); if batch.pending_count == total_count { assert!( batch.pending_records[0..total_count].all(), "Expected batch of {total_count} records to be ready 
for validation, but only have {:?}.", &batch.pending_records[0..total_count], ); - tracing::info!("batch {batch_index} is ready for validation"); + tracing::info!("is_ready_for_validation: batch {batch_index} is ready for validation"); let batch; if batch_offset == 0 { batch = self.batches.pop_front().unwrap(); diff --git a/ipa-core/src/protocol/context/dzkp_validator.rs b/ipa-core/src/protocol/context/dzkp_validator.rs index 517d4db46..cce86b886 100644 --- a/ipa-core/src/protocol/context/dzkp_validator.rs +++ b/ipa-core/src/protocol/context/dzkp_validator.rs @@ -551,9 +551,9 @@ impl Batch { .generate_challenges(ctx.narrow(&Step::Challenge)) .await; + let m = self.get_number_of_multiplications(); let (sum_of_uv, p_r_right_prover, q_r_left_prover) = { // get number of multiplications - let m = self.get_number_of_multiplications(); tracing::info!("validating {m} multiplications"); debug_assert_eq!( m, @@ -579,7 +579,7 @@ impl Batch { }; // verify BatchToVerify, return result - chunk_batch + let r = chunk_batch .verify( ctx.narrow(&Step::VerifyProof), sum_of_uv, @@ -588,7 +588,8 @@ impl Batch { &challenges_for_left_prover, &challenges_for_right_prover, ) - .await + .await; + r } } diff --git a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs index a745d5ea7..b6c44a511 100644 --- a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs +++ b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs @@ -53,6 +53,7 @@ use crate::{ BitDecomposed, FieldSimd, SharedValue, TransposeFrom, }, }; +use crate::seq_join::SeqJoin; pub mod feature_label_dot_product; pub(crate) mod step; @@ -510,7 +511,7 @@ where protocol: &Step::Attribute, validate: &Step::AttributeValidate, }, - chunk_size, + sh_ctx.active_work().get(), ); dzkp_validator.set_total_records(TotalRecords::specified(histogram[1]).unwrap()); let ctx_for_row_number = set_up_contexts(&dzkp_validator.context(), histogram)?; From 8811a5ae943d30e3040b2c6784db1b54c4816f9b Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Sun, 22 Sep 2024 18:55:14 -0700 Subject: [PATCH 05/12] Delete the noise --- ipa-core/src/protocol/context/batcher.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/ipa-core/src/protocol/context/batcher.rs b/ipa-core/src/protocol/context/batcher.rs index 841a4f733..7aabfa1ad 100644 --- a/ipa-core/src/protocol/context/batcher.rs +++ b/ipa-core/src/protocol/context/batcher.rs @@ -99,7 +99,6 @@ impl<'a, B> Batcher<'a, B> { fn batch_offset(&self, record_id: RecordId) -> usize { let batch_index = usize::from(record_id) / self.records_per_batch; - tracing::warn!("for {record_id}, batch is {batch_index} because {} and {}", self.records_per_batch, self.total_records); batch_index .checked_sub(self.first_batch) .expect_not_yet_validated(batch_index) @@ -161,7 +160,6 @@ impl<'a, B> Batcher<'a, B> { ); batch.pending_records.set(record_offset_in_batch, true); batch.pending_count += 1; - tracing::warn!("batcher evaluates {batch_index} batch {record_id}, {remaining_records} for validation readiness: {}/{total_count}", batch.pending_count); if batch.pending_count == total_count { assert!( batch.pending_records[0..total_count].all(), From 5da257edb622792d8f9ff26da24b07ce0149b842 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 23 Sep 2024 10:09:08 -0700 Subject: [PATCH 06/12] Set batch size to min(active_work, expected_batch_size) --- ipa-core/src/helpers/gateway/send.rs | 5 ++--- ipa-core/src/protocol/context/dzkp_validator.rs | 5 ++--- ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs | 3 +-- 3 
files changed, 5 insertions(+), 8 deletions(-) diff --git a/ipa-core/src/helpers/gateway/send.rs b/ipa-core/src/helpers/gateway/send.rs index b3b28dcdb..f6665669e 100644 --- a/ipa-core/src/helpers/gateway/send.rs +++ b/ipa-core/src/helpers/gateway/send.rs @@ -434,7 +434,7 @@ mod test { ); } - /// This test reproduces ipa/#1300. PRF evaluation sent 32*16 = 512 (record_size * vectorization) + /// This test reproduces ipa/#1300. PRF evaluation sent 32*16 = 512 (`record_size` * vectorization) /// chunks through a channel with total capacity 5120 (active work = 10 records) and read size /// of 2048 bytes. /// The problem was that read size of 2048 does not divide 5120, so the last chunk was not sent. @@ -461,8 +461,7 @@ mod test { let config = SendChannelConfig::new_with( gateway_config, total_records - .map(|v| TotalRecords::specified(v).unwrap()) - .unwrap_or(TotalRecords::Indeterminate), + .map_or(TotalRecords::Indeterminate, |v| TotalRecords::specified(v).unwrap()), record_size, ); diff --git a/ipa-core/src/protocol/context/dzkp_validator.rs b/ipa-core/src/protocol/context/dzkp_validator.rs index cce86b886..5a5996385 100644 --- a/ipa-core/src/protocol/context/dzkp_validator.rs +++ b/ipa-core/src/protocol/context/dzkp_validator.rs @@ -579,7 +579,7 @@ impl Batch { }; // verify BatchToVerify, return result - let r = chunk_batch + chunk_batch .verify( ctx.narrow(&Step::VerifyProof), sum_of_uv, @@ -588,8 +588,7 @@ impl Batch { &challenges_for_left_prover, &challenges_for_right_prover, ) - .await; - r + .await } } diff --git a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs index b6c44a511..894bb9c17 100644 --- a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs +++ b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs @@ -53,7 +53,6 @@ use crate::{ BitDecomposed, FieldSimd, SharedValue, TransposeFrom, }, }; -use crate::seq_join::SeqJoin; pub mod feature_label_dot_product; pub(crate) mod step; @@ -511,7 +510,7 @@ where protocol: &Step::Attribute, validate: &Step::AttributeValidate, }, - sh_ctx.active_work().get(), + std::cmp::min(sh_ctx.active_work().get(), chunk_size), ); dzkp_validator.set_total_records(TotalRecords::specified(histogram[1]).unwrap()); let ctx_for_row_number = set_up_contexts(&dzkp_validator.context(), histogram)?; From 5b91898485bcf92da09d298df5969ee9e11993d6 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 23 Sep 2024 13:06:02 -0700 Subject: [PATCH 07/12] Change the batch size to 1B for aggregate --- ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs index 894bb9c17..b5d6449c4 100644 --- a/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs +++ b/ipa-core/src/protocol/ipa_prf/prf_sharding/mod.rs @@ -539,7 +539,9 @@ where protocol: &Step::Aggregate, validate: &Step::AggregateValidate, }, - aggregate_values_proof_chunk(B, usize::try_from(TV::BITS).unwrap()), + // aggregate_values_proof_chunk(B, usize::try_from(TV::BITS).unwrap()), + // 1B batch size, suboptimal. 
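Patches 06 and 07 pick the DZKP batch size differently for the two phases; a minimal sketch of that choice (the helper names here are illustrative and do not appear in the patches):

fn attribution_batch_size(active_work: usize, expected_batch_size: usize) -> usize {
    // the attribution batch is capped at the active-work window, per patch 06
    std::cmp::min(active_work, expected_batch_size)
}

fn aggregation_batch_size() -> usize {
    // larger than any realistic input, so aggregation validates as a single batch, per patch 07
    1 << 30
}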
This is only to verify that it works for 3M. + 1 << 30, ); let user_contributions = flattened_user_results.try_collect::<Vec<_>>().await?; let result = From d50522e71146ae47ce9c2e6a706c8762f38aca80 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Mon, 23 Sep 2024 23:13:01 -0700 Subject: [PATCH 08/12] Spawn queries in dedicated runtime --- ipa-core/src/query/executor.rs | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/ipa-core/src/query/executor.rs b/ipa-core/src/query/executor.rs index b3e197e4d..f2e0b8793 100644 --- a/ipa-core/src/query/executor.rs +++ b/ipa-core/src/query/executor.rs @@ -4,7 +4,7 @@ use std::{ future::{ready, Future}, pin::Pin, }; - +use std::sync::OnceLock; use ::tokio::{ runtime::{Handle, RuntimeFlavor}, sync::oneshot, @@ -17,6 +17,7 @@ use rand::rngs::StdRng; use rand_core::SeedableRng; #[cfg(all(feature = "shuttle", test))] use shuttle::future as tokio; +use tokio::runtime::{Builder, Runtime}; use typenum::Unsigned; #[cfg(any( @@ -71,6 +72,13 @@ where } } +static QUERY_RUNTIME: OnceLock<Runtime> = OnceLock::new(); +fn get_query_runtime() -> &'static Runtime { + QUERY_RUNTIME.get_or_init(|| { + Builder::new_multi_thread().worker_threads(10).thread_name("query_runtime").enable_all().build().unwrap() + }) +} + /// Needless pass by value because IPA v3 does not make use of key registry yet. #[allow(clippy::too_many_lines, clippy::needless_pass_by_value)] pub fn execute( @@ -180,7 +188,7 @@ where { let (tx, rx) = oneshot::channel(); - let join_handle = tokio::spawn(async move { + let join_handle = get_query_runtime().spawn(async move { let gateway = gateway.borrow(); // TODO: make it a generic argument for this function let mut rng = StdRng::from_entropy(); From 271690ed4b680f2543da737e190fd01ab8e9d8bf Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 24 Sep 2024 09:54:39 -0700 Subject: [PATCH 09/12] Log more information in assertion error --- ipa-core/src/protocol/context/batcher.rs | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/ipa-core/src/protocol/context/batcher.rs b/ipa-core/src/protocol/context/batcher.rs index 7aabfa1ad..56d0ed84a 100644 --- a/ipa-core/src/protocol/context/batcher.rs +++ b/ipa-core/src/protocol/context/batcher.rs @@ -1,5 +1,5 @@ use std::{cmp::min, collections::VecDeque, future::Future}; - +use std::fmt::Debug; use bitvec::{bitvec, prelude::BitVec}; use tokio::sync::watch; @@ -78,7 +78,7 @@ enum Ready { }, } -impl<'a, B> Batcher<'a, B> { +impl<'a, B: Debug> Batcher<'a, B> { pub fn new<T: Into<TotalRecords>>( records_per_batch: usize, total_records: T, @@ -248,7 +248,12 @@ impl<'a, B> Batcher<'a, B> { /// If the batcher contains more than one batch. pub fn into_single_batch(mut self) -> B { assert!(self.first_batch == 0); - assert!(self.batches.len() <= 1); + assert!(self.batches.len() <= 1, "Number of batches must be 1, got {}. Total records: {:?}/records per batch: {}.
debug: {:?}", + self.batches.len(), + self.total_records, + self.records_per_batch, + self.batches + ); let batch_index = 0; match self.batches.pop_back() { Some(state) => { From 2ee113f3ca9097df3a4a88c14533f6e9c96c8a77 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Tue, 24 Sep 2024 23:31:48 -0700 Subject: [PATCH 10/12] Bump the number of steps for ZKP to 292 from 192 --- ipa-core/src/protocol/context/step.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/protocol/context/step.rs b/ipa-core/src/protocol/context/step.rs index d650340f8..f30606d03 100644 --- a/ipa-core/src/protocol/context/step.rs +++ b/ipa-core/src/protocol/context/step.rs @@ -31,7 +31,7 @@ pub(crate) enum ValidateStep { // This really is only for DZKPs and not for MACs. The MAC protocol uses record IDs to // count batches. DZKP probably should do the same to avoid the fixed upper limit. #[derive(CompactStep)] -#[step(count = 192, child = DzkpValidationProtocolStep)] +#[step(count = 292, child = DzkpValidationProtocolStep)] pub(crate) struct DzkpBatchStep(pub usize); // This is used when we don't do batched verification, to avoid paying for x256 as many From c6c176fdf1a5aa49f5210e91ffc3be06560cfc89 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 25 Sep 2024 13:46:32 -0700 Subject: [PATCH 11/12] Bump the number of steps for ZKP to 592 from 292 292 fails on 50M --- ipa-core/src/protocol/context/step.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/protocol/context/step.rs b/ipa-core/src/protocol/context/step.rs index f30606d03..65b45472c 100644 --- a/ipa-core/src/protocol/context/step.rs +++ b/ipa-core/src/protocol/context/step.rs @@ -31,7 +31,7 @@ pub(crate) enum ValidateStep { // This really is only for DZKPs and not for MACs. The MAC protocol uses record IDs to // count batches. DZKP probably should do the same to avoid the fixed upper limit. #[derive(CompactStep)] -#[step(count = 292, child = DzkpValidationProtocolStep)] +#[step(count = 592, child = DzkpValidationProtocolStep)] pub(crate) struct DzkpBatchStep(pub usize); // This is used when we don't do batched verification, to avoid paying for x256 as many From 1663ffc73b9de075aa89cbfd266d676d553652f7 Mon Sep 17 00:00:00 2001 From: Alex Koshelev Date: Wed, 25 Sep 2024 23:43:26 -0700 Subject: [PATCH 12/12] Revert step count back to 292 to reproduce draft issue --- ipa-core/src/protocol/context/step.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ipa-core/src/protocol/context/step.rs b/ipa-core/src/protocol/context/step.rs index 65b45472c..f30606d03 100644 --- a/ipa-core/src/protocol/context/step.rs +++ b/ipa-core/src/protocol/context/step.rs @@ -31,7 +31,7 @@ pub(crate) enum ValidateStep { // This really is only for DZKPs and not for MACs. The MAC protocol uses record IDs to // count batches. DZKP probably should do the same to avoid the fixed upper limit. #[derive(CompactStep)] -#[step(count = 592, child = DzkpValidationProtocolStep)] +#[step(count = 292, child = DzkpValidationProtocolStep)] pub(crate) struct DzkpBatchStep(pub usize); // This is used when we don't do batched verification, to avoid paying for x256 as many