From b671669fba2fada0a51d00169b57d4a8f2b65fa8 Mon Sep 17 00:00:00 2001 From: grumbach Date: Wed, 30 Oct 2024 15:59:59 +0900 Subject: [PATCH] chore: use universal batching fn --- autonomi/src/client/utils.rs | 43 +++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 20 deletions(-) diff --git a/autonomi/src/client/utils.rs b/autonomi/src/client/utils.rs index fc3679e01b..53c7f88747 100644 --- a/autonomi/src/client/utils.rs +++ b/autonomi/src/client/utils.rs @@ -33,29 +33,32 @@ use crate::utils::payment_proof_from_quotes_and_payments; impl Client { /// Fetch and decrypt all chunks in the data map. pub(crate) async fn fetch_from_data_map(&self, data_map: &DataMap) -> Result { - let mut encrypted_chunks = vec![]; - - let mut stream = futures::stream::iter(data_map.infos().into_iter()) - .map(|info| { - let dst_hash = info.dst_hash; - async move { - self.chunk_get(dst_hash) - .await - .inspect_err(move |err| { - error!("Error fetching chunk {:?}: {err:?}", dst_hash) - }) - .map(|chunk| EncryptedChunk { - index: info.index, - content: chunk.value, - }) + let mut download_tasks = vec![]; + for info in data_map.infos() { + download_tasks.push(async move { + match self + .chunk_get(info.dst_hash) + .await + .inspect_err(|err| error!("Error fetching chunk {:?}: {err:?}", info.dst_hash)) + { + Ok(chunk) => Ok(EncryptedChunk { + index: info.index, + content: chunk.value, + }), + Err(err) => { + error!("Error fetching chunk {:?}: {err:?}", info.dst_hash); + Err(err) + } } - }) - .buffered(*CHUNK_DOWNLOAD_BATCH_SIZE); - - while let Some(encrypted_chunk_result) = stream.next().await { - encrypted_chunks.push(encrypted_chunk_result?); + }); } + let encrypted_chunks = + process_tasks_with_max_concurrency(download_tasks, *CHUNK_DOWNLOAD_BATCH_SIZE) + .await + .into_iter() + .collect::, GetError>>()?; + let data = decrypt_full_set(data_map, &encrypted_chunks).map_err(|e| { error!("Error decrypting encrypted_chunks: {e:?}"); GetError::Decryption(crate::self_encryption::Error::SelfEncryption(e))