Skip to content

Commit

Permalink
Prover service trait and SHARP client implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
unstark committed Jun 15, 2024
1 parent 3fe3733 commit 081b35b
Show file tree
Hide file tree
Showing 56 changed files with 5,923 additions and 745 deletions.
Binary file removed .github/.DS_Store
Binary file not shown.
5,211 changes: 4,561 additions & 650 deletions Cargo.lock

Large diffs are not rendered by default.

53 changes: 40 additions & 13 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,10 @@ members = [
"crates/orchestrator",
"crates/da_clients/da-client-interface",
"crates/da_clients/ethereum",
"crates/prover_services/prover_service",
"crates/prover_services/gps_fact_checker",
"crates/prover_services/sharp_service",
"crates/prover_services/stone_service",
"crates/utils",
]

Expand All @@ -15,24 +19,47 @@ authors = ["Apoorv Sadana <@apoorvsadana>"]
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[workspace.dependencies]
async-trait = { version = "0.1.77" }
alloy = { git = "https://github.com/alloy-rs/alloy", rev = "7373f6db761d5a19888e3a0c527e8a3ca31e7a1e" }
alloy-primitives = "0.7.4"
async-trait = "0.1.77"
axum = { version = "0.7.4" }
axum-macros = { version = "0.4.1" }
color-eyre = { version = "0.6.2" }
dotenvy = { version = "0.15.7" }
futures = { version = "0.3.30" }
axum-macros = "0.4.1"
color-eyre = "0.6.2"
dotenvy = "0.15.7"
futures = "0.3.30"
mongodb = { version = "2.8.1" }
omniqueue = { version = "0.2.0" }
rstest = { version = "0.18.2" }
reqwest = { version = "0.11.24" }
rstest = "0.18.2"
serde = { version = "1.0.197" }
serde_json = { version = "1.0.114" }
starknet = { version = "0.9.0" }
thiserror = { version = "1.0.57" }
tokio = { version = "1.36.0" }
tracing = { version = "0.1.40" }
serde_json = "1.0.114"
starknet = "0.9.0"
tempfile = "3.8.1"
thiserror = "1.0.57"
tokio = { version = "1.37.0" }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18" }
url = { version = "2.5.0" }
uuid = { version = "1.7.0" }
url = { version = "2.5.0", features = ["serde"] }
uuid = { version = "1.7.0", features = ["v4", "serde"] }
stark_evm_adapter = "0.1.1"
hex = "0.4"
itertools = "0.13.0"

# Cairo VM (same version as in SNOS)
cairo-vm = { git = "https://github.com/lambdaclass/cairo-vm", rev = "f87be4d9cfad2100d4a5c085cf2aabc9caced40f", features = ["extensive_hints", "cairo-1-hints"] }

# Sharp (Starkware)
snos = { git = "https://github.com/unstark/snos", branch = "bump-cairo-lang" }

# Madara prover API
madara-prover-common = { git = "https://github.com/Moonsong-Labs/madara-prover-api", branch = "od/use-latest-cairo-vm" }
madara-prover-rpc-client = { git = "https://github.com/Moonsong-Labs/madara-prover-api", branch = "od/use-latest-cairo-vm" }

# Project
da-client-interface = { path = "crates/da_clients/da-client-interface" }
ethereum-da-client = { path = "crates/da_clients/ethereum" }
utils = { path = "crates/utils" }
prover-service = { path = "crates/prover_services/prover_service" }
gps-fact-checker = { path = "crates/prover_services/gps_fact_checker" }
sharp-service = { path = "crates/prover_services/sharp_service" }
stone-service = { path = "crates/prover_services/stone_service" }
3 changes: 2 additions & 1 deletion crates/da_clients/ethereum/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@ version.workspace = true
edition.workspace = true

[dependencies]
alloy = { git = "https://github.com/alloy-rs/alloy", rev = "86027c9bb984f3a12a30ffd2a3c5f2f06595f1d6", features = [
alloy = { workspace = true, features = [
"providers",
"rpc-client",
"transport-http",
"reqwest",
] }
async-trait = { workspace = true }
color-eyre = { workspace = true }
Expand Down
8 changes: 4 additions & 4 deletions crates/da_clients/ethereum/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#![allow(missing_docs)]
#![allow(clippy::missing_docs_in_private_items)]
use std::str::FromStr;

use alloy::rpc::client::RpcClient;
use alloy::transports::http::Http;
use alloy::transports::http::{Client, Http};
use async_trait::async_trait;
use color_eyre::Result;
use reqwest::Client;
use starknet::core::types::FieldElement;
use std::str::FromStr;
use url::Url;

use config::EthereumDaConfig;
Expand All @@ -32,7 +32,7 @@ impl DaClient for EthereumDaClient {
impl From<EthereumDaConfig> for EthereumDaClient {
fn from(config: EthereumDaConfig) -> Self {
let provider = RpcClient::builder()
.reqwest_http(Url::from_str(config.rpc_url.as_str()).expect("Failed to parse ETHEREUM_RPC_URL"));
.http(Url::from_str(config.rpc_url.as_str()).expect("Failed to parse ETHEREUM_RPC_URL"));
EthereumDaClient { provider }
}
}
5 changes: 5 additions & 0 deletions crates/orchestrator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
url = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
sharp-service = { workspace = true }
stone-service = { workspace = true }
prover-service = { workspace = true }
utils = { workspace = true }
cairo-vm = { workspace = true }

[features]
default = ["ethereum", "with_mongodb", "with_sqs"]
Expand Down
49 changes: 42 additions & 7 deletions crates/orchestrator/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,26 @@
use crate::database::mongodb::config::MongoDbConfig;
use crate::database::mongodb::MongoDb;
use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;
use crate::utils::env_utils::get_env_var_or_panic;
use da_client_interface::{DaClient, DaConfig};
use dotenvy::dotenv;
use ethereum_da_client::config::EthereumDaConfig;
use ethereum_da_client::EthereumDaClient;
use prover_service::ProverService;
use sharp_service::SharpProverService;
use starknet::providers::jsonrpc::HttpTransport;
use starknet::providers::{JsonRpcClient, Url};
use std::sync::Arc;
use stone_service::gps::GpsFactRegistry;
use stone_service::integrity::IntegrityFactRegistry;
use stone_service::sovereign::SovereignProofRegistry;
use stone_service::StoneProverService;
use tokio::sync::OnceCell;
use utils::env_utils::get_env_var_or_panic;
use utils::settings::default::DefaultSettingsProvider;
use utils::settings::SettingsProvider;

use crate::database::mongodb::config::MongoDbConfig;
use crate::database::mongodb::MongoDb;
use crate::database::{Database, DatabaseConfig};
use crate::queue::sqs::SqsQueue;
use crate::queue::QueueProvider;

/// The app config. It can be accessed from anywhere inside the service
/// by calling `config` function.
Expand All @@ -20,6 +29,8 @@ pub struct Config {
starknet_client: Arc<JsonRpcClient<HttpTransport>>,
/// The DA client to interact with the DA layer
da_client: Box<dyn DaClient>,
/// The service that produces proof and registers it onchain
prover: Box<dyn ProverService>,
/// The database client
database: Box<dyn Database>,
/// The queue provider
Expand All @@ -37,6 +48,11 @@ impl Config {
self.da_client.as_ref()
}

/// Returns the proving service
pub fn prover(&self) -> &dyn ProverService {
self.prover.as_ref()
}

/// Returns the database client
pub fn database(&self) -> &dyn Database {
self.database.as_ref()
Expand Down Expand Up @@ -67,7 +83,15 @@ async fn init_config() -> Config {
// init the queue
let queue = Box::new(SqsQueue {});

Config { starknet_client: Arc::new(provider), da_client: build_da_client(), database, queue }
let settings_provider = DefaultSettingsProvider {};

Config {
starknet_client: Arc::new(provider),
da_client: build_da_client(),
prover: create_prover_service(&settings_provider),
database,
queue,
}
}

/// Returns the app config. Initializes if not already done.
Expand All @@ -85,3 +109,14 @@ fn build_da_client() -> Box<dyn DaClient + Send + Sync> {
_ => panic!("Unsupported DA layer"),
}
}

/// Creates prover service based on the environment variable PROVER_SERVICE
fn create_prover_service(settings_provider: &impl SettingsProvider) -> Box<dyn ProverService> {
match get_env_var_or_panic("PROVER_SERVICE").as_str() {
"sharp" => Box::new(SharpProverService::with_settings(settings_provider)),
"stone_gps" => Box::new(StoneProverService::<GpsFactRegistry>::with_settings(settings_provider)),
"stone_integrity" => Box::new(StoneProverService::<IntegrityFactRegistry>::with_settings(settings_provider)),
"stone_sovereign" => Box::new(StoneProverService::<SovereignProofRegistry>::with_settings(settings_provider)),
_ => panic!("Unsupported prover service"),
}
}
5 changes: 3 additions & 2 deletions crates/orchestrator/src/controllers/jobs_controller.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use crate::controllers::errors::AppError;
use crate::jobs::types::JobType;
use axum::extract::Json;
use serde::Deserialize;

use crate::controllers::errors::AppError;
use crate::jobs::types::JobType;

/// Client request to create a job
#[derive(Debug, Deserialize)]
pub struct CreateJobRequest {
Expand Down
6 changes: 4 additions & 2 deletions crates/orchestrator/src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use crate::jobs::types::{JobItem, JobStatus, JobType};
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::Result;
use std::collections::HashMap;
use uuid::Uuid;

use crate::jobs::types::{JobItem, JobStatus, JobType};

/// MongoDB
pub mod mongodb;

Expand Down
3 changes: 2 additions & 1 deletion crates/orchestrator/src/database/mongodb/config.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use utils::env_utils::get_env_var_or_panic;

use crate::database::DatabaseConfig;
use crate::utils::env_utils::get_env_var_or_panic;

pub struct MongoDbConfig {
pub url: String,
Expand Down
29 changes: 14 additions & 15 deletions crates/orchestrator/src/database/mongodb/mod.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use crate::database::mongodb::config::MongoDbConfig;
use crate::database::Database;
use crate::jobs::types::{JobItem, JobStatus, JobType};
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use mongodb::bson::Document;
use mongodb::options::UpdateOptions;
use mongodb::{
bson::doc,
options::{ClientOptions, ServerApi, ServerApiVersion},
Client, Collection,
};
use std::collections::HashMap;
use mongodb::bson::{doc, Document};
use mongodb::options::{ClientOptions, ServerApi, ServerApiVersion, UpdateOptions};
use mongodb::{Client, Collection};
use uuid::Uuid;

use crate::database::mongodb::config::MongoDbConfig;
use crate::database::Database;
use crate::jobs::types::{JobItem, JobStatus, JobType};

pub mod config;

pub struct MongoDb {
Expand All @@ -23,7 +21,8 @@ pub struct MongoDb {
impl MongoDb {
pub async fn new(config: MongoDbConfig) -> Self {
let mut client_options = ClientOptions::parse(config.url).await.expect("Failed to parse MongoDB Url");
// Set the server_api field of the client_options object to set the version of the Stable API on the client
// Set the server_api field of the client_options object to set the version of the Stable API on the
// client
let server_api = ServerApi::builder().version(ServerApiVersion::V1).build();
client_options.server_api = Some(server_api);
// Get a handle to the cluster
Expand All @@ -39,9 +38,9 @@ impl MongoDb {
self.client.database("orchestrator").collection("jobs")
}

/// Updates the job in the database optimistically. This means that the job is updated only if the
/// version of the job in the database is the same as the version of the job passed in. If the version
/// is different, the update fails.
/// Updates the job in the database optimistically. This means that the job is updated only if
/// the version of the job in the database is the same as the version of the job passed in.
/// If the version is different, the update fails.
async fn update_job_optimistically(&self, current_job: &JobItem, update: Document) -> Result<()> {
let filter = doc! {
"id": current_job.id,
Expand Down
13 changes: 8 additions & 5 deletions crates/orchestrator/src/jobs/da_job/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;
use std::collections::HashMap;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use starknet::core::types::{BlockId, FieldElement, MaybePendingStateUpdate, StateUpdate, StorageEntry};
use starknet::providers::Provider;
use std::collections::HashMap;
use tracing::log;
use uuid::Uuid;

use crate::config::Config;
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::jobs::Job;

pub struct DaJob;

#[async_trait]
Expand Down Expand Up @@ -160,9 +162,10 @@ fn da_word(class_flag: bool, nonce_change: Option<FieldElement>, num_changes: u6

#[cfg(test)]
mod tests {
use super::*;
use rstest::rstest;

use super::*;

#[rstest]
#[case(false, 1, 1, "18446744073709551617")]
#[case(false, 1, 0, "18446744073709551616")]
Expand Down
29 changes: 17 additions & 12 deletions crates/orchestrator/src/jobs/mod.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
use crate::config::{config, Config};
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue};
use std::collections::HashMap;
use std::time::Duration;

use async_trait::async_trait;
use color_eyre::eyre::eyre;
use color_eyre::Result;
use std::collections::HashMap;
use std::time::Duration;
use tracing::log;
use uuid::Uuid;

use crate::config::{config, Config};
use crate::jobs::constants::{JOB_PROCESS_ATTEMPT_METADATA_KEY, JOB_VERIFICATION_ATTEMPT_METADATA_KEY};
use crate::jobs::types::{JobItem, JobStatus, JobType, JobVerificationStatus};
use crate::queue::job_queue::{add_job_to_process_queue, add_job_to_verification_queue};

mod constants;
pub mod da_job;
pub mod prover_job;
pub mod types;

/// The Job trait is used to define the methods that a job
Expand Down Expand Up @@ -60,8 +63,8 @@ pub async fn create_job(job_type: JobType, internal_id: String) -> Result<()> {
Ok(())
}

/// Processes the job, increments the process attempt count and updates the status of the job in the DB.
/// It then adds the job to the verification queue.
/// Processes the job, increments the process attempt count and updates the status of the job in the
/// DB. It then adds the job to the verification queue.
pub async fn process_job(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;
Expand All @@ -78,7 +81,8 @@ pub async fn process_job(id: Uuid) -> Result<()> {
}
}
// this updates the version of the job. this ensures that if another thread was about to process
// the same job, it would fail to update the job in the database because the version would be outdated
// the same job, it would fail to update the job in the database because the version would be
// outdated
config.database().update_job_status(&job, JobStatus::LockedForProcessing).await?;

let job_handler = get_job_handler(&job.job_type);
Expand All @@ -96,9 +100,10 @@ pub async fn process_job(id: Uuid) -> Result<()> {
Ok(())
}

/// Verifies the job and updates the status of the job in the DB. If the verification fails, it retries
/// processing the job if the max attempts have not been exceeded. If the max attempts have been exceeded,
/// it marks the job as timedout. If the verification is still pending, it pushes the job back to the queue.
/// Verifies the job and updates the status of the job in the DB. If the verification fails, it
/// retries processing the job if the max attempts have not been exceeded. If the max attempts have
/// been exceeded, it marks the job as timedout. If the verification is still pending, it pushes the
/// job back to the queue.
pub async fn verify_job(id: Uuid) -> Result<()> {
let config = config().await;
let job = get_job(id).await?;
Expand Down
Loading

0 comments on commit 081b35b

Please sign in to comment.