diff --git a/availability-oracle/src/main.rs b/availability-oracle/src/main.rs index ae42689..d51f7a9 100644 --- a/availability-oracle/src/main.rs +++ b/availability-oracle/src/main.rs @@ -281,63 +281,75 @@ pub async fn reconcile_deny_list( supported_network_ids: &[String], supported_ds_kinds: &[String], ) -> Result<(), Error> { - let logger = logger.clone(); - // Check the availability status of all subgraphs, and gather which should flip the deny flag. let status_changes: Vec<([u8; 32], bool)> = subgraph .deployments_over_threshold(min_signal, grace_period) - .map(|deployment| async { - let deployment = deployment?; - let id = bytes32_to_cid_v0(deployment.id); - let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await { - Ok(()) => Valid::Yes, - Err(CheckError::Invalid(e)) => Valid::No(e), - Err(CheckError::Other(e)) => return Err(e), - }; - Result::<_, Error>::Ok((deployment, validity)) + .map(|deployment| { + let logger = logger.clone(); + async move { + let deployment = match deployment { + Ok(d) => d, + Err(e) => { + error!(logger, "Failed to retrieve deployment data"; "error" => e.to_string()); + return None; + } + }; + let id = bytes32_to_cid_v0(deployment.id); + let logger = logger.clone(); + match check(ipfs, id, supported_network_ids, supported_ds_kinds).await { + Ok(()) => Some((deployment, Valid::Yes)), + Err(CheckError::Invalid(e)) => Some((deployment, Valid::No(e))), + Err(CheckError::Other(e)) => { + error!(logger, "Failed to check subgraph"; "error" => e.to_string()); + METRICS.reconcile_runs_ipfs_err.inc(); + None + }, + } + } }) .buffered(100) - .try_filter_map(move |(deployment, validity)| { + .filter_map(|opt| async move { let logger = logger.clone(); - async move { - info!(logger, "Check subgraph"; + let (deployment, validity) = match opt { + Some((deployment, validity)) => (deployment, validity), + None => return None, + }; + info!(logger, "Check subgraph"; + "id" => hex::encode(deployment.id), + "cid" => 
deployment.ipfs_hash() + ); + let should_deny = matches!(validity, Valid::No(_)); + match deployment.deny == should_deny { + // The validity is unchanged. + true => { + match validity { + Valid::Yes => (), + // Always print the error reason + Valid::No(_) => { + info!(logger, "Invalid"; "id" => hex::encode(deployment.id), - "cid" => deployment.ipfs_hash() - ); - - let should_deny = matches!(validity, Valid::No(_)); - Ok(match deployment.deny == should_deny { - // The validity is unchanged. - true => { - match validity { - Valid::Yes => (), - // Always print the error reason - Valid::No(_) => { - info!(logger, "Invalid"; + "cid" => deployment.ipfs_hash(), + "reason" => validity.to_string(), + ); + } + }; + None + } + + // The validity status changed, flip the deny flag. + false => { + info!(logger, "Change deny status"; "id" => hex::encode(deployment.id), "cid" => deployment.ipfs_hash(), + "status" => should_deny, "reason" => validity.to_string(), - ); - } - }; - None - } - - // The validity status changed, flip the deny flag. - false => { - info!(logger, "Change deny status"; - "id" => hex::encode(deployment.id), - "cid" => deployment.ipfs_hash(), - "status" => should_deny, - "reason" => validity.to_string(), - ); - Some((deployment.id, should_deny)) - } - }) + ); + Some((deployment.id, should_deny)) + } } - }) - .try_collect() - .await?; + }) + .collect::<Vec<_>>() + .await; state_manager.deny_many(status_changes).await } @@ -522,6 +534,7 @@ struct Metrics { reconcile_runs_total: prometheus::IntCounter, reconcile_runs_ok: prometheus::IntCounter, reconcile_runs_err: prometheus::IntCounter, + reconcile_runs_ipfs_err: prometheus::IntCounter, } lazy_static! { @@ -546,6 +559,11 @@ impl Metrics { "Total reconcile runs with errors" ) .unwrap(), + reconcile_runs_ipfs_err: prometheus::register_int_counter!( + "reconcile_runs_ipfs_err", + "Total reconcile runs with IPFS errors" + ) + .unwrap(), } } }