Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: skip failing ipfs request #21

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 64 additions & 46 deletions availability-oracle/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,63 +281,75 @@ pub async fn reconcile_deny_list(
supported_network_ids: &[String],
supported_ds_kinds: &[String],
) -> Result<(), Error> {
let logger = logger.clone();

// Check the availability status of all subgraphs, and gather which should flip the deny flag.
let status_changes: Vec<([u8; 32], bool)> = subgraph
.deployments_over_threshold(min_signal, grace_period)
.map(|deployment| async {
let deployment = deployment?;
let id = bytes32_to_cid_v0(deployment.id);
let validity = match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
Ok(()) => Valid::Yes,
Err(CheckError::Invalid(e)) => Valid::No(e),
Err(CheckError::Other(e)) => return Err(e),
};
Result::<_, Error>::Ok((deployment, validity))
.map(|deployment| {
let logger = logger.clone();
async move {
let deployment = match deployment {
Ok(d) => d,
Err(e) => {
error!(logger, "Failed to retrieve deployment data"; "error" => e.to_string());
return None;
}
};
let id = bytes32_to_cid_v0(deployment.id);
let logger = logger.clone();
match check(ipfs, id, supported_network_ids, supported_ds_kinds).await {
Ok(()) => Some((deployment, Valid::Yes)),
Err(CheckError::Invalid(e)) => Some((deployment, Valid::No(e))),
Err(CheckError::Other(e)) => {
error!(logger, "Failed to check subgraph"; "error" => e.to_string());
METRICS.reconcile_runs_ipfs_err.inc();
None
},
}
}
})
.buffered(100)
.try_filter_map(move |(deployment, validity)| {
.filter_map(|opt| async move {
let logger = logger.clone();
async move {
info!(logger, "Check subgraph";
let (deployment, validity) = match opt {
Some((deployment, validity)) => (deployment, validity),
None => return None,
};
info!(logger, "Check subgraph";
"id" => hex::encode(deployment.id),
"cid" => deployment.ipfs_hash()
);
let should_deny = matches!(validity, Valid::No(_));
match deployment.deny == should_deny {
// The validity is unchanged.
true => {
match validity {
Valid::Yes => (),
// Always print the error reason
Valid::No(_) => {
info!(logger, "Invalid";
"id" => hex::encode(deployment.id),
"cid" => deployment.ipfs_hash()
);

let should_deny = matches!(validity, Valid::No(_));
Ok(match deployment.deny == should_deny {
// The validity is unchanged.
true => {
match validity {
Valid::Yes => (),
// Always print the error reason
Valid::No(_) => {
info!(logger, "Invalid";
"cid" => deployment.ipfs_hash(),
"reason" => validity.to_string(),
);
}
};
None
}

// The validity status changed, flip the deny flag.
false => {
info!(logger, "Change deny status";
"id" => hex::encode(deployment.id),
"cid" => deployment.ipfs_hash(),
"status" => should_deny,
"reason" => validity.to_string(),
);
}
};
None
}

// The validity status changed, flip the deny flag.
false => {
info!(logger, "Change deny status";
"id" => hex::encode(deployment.id),
"cid" => deployment.ipfs_hash(),
"status" => should_deny,
"reason" => validity.to_string(),
);
Some((deployment.id, should_deny))
}
})
);
Some((deployment.id, should_deny))
}
}
})
.try_collect()
.await?;
})
.collect::<Vec<_>>()
.await;

state_manager.deny_many(status_changes).await
}
Expand Down Expand Up @@ -522,6 +534,7 @@ struct Metrics {
reconcile_runs_total: prometheus::IntCounter,
reconcile_runs_ok: prometheus::IntCounter,
reconcile_runs_err: prometheus::IntCounter,
reconcile_runs_ipfs_err: prometheus::IntCounter,
}

lazy_static! {
Expand All @@ -546,6 +559,11 @@ impl Metrics {
"Total reconcile runs with errors"
)
.unwrap(),
reconcile_runs_ipfs_err: prometheus::register_int_counter!(
"reconcile_runs_ipfs_err",
"Total reconcile runs with IPFS errors"
)
.unwrap(),
}
}
}
Loading