diff --git a/node/src/actors/rad_manager/handlers.rs b/node/src/actors/rad_manager/handlers.rs index a4867c157..32c525112 100644 --- a/node/src/actors/rad_manager/handlers.rs +++ b/node/src/actors/rad_manager/handlers.rs @@ -4,7 +4,7 @@ use std::time::Duration; use actix::{Handler, ResponseFuture}; use futures::FutureExt; -use witnet_data_structures::radon_report::{RadonReport, ReportContext}; +use witnet_data_structures::radon_report::{RadonReport, ReportContext, RetrievalMetadata, Stage}; use witnet_rad::{ conditions::{evaluate_tally_precondition_clause, TallyPreconditionClauseResult}, error::RadError, @@ -48,6 +48,8 @@ impl Handler for RadManager { } }; let settings = RadonScriptExecutionSettings::disable_all(); + let retrieval_context = + ReportContext::from_stage(Stage::Retrieval(RetrievalMetadata::default())); let retrieve_responses_fut = sources .iter() .map(|retrieve| { @@ -69,10 +71,16 @@ impl Handler for RadManager { // Perform retrievals in parallel for the sake of synchronization between sources // (increasing the likeliness of multiple sources returning results that are closer to each // other). - let retrieve_responses = futures::future::join_all(retrieve_responses_fut) - .await - .into_iter() - .collect::>, RadError>>()?; + let retrieve_responses: Vec> = + futures::future::join_all(retrieve_responses_fut) + .await + .into_iter() + .map(|retrieve| { + retrieve.unwrap_or_else(|error| { + RadonReport::from_result(Err(error), &retrieval_context) + }) + }) + .collect(); // Evaluate tally precondition to ensure that at least 20% of the data sources are not errors. // This stage does not need to evaluate the postcondition. @@ -87,14 +95,19 @@ impl Handler for RadManager { // Perform aggregation on the values that made it to the output vector after applying the // source scripts (aka _normalization scripts_ in the original whitepaper) and filtering out // failures. - let (res, _) = witnet_rad::run_aggregation_report( - values, - aggregate, - RadonScriptExecutionSettings::all_but_partial_results(), - &msg.active_wips, - ); - - res + let (aggregation_result, aggregation_context) = + witnet_rad::run_aggregation_report( + values, + aggregate, + RadonScriptExecutionSettings::all_but_partial_results(), + &msg.active_wips, + ); + + // Convert Err into Ok because returning Err from this handler means that the + // node should not commit the result, and we do want to commit this error. + Ok(aggregation_result.unwrap_or_else(|error| { + RadonReport::from_result(Err(error), &aggregation_context) + })) } Ok(TallyPreconditionClauseResult::MajorityOfErrors { errors_mode }) => { Ok(RadonReport::from_result( @@ -132,6 +145,10 @@ impl Handler for RadManager { #[cfg(test)] mod tests { use actix::{Actor, MailboxError, Message}; + use witnet_data_structures::chain::{ + tapi::all_wips_active, RADAggregate, RADRequest, RADRetrieve, RADTally, RADType, + }; + use witnet_rad::reducers::RadonReducers; use crate::utils::test_actix_system; @@ -208,4 +225,99 @@ mod tests { assert_eq!(res, alive); }); } + + #[test] + fn retrieval_http_error() { + // HTTP errors should not short-circuit data request execution + test_actix_system(|| async move { + let rad_manager = RadManager::default().start(); + let rad_request = RADRequest { + time_lock: 0, + // One HTTP error source and 2 RNG sources + retrieve: vec![ + RADRetrieve { + kind: RADType::HttpGet, + // Invalid URL to trigger HTTP error + url: "a".to_string(), + script: vec![128], + body: vec![], + headers: vec![], + }, + RADRetrieve { + kind: RADType::Rng, + url: "".to_string(), + script: vec![128], + body: vec![], + headers: vec![], + }, + RADRetrieve { + kind: RADType::Rng, + url: "".to_string(), + script: vec![128], + body: vec![], + headers: vec![], + }, + ], + aggregate: RADAggregate { + filters: vec![], + reducer: RadonReducers::HashConcatenate as u32, + }, + tally: RADTally { + filters: vec![], + reducer: RadonReducers::Mode as u32, + }, + }; + let active_wips = all_wips_active(); + let res = rad_manager + .send(ResolveRA { + rad_request, + timeout: None, + active_wips, + }) + .await + .unwrap() + .unwrap(); + + assert!(matches!(res.into_inner(), RadonTypes::Bytes(..))); + }); + } + + #[test] + fn aggregation_error() { + test_actix_system(|| async move { + let rad_manager = RadManager::default().start(); + let rad_request = RADRequest { + time_lock: 0, + retrieve: vec![RADRetrieve { + kind: RADType::Rng, + url: "".to_string(), + script: vec![128], + body: vec![], + headers: vec![], + }], + aggregate: RADAggregate { + filters: vec![], + // Use invalid reducer to simulate error in aggregation function, although such + // a request is invalid and cannot be included in a block + reducer: u32::MAX, + }, + tally: RADTally { + filters: vec![], + reducer: RadonReducers::HashConcatenate as u32, + }, + }; + let active_wips = all_wips_active(); + let res = rad_manager + .send(ResolveRA { + rad_request, + timeout: None, + active_wips, + }) + .await + .unwrap() + .unwrap(); + + assert!(matches!(res.into_inner(), RadonTypes::RadonError(..))); + }); + } }