Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(node): commit http errors in data request retrieval #2307

Merged
merged 1 commit into from
Feb 3, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 125 additions & 13 deletions node/src/actors/rad_manager/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::time::Duration;

use actix::{Handler, ResponseFuture};
use futures::FutureExt;
use witnet_data_structures::radon_report::{RadonReport, ReportContext};
use witnet_data_structures::radon_report::{RadonReport, ReportContext, RetrievalMetadata, Stage};
use witnet_rad::{
conditions::{evaluate_tally_precondition_clause, TallyPreconditionClauseResult},
error::RadError,
Expand Down Expand Up @@ -48,6 +48,8 @@ impl Handler<ResolveRA> for RadManager {
}
};
let settings = RadonScriptExecutionSettings::disable_all();
let retrieval_context =
ReportContext::from_stage(Stage::Retrieval(RetrievalMetadata::default()));
let retrieve_responses_fut = sources
.iter()
.map(|retrieve| {
Expand All @@ -69,10 +71,16 @@ impl Handler<ResolveRA> for RadManager {
// Perform retrievals in parallel for the sake of synchronization between sources
// (increasing the likeliness of multiple sources returning results that are closer to each
// other).
let retrieve_responses = futures::future::join_all(retrieve_responses_fut)
.await
.into_iter()
.collect::<Result<Vec<RadonReport<RadonTypes>>, RadError>>()?;
let retrieve_responses: Vec<RadonReport<RadonTypes>> =
futures::future::join_all(retrieve_responses_fut)
.await
.into_iter()
.map(|retrieve| {
retrieve.unwrap_or_else(|error| {
RadonReport::from_result(Err(error), &retrieval_context)
})
})
.collect();

// Evaluate tally precondition to ensure that at least 20% of the data sources are not errors.
// This stage does not need to evaluate the postcondition.
Expand All @@ -87,14 +95,19 @@ impl Handler<ResolveRA> for RadManager {
// Perform aggregation on the values that made it to the output vector after applying the
// source scripts (aka _normalization scripts_ in the original whitepaper) and filtering out
// failures.
let (res, _) = witnet_rad::run_aggregation_report(
values,
aggregate,
RadonScriptExecutionSettings::all_but_partial_results(),
&msg.active_wips,
);

res
let (aggregation_result, aggregation_context) =
witnet_rad::run_aggregation_report(
values,
aggregate,
RadonScriptExecutionSettings::all_but_partial_results(),
&msg.active_wips,
);

// Convert Err into Ok because returning Err from this handler means that the
// node should not commit the result, and we do want to commit this error.
Ok(aggregation_result.unwrap_or_else(|error| {
RadonReport::from_result(Err(error), &aggregation_context)
}))
}
Ok(TallyPreconditionClauseResult::MajorityOfErrors { errors_mode }) => {
Ok(RadonReport::from_result(
Expand Down Expand Up @@ -132,6 +145,10 @@ impl Handler<RunTally> for RadManager {
#[cfg(test)]
mod tests {
use actix::{Actor, MailboxError, Message};
use witnet_data_structures::chain::{
tapi::all_wips_active, RADAggregate, RADRequest, RADRetrieve, RADTally, RADType,
};
use witnet_rad::reducers::RadonReducers;

use crate::utils::test_actix_system;

Expand Down Expand Up @@ -208,4 +225,99 @@ mod tests {
assert_eq!(res, alive);
});
}

#[test]
fn retrieval_http_error() {
// HTTP errors should not short-circuit data request execution
test_actix_system(|| async move {
let rad_manager = RadManager::default().start();
let rad_request = RADRequest {
time_lock: 0,
// One HTTP error source and 2 RNG sources
retrieve: vec![
RADRetrieve {
kind: RADType::HttpGet,
// Invalid URL to trigger HTTP error
url: "a".to_string(),
script: vec![128],
body: vec![],
headers: vec![],
},
RADRetrieve {
kind: RADType::Rng,
url: "".to_string(),
script: vec![128],
body: vec![],
headers: vec![],
},
RADRetrieve {
kind: RADType::Rng,
url: "".to_string(),
script: vec![128],
body: vec![],
headers: vec![],
},
],
aggregate: RADAggregate {
filters: vec![],
reducer: RadonReducers::HashConcatenate as u32,
},
tally: RADTally {
filters: vec![],
reducer: RadonReducers::Mode as u32,
},
};
let active_wips = all_wips_active();
let res = rad_manager
.send(ResolveRA {
rad_request,
timeout: None,
active_wips,
})
.await
.unwrap()
.unwrap();

assert!(matches!(res.into_inner(), RadonTypes::Bytes(..)));
});
}

#[test]
fn aggregation_error() {
test_actix_system(|| async move {
let rad_manager = RadManager::default().start();
let rad_request = RADRequest {
time_lock: 0,
retrieve: vec![RADRetrieve {
kind: RADType::Rng,
url: "".to_string(),
script: vec![128],
body: vec![],
headers: vec![],
}],
aggregate: RADAggregate {
filters: vec![],
// Use invalid reducer to simulate error in aggregation function, although such
// a request is invalid and cannot be included in a block
reducer: u32::MAX,
},
tally: RADTally {
filters: vec![],
reducer: RadonReducers::HashConcatenate as u32,
},
};
let active_wips = all_wips_active();
let res = rad_manager
.send(ResolveRA {
rad_request,
timeout: None,
active_wips,
})
.await
.unwrap()
.unwrap();

assert!(matches!(res.into_inner(), RadonTypes::RadonError(..)));
});
}
}