diff --git a/Cargo.lock b/Cargo.lock index edd4c36a60..026548b653 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14484,6 +14484,7 @@ dependencies = [ "sqlx", "starknet 0.12.0", "starknet-crypto 0.7.2", + "tempfile", "tokio", "tokio-stream", "tokio-util", @@ -14558,6 +14559,7 @@ dependencies = [ "sqlx", "starknet 0.12.0", "starknet-crypto 0.7.2", + "tempfile", "thiserror", "tokio", "tokio-util", @@ -14596,6 +14598,7 @@ dependencies = [ "starknet-crypto 0.7.2", "strum 0.25.0", "strum_macros 0.25.3", + "tempfile", "thiserror", "tokio", "tokio-stream", @@ -14635,6 +14638,7 @@ dependencies = [ "starknet-crypto 0.7.2", "strum 0.25.0", "strum_macros 0.25.3", + "tempfile", "thiserror", "tokio", "tokio-stream", diff --git a/bin/torii/Cargo.toml b/bin/torii/Cargo.toml index 977764b26d..aadbd390cd 100644 --- a/bin/torii/Cargo.toml +++ b/bin/torii/Cargo.toml @@ -46,6 +46,7 @@ tracing-subscriber.workspace = true tracing.workspace = true url.workspace = true webbrowser = "0.8" +tempfile.workspace = true [dev-dependencies] camino.workspace = true diff --git a/bin/torii/src/main.rs b/bin/torii/src/main.rs index cf568429ce..3e7931cf8f 100644 --- a/bin/torii/src/main.rs +++ b/bin/torii/src/main.rs @@ -27,10 +27,12 @@ use sqlx::SqlitePool; use starknet::core::types::Felt; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::JsonRpcClient; +use tempfile::NamedTempFile; use tokio::sync::broadcast; use tokio::sync::broadcast::Sender; use tokio_stream::StreamExt; use torii_core::engine::{Engine, EngineConfig, IndexingFlags, Processors}; +use torii_core::executor::Executor; use torii_core::processors::event_message::EventMessageProcessor; use torii_core::processors::generate_event_processors_map; use torii_core::processors::metadata_update::MetadataUpdateProcessor; @@ -64,7 +66,7 @@ struct Args { /// Database filepath (ex: indexer.db). If specified file doesn't exist, it will be /// created. 
Defaults to in-memory database - #[arg(short, long, default_value = ":memory:")] + #[arg(short, long, default_value = "")] database: String, /// Specify a block to start indexing from, ignored if stored head exists @@ -163,8 +165,12 @@ async fn main() -> anyhow::Result<()> { }) .expect("Error setting Ctrl-C handler"); + let tempfile = NamedTempFile::new()?; + let database_path = + if args.database.is_empty() { tempfile.path().to_str().unwrap() } else { &args.database }; + let mut options = - SqliteConnectOptions::from_str(&args.database)?.create_if_missing(true).with_regexp(); + SqliteConnectOptions::from_str(database_path)?.create_if_missing(true).with_regexp(); // Performance settings options = options.auto_vacuum(SqliteAutoVacuum::None); @@ -185,7 +191,12 @@ async fn main() -> anyhow::Result<()> { // Get world address let world = WorldContractReader::new(args.world_address, provider.clone()); - let db = Sql::new(pool.clone(), args.world_address).await?; + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?; + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new(pool.clone(), args.world_address, sender.clone()).await?; let processors = Processors { event: generate_event_processors_map(vec![ diff --git a/crates/torii/core/Cargo.toml b/crates/torii/core/Cargo.toml index a1221b1f0d..51e2037ce1 100644 --- a/crates/torii/core/Cargo.toml +++ b/crates/torii/core/Cargo.toml @@ -41,3 +41,4 @@ dojo-test-utils.workspace = true dojo-utils.workspace = true katana-runner.workspace = true scarb.workspace = true +tempfile.workspace = true diff --git a/crates/torii/core/src/engine.rs b/crates/torii/core/src/engine.rs index 9f1131b149..010f7d7385 100644 --- a/crates/torii/core/src/engine.rs +++ b/crates/torii/core/src/engine.rs @@ -7,6 +7,7 @@ use std::time::Duration; use anyhow::Result; use bitflags::bitflags; use dojo_world::contracts::world::WorldContractReader; +use futures_util::future::try_join_all; use hashlink::LinkedHashMap; use starknet::core::types::{ BlockId, BlockTag, EmittedEvent, Event, EventFilter, Felt, MaybePendingBlockWithReceipts, @@ -17,7 +18,6 @@ use starknet::providers::Provider; use tokio::sync::broadcast::Sender; use tokio::sync::mpsc::Sender as BoundedSender; use tokio::sync::Semaphore; -use tokio::task::JoinSet; use tokio::time::{sleep, Instant}; use tracing::{debug, error, info, trace, warn}; @@ -108,6 +108,13 @@ pub struct ParallelizedEvent { pub event: Event, } +#[derive(Debug)] +pub struct EngineHead { + pub block_number: u64, + pub last_pending_block_world_tx: Option, + pub last_pending_block_tx: Option, +} + #[allow(missing_debug_implementations)] pub struct Engine { world: Arc>, @@ -151,7 +158,7 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ // use the start block provided by user if head is 0 let (head, _, _) = self.db.head().await?; if head == 0 { - self.db.set_head(self.config.start_block); + self.db.set_head(self.config.start_block)?; } else if self.config.start_block != 0 { warn!(target: LOG_TARGET, "Start block ignored, stored head exists and will be used instead."); } @@ -164,6 +171,7 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ let mut erroring_out = false; loop { let (head, last_pending_block_world_tx, last_pending_block_tx) = self.db.head().await?; + tokio::select! { _ = shutdown_rx.recv() => { break Ok(()); @@ -179,7 +187,7 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ } match self.process(fetch_result).await { - Ok(()) => {} + Ok(_) => self.db.execute().await?, Err(e) => { error!(target: LOG_TARGET, error = %e, "Processing fetched data."); erroring_out = true; @@ -363,21 +371,15 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ })) } - pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result<()> { + pub async fn process(&mut self, fetch_result: FetchDataResult) -> Result> { match fetch_result { - FetchDataResult::Range(data) => { - self.process_range(data).await?; - } - FetchDataResult::Pending(data) => { - self.process_pending(data).await?; - } - FetchDataResult::None => {} + FetchDataResult::Range(data) => self.process_range(data).await.map(Some), + FetchDataResult::Pending(data) => self.process_pending(data).await.map(Some), + FetchDataResult::None => Ok(None), } - - Ok(()) } - pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result<()> { + pub async fn process_pending(&mut self, data: FetchPendingResult) -> Result { // Skip transactions that have been processed already // Our cursor is the last processed transaction @@ -407,16 +409,19 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ // provider. So we can fail silently and try // again in the next iteration. warn!(target: LOG_TARGET, transaction_hash = %format!("{:#x}", transaction_hash), "Retrieving pending transaction receipt."); - self.db.set_head(data.block_number - 1); + self.db.set_head(data.block_number - 1)?; if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); + self.db.set_last_pending_block_tx(Some(tx))?; } if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); + self.db.set_last_pending_block_world_tx(Some(tx))?; } - self.db.execute().await?; - return Ok(()); + return Ok(EngineHead { + block_number: data.block_number - 1, + last_pending_block_tx, + last_pending_block_world_tx, + }); } _ => { error!(target: LOG_TARGET, error = %e, transaction_hash = %format!("{:#x}", transaction_hash), "Processing pending transaction."); @@ -441,22 +446,24 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ // Set the head to the last processed pending transaction // Head block number should still be latest block number - self.db.set_head(data.block_number - 1); + self.db.set_head(data.block_number - 1)?; if let Some(tx) = last_pending_block_tx { - self.db.set_last_pending_block_tx(Some(tx)); + self.db.set_last_pending_block_tx(Some(tx))?; } if let Some(tx) = last_pending_block_world_tx { - self.db.set_last_pending_block_world_tx(Some(tx)); + self.db.set_last_pending_block_world_tx(Some(tx))?; } - self.db.execute().await?; - - Ok(()) + Ok(EngineHead { + block_number: data.block_number - 1, + last_pending_block_world_tx, + last_pending_block_tx, + }) } - pub async fn process_range(&mut self, data: FetchRangeResult) -> Result<()> { + pub async fn process_range(&mut self, data: FetchRangeResult) -> Result { // Process all transactions let mut last_block = 0; for ((block_number, transaction_hash), events) in data.transactions { @@ -486,22 +493,20 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ self.process_block(block_number, data.blocks[&block_number]).await?; last_block = block_number; } - - if self.db.query_queue.queue.len() >= QUERY_QUEUE_BATCH_SIZE { - self.db.execute().await?; - } } // Process parallelized events self.process_tasks().await?; - self.db.set_head(data.latest_block_number); - self.db.set_last_pending_block_world_tx(None); - self.db.set_last_pending_block_tx(None); + self.db.set_head(data.latest_block_number)?; + self.db.set_last_pending_block_world_tx(None)?; + self.db.set_last_pending_block_tx(None)?; - self.db.execute().await?; - - Ok(()) + Ok(EngineHead { + block_number: data.latest_block_number, + last_pending_block_tx: None, + last_pending_block_world_tx: None, + }) } async fn process_tasks(&mut self) -> Result<()> { @@ -509,15 +514,15 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ let semaphore = Arc::new(Semaphore::new(self.config.max_concurrent_tasks)); // Run all tasks concurrently - let mut set = JoinSet::new(); + let mut handles = Vec::new(); for (task_id, events) in self.tasks.drain() { let db = self.db.clone(); let world = self.world.clone(); let processors = self.processors.clone(); let semaphore = semaphore.clone(); - set.spawn(async move { - let _permit = semaphore.acquire().await.unwrap(); + handles.push(tokio::spawn(async move { + let _permit = semaphore.acquire().await?; let mut local_db = db.clone(); for ParallelizedEvent { event_id, event, block_number, block_timestamp } in events { if let Some(processor) = processors.event.get(&event.keys[0]) { @@ -531,15 +536,13 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ } } } - Ok::<_, anyhow::Error>(local_db) - }); + + Ok::<_, anyhow::Error>(()) + })); } // Join all tasks - while let Some(result) = set.join_next().await { - let local_db = result??; - self.db.merge(local_db)?; - } + try_join_all(handles).await?; Ok(()) } @@ -688,7 +691,7 @@ impl Engine
<P: Provider + Send + Sync + std::fmt::Debug + 'static>
{ transaction_hash: Felt, ) -> Result<()> { if self.config.flags.contains(IndexingFlags::RAW_EVENTS) { - self.db.store_event(event_id, event, transaction_hash, block_timestamp); + self.db.store_event(event_id, event, transaction_hash, block_timestamp)?; } let event_key = event.keys[0]; diff --git a/crates/torii/core/src/executor.rs b/crates/torii/core/src/executor.rs new file mode 100644 index 0000000000..503759e43f --- /dev/null +++ b/crates/torii/core/src/executor.rs @@ -0,0 +1,297 @@ +use std::mem; + +use anyhow::{Context, Result}; +use dojo_types::schema::{Struct, Ty}; +use sqlx::query::Query; +use sqlx::sqlite::SqliteArguments; +use sqlx::{FromRow, Pool, Sqlite, Transaction}; +use starknet::core::types::Felt; +use tokio::sync::broadcast::{Receiver, Sender}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; +use tokio::time::Instant; +use tracing::{debug, error}; + +use crate::simple_broker::SimpleBroker; +use crate::types::{ + Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, + Model as ModelRegistered, +}; + +pub(crate) const LOG_TARGET: &str = "torii_core::executor"; + +#[derive(Debug, Clone)] +pub enum Argument { + Null, + Int(i64), + Bool(bool), + String(String), + FieldElement(Felt), +} + +#[derive(Debug, Clone)] +pub enum BrokerMessage { + ModelRegistered(ModelRegistered), + EntityUpdated(EntityUpdated), + EventMessageUpdated(EventMessageUpdated), + EventEmitted(EventEmitted), +} + +#[derive(Debug, Clone)] +pub struct DeleteEntityQuery { + pub entity_id: String, + pub event_id: String, + pub block_timestamp: String, + pub ty: Ty, +} + +#[derive(Debug, Clone)] +pub enum QueryType { + SetEntity(Ty), + DeleteEntity(DeleteEntityQuery), + EventMessage(Ty), + RegisterModel, + StoreEvent, + Execute, + Other, +} + +#[derive(Debug)] +pub struct Executor<'c> { + pool: Pool, + transaction: Transaction<'c, Sqlite>, + publish_queue: Vec, + rx: UnboundedReceiver, + shutdown_rx: Receiver<()>, +} + +#[derive(Debug)] +pub struct QueryMessage { + pub statement: String, + pub arguments: Vec, + pub query_type: QueryType, + tx: Option>>, +} + +impl QueryMessage { + pub fn new(statement: String, arguments: Vec, query_type: QueryType) -> Self { + Self { statement, arguments, query_type, tx: None } + } + + pub fn new_recv( + statement: String, + arguments: Vec, + query_type: QueryType, + ) -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + (Self { statement, arguments, query_type, tx: Some(tx) }, rx) + } + + pub fn other(statement: String, arguments: Vec) -> Self { + Self { statement, arguments, query_type: QueryType::Other, tx: None } + } + + pub fn other_recv( + statement: String, + arguments: Vec, + ) -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + (Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx) + } + + pub fn execute() -> Self { + Self { + statement: "".to_string(), + arguments: vec![], + query_type: QueryType::Execute, + tx: None, + } + } + + pub fn execute_recv() -> (Self, oneshot::Receiver>) { + let (tx, rx) = oneshot::channel(); + ( + Self { + statement: "".to_string(), + arguments: vec![], + query_type: QueryType::Execute, + tx: Some(tx), + }, + rx, + ) + } +} + +impl<'c> Executor<'c> { + pub async fn new( + pool: Pool, + shutdown_tx: Sender<()>, + ) -> Result<(Self, UnboundedSender)> { + let (tx, rx) = unbounded_channel(); + let transaction = pool.begin().await?; + let publish_queue = Vec::new(); + let shutdown_rx = 
shutdown_tx.subscribe(); + + Ok((Executor { pool, transaction, publish_queue, rx, shutdown_rx }, tx)) + } + + pub async fn run(&mut self) -> Result<()> { + loop { + tokio::select! { + _ = self.shutdown_rx.recv() => { + debug!(target: LOG_TARGET, "Shutting down executor"); + break Ok(()); + } + Some(msg) = self.rx.recv() => { + let QueryMessage { statement, arguments, query_type, tx } = msg; + let mut query = sqlx::query(&statement); + + for arg in &arguments { + query = match arg { + Argument::Null => query.bind(None::), + Argument::Int(integer) => query.bind(integer), + Argument::Bool(bool) => query.bind(bool), + Argument::String(string) => query.bind(string), + Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), + } + } + + match self.handle_query_type(query, query_type.clone(), &statement, &arguments, tx).await { + Ok(()) => {}, + Err(e) => { + error!(target: LOG_TARGET, r#type = ?query_type, error = %e, "Failed to execute query."); + } + } + } + } + } + } + + async fn handle_query_type<'a>( + &mut self, + query: Query<'a, Sqlite, SqliteArguments<'a>>, + query_type: QueryType, + statement: &str, + arguments: &[Argument], + sender: Option>>, + ) -> Result<()> { + let tx = &mut self.transaction; + + match query_type { + QueryType::SetEntity(entity) => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut entity_updated = EntityUpdated::from_row(&row)?; + entity_updated.updated_model = Some(entity); + entity_updated.deleted = false; + let broker_message = BrokerMessage::EntityUpdated(entity_updated); + self.publish_queue.push(broker_message); + } + QueryType::DeleteEntity(entity) => { + let delete_model = query.execute(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + if delete_model.rows_affected() == 0 { + return Ok(()); + } + + let row = sqlx::query( + "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, event_id=? \ + WHERE id = ? 
RETURNING *", + ) + .bind(entity.block_timestamp) + .bind(entity.event_id) + .bind(entity.entity_id) + .fetch_one(&mut **tx) + .await?; + let mut entity_updated = EntityUpdated::from_row(&row)?; + entity_updated.updated_model = + Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); + + let count = sqlx::query_scalar::<_, i64>( + "SELECT count(*) FROM entity_model WHERE entity_id = ?", + ) + .bind(entity_updated.id.clone()) + .fetch_one(&mut **tx) + .await?; + + // Delete entity if all of its models are deleted + if count == 0 { + sqlx::query("DELETE FROM entities WHERE id = ?") + .bind(entity_updated.id.clone()) + .execute(&mut **tx) + .await?; + entity_updated.deleted = true; + } + + let broker_message = BrokerMessage::EntityUpdated(entity_updated); + self.publish_queue.push(broker_message); + } + QueryType::RegisterModel => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let model_registered = ModelRegistered::from_row(&row)?; + self.publish_queue.push(BrokerMessage::ModelRegistered(model_registered)); + } + QueryType::EventMessage(entity) => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let mut event_message = EventMessageUpdated::from_row(&row)?; + event_message.updated_model = Some(entity); + let broker_message = BrokerMessage::EventMessageUpdated(event_message); + self.publish_queue.push(broker_message); + } + QueryType::StoreEvent => { + let row = query.fetch_one(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + let event = EventEmitted::from_row(&row)?; + self.publish_queue.push(BrokerMessage::EventEmitted(event)); + } + QueryType::Execute => { + debug!(target: LOG_TARGET, "Executing query."); + let instant = Instant::now(); + let res = self.execute().await; + debug!(target: LOG_TARGET, duration = ?instant.elapsed(), "Executed query."); + + if let Some(sender) = sender { + sender + .send(res) + .map_err(|_| anyhow::anyhow!("Failed to send execute result"))?; + } else { + res?; + } + } + QueryType::Other => { + query.execute(&mut **tx).await.with_context(|| { + format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) + })?; + } + } + + Ok(()) + } + + async fn execute(&mut self) -> Result<()> { + let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?); + transaction.commit().await?; + + for message in self.publish_queue.drain(..) 
{ + send_broker_message(message); + } + + Ok(()) + } +} + +fn send_broker_message(message: BrokerMessage) { + match message { + BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), + BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), + BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), + BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), + } +} diff --git a/crates/torii/core/src/lib.rs b/crates/torii/core/src/lib.rs index fe7ae8f4bc..888726d903 100644 --- a/crates/torii/core/src/lib.rs +++ b/crates/torii/core/src/lib.rs @@ -3,9 +3,9 @@ pub mod cache; pub mod engine; pub mod error; +pub mod executor; pub mod model; pub mod processors; -pub mod query_queue; pub mod simple_broker; pub mod sql; pub mod types; diff --git a/crates/torii/core/src/processors/metadata_update.rs b/crates/torii/core/src/processors/metadata_update.rs index 594a32898a..4b17858d89 100644 --- a/crates/torii/core/src/processors/metadata_update.rs +++ b/crates/torii/core/src/processors/metadata_update.rs @@ -64,7 +64,7 @@ where uri = %uri_str, "Resource metadata set." ); - db.set_metadata(resource, &uri_str, block_timestamp); + db.set_metadata(resource, &uri_str, block_timestamp)?; let db = db.clone(); let resource = *resource; @@ -83,9 +83,7 @@ where async fn try_retrieve(mut db: Sql, resource: Felt, uri_str: String) { match metadata(uri_str.clone()).await { Ok((metadata, icon_img, cover_img)) => { - db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img) - .await - .unwrap(); + db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); info!( target: LOG_TARGET, resource = %format!("{:#x}", resource), diff --git a/crates/torii/core/src/processors/store_transaction.rs b/crates/torii/core/src/processors/store_transaction.rs index 2e7056e401..101fb88093 100644 --- a/crates/torii/core/src/processors/store_transaction.rs +++ b/crates/torii/core/src/processors/store_transaction.rs @@ -21,7 +21,7 @@ impl TransactionProcessor
<P>
for StoreTran transaction: &Transaction, ) -> Result<(), Error> { let transaction_id = format!("{:#064x}:{:#x}", block_number, transaction_hash); - db.store_transaction(transaction, &transaction_id, block_timestamp); + db.store_transaction(transaction, &transaction_id, block_timestamp)?; Ok(()) } } diff --git a/crates/torii/core/src/query_queue.rs b/crates/torii/core/src/query_queue.rs deleted file mode 100644 index dd80b04e9e..0000000000 --- a/crates/torii/core/src/query_queue.rs +++ /dev/null @@ -1,187 +0,0 @@ -use std::collections::VecDeque; - -use anyhow::{Context, Result}; -use dojo_types::schema::{Struct, Ty}; -use sqlx::{FromRow, Pool, Sqlite}; -use starknet::core::types::Felt; - -use crate::simple_broker::SimpleBroker; -use crate::types::{ - Entity as EntityUpdated, Event as EventEmitted, EventMessage as EventMessageUpdated, - Model as ModelRegistered, -}; - -#[derive(Debug, Clone)] -pub enum Argument { - Null, - Int(i64), - Bool(bool), - String(String), - FieldElement(Felt), -} - -#[derive(Debug, Clone)] -pub enum BrokerMessage { - ModelRegistered(ModelRegistered), - EntityUpdated(EntityUpdated), - EventMessageUpdated(EventMessageUpdated), - EventEmitted(EventEmitted), -} - -#[derive(Debug, Clone)] -pub struct QueryQueue { - pool: Pool, - pub queue: VecDeque<(String, Vec, QueryType)>, -} - -#[derive(Debug, Clone)] -pub struct DeleteEntityQuery { - pub entity_id: String, - pub event_id: String, - pub block_timestamp: String, - pub ty: Ty, -} - -#[derive(Debug, Clone)] -pub enum QueryType { - SetEntity(Ty), - DeleteEntity(DeleteEntityQuery), - EventMessage(Ty), - RegisterModel, - StoreEvent, - Other, -} - -impl QueryQueue { - pub fn new(pool: Pool) -> Self { - QueryQueue { pool, queue: VecDeque::new() } - } - - pub fn enqueue>( - &mut self, - statement: S, - arguments: Vec, - query_type: QueryType, - ) { - self.queue.push_back((statement.into(), arguments, query_type)); - } - - pub async fn execute_all(&mut self) -> Result<()> { - let mut tx = self.pool.begin().await?; - // publishes that are related to queries in the queue, they should be sent - // after the queries are executed - let mut publish_queue = VecDeque::new(); - - while let Some((statement, arguments, query_type)) = self.queue.pop_front() { - let mut query = sqlx::query(&statement); - - for arg in &arguments { - query = match arg { - Argument::Null => query.bind(None::), - Argument::Int(integer) => query.bind(integer), - Argument::Bool(bool) => query.bind(bool), - Argument::String(string) => query.bind(string), - Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)), - } - } - - match query_type { - QueryType::SetEntity(entity) => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = Some(entity); - entity_updated.deleted = false; - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - publish_queue.push_back(broker_message); - } - QueryType::DeleteEntity(entity) => { - let delete_model = query.execute(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - if delete_model.rows_affected() == 0 { - continue; - } - - let row = sqlx::query( - "UPDATE entities SET updated_at=CURRENT_TIMESTAMP, executed_at=?, \ - event_id=? WHERE id = ? 
RETURNING *", - ) - .bind(entity.block_timestamp) - .bind(entity.event_id) - .bind(entity.entity_id) - .fetch_one(&mut *tx) - .await?; - let mut entity_updated = EntityUpdated::from_row(&row)?; - entity_updated.updated_model = - Some(Ty::Struct(Struct { name: entity.ty.name(), children: vec![] })); - - let count = sqlx::query_scalar::<_, i64>( - "SELECT count(*) FROM entity_model WHERE entity_id = ?", - ) - .bind(entity_updated.id.clone()) - .fetch_one(&mut *tx) - .await?; - - // Delete entity if all of its models are deleted - if count == 0 { - sqlx::query("DELETE FROM entities WHERE id = ?") - .bind(entity_updated.id.clone()) - .execute(&mut *tx) - .await?; - entity_updated.deleted = true; - } - - let broker_message = BrokerMessage::EntityUpdated(entity_updated); - publish_queue.push_back(broker_message); - } - QueryType::RegisterModel => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let model_registered = ModelRegistered::from_row(&row)?; - publish_queue.push_back(BrokerMessage::ModelRegistered(model_registered)); - } - QueryType::EventMessage(entity) => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let mut event_message = EventMessageUpdated::from_row(&row)?; - event_message.updated_model = Some(entity); - let broker_message = BrokerMessage::EventMessageUpdated(event_message); - publish_queue.push_back(broker_message); - } - QueryType::StoreEvent => { - let row = query.fetch_one(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - let event = EventEmitted::from_row(&row)?; - publish_queue.push_back(BrokerMessage::EventEmitted(event)); - } - QueryType::Other => { - query.execute(&mut *tx).await.with_context(|| { - format!("Failed to execute query: {:?}, args: {:?}", statement, arguments) - })?; - } - } - } - - tx.commit().await?; - - while let Some(message) = publish_queue.pop_front() { - send_broker_message(message); - } - - Ok(()) - } -} - -fn send_broker_message(message: BrokerMessage) { - match message { - BrokerMessage::ModelRegistered(model) => SimpleBroker::publish(model), - BrokerMessage::EntityUpdated(entity) => SimpleBroker::publish(entity), - BrokerMessage::EventMessageUpdated(event) => SimpleBroker::publish(event), - BrokerMessage::EventEmitted(event) => SimpleBroker::publish(event), - } -} diff --git a/crates/torii/core/src/sql.rs b/crates/torii/core/src/sql.rs index 8c9e0882c7..c42e447080 100644 --- a/crates/torii/core/src/sql.rs +++ b/crates/torii/core/src/sql.rs @@ -12,10 +12,10 @@ use sqlx::pool::PoolConnection; use sqlx::{Pool, Sqlite}; use starknet::core::types::{Event, Felt, InvokeTransaction, Transaction}; use starknet_crypto::poseidon_hash_many; -use tracing::{debug, warn}; +use tokio::sync::mpsc::UnboundedSender; use crate::cache::{Model, ModelCache}; -use crate::query_queue::{Argument, DeleteEntityQuery, QueryQueue, QueryType}; +use crate::executor::{Argument, DeleteEntityQuery, QueryMessage, QueryType}; use crate::utils::utc_dt_string_from_timestamp; type IsEventMessage = bool; @@ -28,67 +28,43 @@ pub const FELT_DELIMITER: &str = "/"; #[path = "sql_test.rs"] mod test; -#[derive(Debug)] +#[derive(Debug, Clone)] pub struct Sql { world_address: Felt, pub pool: Pool, - pub query_queue: QueryQueue, + pub executor: UnboundedSender, model_cache: Arc, } -impl Clone for Sql { - fn clone(&self) -> Self 
{ - Self { - world_address: self.world_address, - pool: self.pool.clone(), - query_queue: QueryQueue::new(self.pool.clone()), - model_cache: self.model_cache.clone(), - } - } -} - impl Sql { - pub async fn new(pool: Pool, world_address: Felt) -> Result { - let mut query_queue = QueryQueue::new(pool.clone()); - - query_queue.enqueue( + pub async fn new( + pool: Pool, + world_address: Felt, + executor: UnboundedSender, + ) -> Result { + executor.send(QueryMessage::other( "INSERT OR IGNORE INTO contracts (id, contract_address, contract_type) VALUES (?, ?, \ - ?)", + ?)" + .to_string(), vec![ Argument::FieldElement(world_address), Argument::FieldElement(world_address), Argument::String(WORLD_CONTRACT_TYPE.to_string()), ], - QueryType::Other, - ); + ))?; - query_queue.execute_all().await?; - - Ok(Self { + let db = Self { pool: pool.clone(), world_address, - query_queue, + executor, model_cache: Arc::new(ModelCache::new(pool)), - }) - } - - pub fn merge(&mut self, other: Sql) -> Result<()> { - // Merge query queue - self.query_queue.queue.extend(other.query_queue.queue); - - // This should never happen - if self.world_address != other.world_address { - warn!( - "Merging Sql instances with different world addresses: {} and {}", - self.world_address, other.world_address - ); - } + }; + db.execute().await?; - Ok(()) + Ok(db) } pub async fn head(&self) -> Result<(u64, Option, Option)> { - let mut conn: PoolConnection = self.pool.acquire().await?; let indexer_query = sqlx::query_as::<_, (Option, Option, Option, String)>( "SELECT head, last_pending_block_world_tx, last_pending_block_tx, contract_type \ @@ -97,25 +73,38 @@ impl Sql { .bind(format!("{:#x}", self.world_address)); let indexer: (Option, Option, Option, String) = - indexer_query.fetch_one(&mut *conn).await?; + indexer_query.fetch_one(&self.pool).await?; + Ok(( - indexer.0.map(|h| h.try_into().expect("doesn't fit in u64")).unwrap_or(0), + indexer + .0 + .map(|h| h.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in u64", h))) + .transpose()? + .unwrap_or(0), indexer.1.map(|f| Felt::from_str(&f)).transpose()?, indexer.2.map(|f| Felt::from_str(&f)).transpose()?, )) } - pub fn set_head(&mut self, head: u64) { - let head = Argument::Int(head.try_into().expect("doesn't fit in u64")); - let id = Argument::FieldElement(self.world_address); - self.query_queue.enqueue( - "UPDATE contracts SET head = ? WHERE id = ?", - vec![head, id], - QueryType::Other, + pub fn set_head(&mut self, head: u64) -> Result<()> { + let head = Argument::Int( + head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?, ); + let id = Argument::FieldElement(self.world_address); + self.executor + .send(QueryMessage::other( + "UPDATE contracts SET head = ? WHERE id = ?".to_string(), + vec![head, id], + )) + .map_err(|e| anyhow!("Failed to send set_head message: {}", e))?; + + Ok(()) } - pub fn set_last_pending_block_world_tx(&mut self, last_pending_block_world_tx: Option) { + pub fn set_last_pending_block_world_tx( + &mut self, + last_pending_block_world_tx: Option, + ) -> Result<()> { let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx { Argument::String(format!("{:#x}", f)) } else { @@ -124,14 +113,15 @@ impl Sql { let id = Argument::FieldElement(self.world_address); - self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?", + self.executor.send(QueryMessage::other( + "UPDATE contracts SET last_pending_block_world_tx = ? 
WHERE id = ?".to_string(), vec![last_pending_block_world_tx, id], - QueryType::Other, - ); + ))?; + + Ok(()) } - pub fn set_last_pending_block_tx(&mut self, last_pending_block_tx: Option) { + pub fn set_last_pending_block_tx(&mut self, last_pending_block_tx: Option) -> Result<()> { let last_pending_block_tx = if let Some(f) = last_pending_block_tx { Argument::String(format!("{:#x}", f)) } else { @@ -139,11 +129,12 @@ impl Sql { }; let id = Argument::FieldElement(self.world_address); - self.query_queue.enqueue( - "UPDATE contracts SET last_pending_block_tx = ? WHERE id = ?", + self.executor.send(QueryMessage::other( + "UPDATE contracts SET last_pending_block_tx = ? WHERE id = ?".to_string(), vec![last_pending_block_tx, id], - QueryType::Other, - ); + ))?; + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -168,7 +159,6 @@ impl Sql { class_hash=EXCLUDED.class_hash, layout=EXCLUDED.layout, \ packed_size=EXCLUDED.packed_size, unpacked_size=EXCLUDED.unpacked_size, \ executed_at=EXCLUDED.executed_at RETURNING *"; - let arguments = vec![ Argument::String(format!("{:#x}", selector)), Argument::String(namespace.to_string()), @@ -180,8 +170,11 @@ impl Sql { Argument::Int(unpacked_size as i64), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - - self.query_queue.enqueue(insert_models, arguments, QueryType::RegisterModel); + self.executor.send(QueryMessage::new( + insert_models.to_string(), + arguments, + QueryType::RegisterModel, + ))?; let mut model_idx = 0_i64; self.build_register_queries_recursive( @@ -192,7 +185,7 @@ impl Sql { block_timestamp, &mut 0, &mut 0, - ); + )?; // we set the model in the cache directly // because entities might be using it before the query queue is processed @@ -255,14 +248,18 @@ impl Sql { arguments.push(Argument::String(keys.to_string())); } - self.query_queue.enqueue(insert_entities, arguments, QueryType::SetEntity(entity.clone())); + self.executor.send(QueryMessage::new( + insert_entities.to_string(), + arguments, + QueryType::SetEntity(entity.clone()), + ))?; - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \ - model_id) DO NOTHING", + model_id) DO NOTHING" + .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - QueryType::Other, - ); + ))?; let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -272,7 +269,7 @@ impl Sql { (&entity, keys_str.is_none()), block_timestamp, &vec![], - ); + )?; Ok(()) } @@ -304,8 +301,8 @@ impl Sql { VALUES (?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET \ updated_at=CURRENT_TIMESTAMP, executed_at=EXCLUDED.executed_at, \ event_id=EXCLUDED.event_id RETURNING *"; - self.query_queue.enqueue( - insert_entities, + self.executor.send(QueryMessage::new( + insert_entities.to_string(), vec![ Argument::String(entity_id.clone()), Argument::String(keys_str), @@ -313,13 +310,13 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], QueryType::EventMessage(entity.clone()), - ); - self.query_queue.enqueue( + ))?; + self.executor.send(QueryMessage::other( "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) 
ON CONFLICT(entity_id, \ - model_id) DO NOTHING", + model_id) DO NOTHING" + .to_string(), vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())], - QueryType::Other, - ); + ))?; let path = vec![namespaced_name]; self.build_set_entity_queries_recursive( @@ -329,7 +326,7 @@ impl Sql { (&entity, false), block_timestamp, &vec![], - ); + )?; Ok(()) } @@ -345,10 +342,10 @@ impl Sql { let entity_id = format!("{:#x}", entity_id); let path = vec![entity.name()]; // delete entity models data - self.build_delete_entity_queries_recursive(path, &entity_id, &entity); + self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?; - self.query_queue.enqueue( - "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?", + self.executor.send(QueryMessage::new( + "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(), vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))], QueryType::DeleteEntity(DeleteEntityQuery { entity_id: entity_id.clone(), @@ -356,26 +353,28 @@ impl Sql { block_timestamp: utc_dt_string_from_timestamp(block_timestamp), ty: entity.clone(), }), - ); + ))?; Ok(()) } - pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) { + pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> { let resource = Argument::FieldElement(*resource); let uri = Argument::String(uri.to_string()); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \ UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \ - updated_at=CURRENT_TIMESTAMP", + updated_at=CURRENT_TIMESTAMP" + .to_string(), vec![resource, uri, executed_at], - QueryType::Other, - ); + ))?; + + Ok(()) } - pub async fn update_metadata( + pub fn update_metadata( &mut self, resource: &Felt, uri: &str, @@ -401,7 +400,7 @@ impl Sql { let statement = format!("UPDATE metadata SET {} WHERE id = ?", update.join(",")); arguments.push(Argument::FieldElement(*resource)); - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement, arguments))?; Ok(()) } @@ -431,13 +430,13 @@ impl Sql { transaction: &Transaction, transaction_id: &str, block_timestamp: u64, - ) { + ) -> Result<()> { let id = Argument::String(transaction_id.to_string()); let transaction_type = match transaction { Transaction::Invoke(_) => "INVOKE", Transaction::L1Handler(_) => "L1_HANDLER", - _ => return, + _ => return Ok(()), }; let (transaction_hash, sender_address, calldata, max_fee, signature, nonce) = @@ -458,13 +457,14 @@ impl Sql { Argument::String("".to_string()), // has no signature Argument::FieldElement((l1_handler_transaction.nonce).into()), ), - _ => return, + _ => return Ok(()), }; - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \ max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \ - ?, ?, ?)", + ?, ?, ?)" + .to_string(), vec![ id, transaction_hash, @@ -476,8 +476,9 @@ impl Sql { Argument::String(transaction_type.to_string()), Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ], - QueryType::Other, - ); + ))?; + + Ok(()) } pub fn store_event( @@ -486,19 +487,22 @@ impl Sql { event: &Event, transaction_hash: Felt, block_timestamp: u64, 
- ) { + ) -> Result<()> { let id = Argument::String(event_id.to_string()); let keys = Argument::String(felts_sql_string(&event.keys)); let data = Argument::String(felts_sql_string(&event.data)); let hash = Argument::FieldElement(transaction_hash); let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp)); - self.query_queue.enqueue( + self.executor.send(QueryMessage::new( "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \ - (?, ?, ?, ?, ?) RETURNING *", + (?, ?, ?, ?, ?) RETURNING *" + .to_string(), vec![id, keys, data, hash, executed_at], QueryType::StoreEvent, - ); + ))?; + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -511,11 +515,11 @@ impl Sql { block_timestamp: u64, array_idx: &mut usize, parent_array_idx: &mut usize, - ) { + ) -> Result<()> { if let Ty::Enum(e) = model { if e.options.iter().all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) { - return; + return Ok(()); } } @@ -527,13 +531,13 @@ impl Sql { block_timestamp, *array_idx, *parent_array_idx, - ); + )?; - let mut build_member = |pathname: &str, member: &Ty| { + let mut build_member = |pathname: &str, member: &Ty| -> Result<()> { if let Ty::Primitive(_) = member { - return; + return Ok(()); } else if let Ty::ByteArray(_) = member { - return; + return Ok(()); } let mut path_clone = path.clone(); @@ -547,20 +551,22 @@ impl Sql { block_timestamp, &mut (*array_idx + if let Ty::Array(_) = member { 1 } else { 0 }), &mut (*parent_array_idx + if let Ty::Array(_) = model { 1 } else { 0 }), - ); + )?; + + Ok(()) }; if let Ty::Struct(s) = model { for member in s.children.iter() { - build_member(&member.name, &member.ty); + build_member(&member.name, &member.ty)?; } } else if let Ty::Tuple(t) = model { for (idx, member) in t.iter().enumerate() { - build_member(format!("_{}", idx).as_str(), member); + build_member(format!("_{}", idx).as_str(), member)?; } } else if let Ty::Array(array) = model { let ty = &array[0]; - build_member("data", ty); + build_member("data", ty)?; } else if let Ty::Enum(e) = model { for child in e.options.iter() { // Skip enum options that have no type / member @@ -570,9 +576,11 @@ impl Sql { } } - build_member(&child.name, &child.ty); + build_member(&child.name, &child.ty)?; } } + + Ok(()) } fn build_set_entity_queries_recursive( @@ -584,103 +592,107 @@ impl Sql { entity: (&Ty, IsStoreUpdate), block_timestamp: u64, indexes: &Vec, - ) { + ) -> Result<()> { let (entity_id, is_event_message) = entity_id; let (entity, is_store_update_member) = entity; - let update_members = - |members: &[Member], query_queue: &mut QueryQueue, indexes: &Vec| { - let table_id = path.join("$"); - let mut columns = vec![ - "id".to_string(), - "event_id".to_string(), - "executed_at".to_string(), - "updated_at".to_string(), - if is_event_message { - "event_message_id".to_string() - } else { - "entity_id".to_string() - }, - ]; - - let mut arguments = vec![ - Argument::String(if is_event_message { - "event:".to_string() + entity_id - } else { - entity_id.to_string() - }), - Argument::String(event_id.to_string()), - Argument::String(utc_dt_string_from_timestamp(block_timestamp)), - Argument::String(chrono::Utc::now().to_rfc3339()), - Argument::String(entity_id.to_string()), - ]; + let update_members = |members: &[Member], + executor: &mut UnboundedSender, + indexes: &Vec| + -> Result<()> { + let table_id = path.join("$"); + let mut columns = vec![ + "id".to_string(), + "event_id".to_string(), + "executed_at".to_string(), + "updated_at".to_string(), + if 
is_event_message { + "event_message_id".to_string() + } else { + "entity_id".to_string() + }, + ]; - if !indexes.is_empty() { - columns.push("full_array_id".to_string()); - arguments.push(Argument::String( - std::iter::once(entity_id.to_string()) - .chain(indexes.iter().map(|i| i.to_string())) - .collect::>() - .join(FELT_DELIMITER), - )); - } + let mut arguments = vec![ + Argument::String(if is_event_message { + "event:".to_string() + entity_id + } else { + entity_id.to_string() + }), + Argument::String(event_id.to_string()), + Argument::String(utc_dt_string_from_timestamp(block_timestamp)), + Argument::String(chrono::Utc::now().to_rfc3339()), + Argument::String(entity_id.to_string()), + ]; + + if !indexes.is_empty() { + columns.push("full_array_id".to_string()); + arguments.push(Argument::String( + std::iter::once(entity_id.to_string()) + .chain(indexes.iter().map(|i| i.to_string())) + .collect::>() + .join(FELT_DELIMITER), + )); + } - for (column_idx, idx) in indexes.iter().enumerate() { - columns.push(format!("idx_{}", column_idx)); - arguments.push(Argument::Int(*idx)); - } + for (column_idx, idx) in indexes.iter().enumerate() { + columns.push(format!("idx_{}", column_idx)); + arguments.push(Argument::Int(*idx)); + } - for member in members.iter() { - match &member.ty { - Ty::Primitive(ty) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(ty.to_sql_value().unwrap())); - } - Ty::Enum(e) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(e.to_sql_value().unwrap())); - } - Ty::ByteArray(b) => { - columns.push(format!("external_{}", &member.name)); - arguments.push(Argument::String(b.clone())); - } - _ => {} + for member in members.iter() { + match &member.ty { + Ty::Primitive(ty) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(ty.to_sql_value().unwrap())); + } + Ty::Enum(e) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(e.to_sql_value().unwrap())); + } + Ty::ByteArray(b) => { + columns.push(format!("external_{}", &member.name)); + arguments.push(Argument::String(b.clone())); } + _ => {} } + } - let placeholders: Vec<&str> = arguments.iter().map(|_| "?").collect(); - let statement = if is_store_update_member && indexes.is_empty() { - arguments.push(Argument::String(if is_event_message { - "event:".to_string() + entity_id - } else { - entity_id.to_string() - })); - - // row has to exist. update it directly - format!( - "UPDATE [{table_id}] SET {updates} WHERE id = ?", - table_id = table_id, - updates = columns - .iter() - .zip(placeholders.iter()) - .map(|(column, placeholder)| format!("{} = {}", column, placeholder)) - .collect::>() - .join(", ") - ) + let placeholders: Vec<&str> = arguments.iter().map(|_| "?").collect(); + let statement = if is_store_update_member && indexes.is_empty() { + arguments.push(Argument::String(if is_event_message { + "event:".to_string() + entity_id } else { - format!( - "INSERT OR REPLACE INTO [{table_id}] ({}) VALUES ({})", - columns.join(","), - placeholders.join(",") - ) - }; - - query_queue.enqueue(statement, arguments, QueryType::Other); + entity_id.to_string() + })); + + // row has to exist. 
update it directly + format!( + "UPDATE [{table_id}] SET {updates} WHERE id = ?", + table_id = table_id, + updates = columns + .iter() + .zip(placeholders.iter()) + .map(|(column, placeholder)| format!("{} = {}", column, placeholder)) + .collect::>() + .join(", ") + ) + } else { + format!( + "INSERT OR REPLACE INTO [{table_id}] ({}) VALUES ({})", + columns.join(","), + placeholders.join(",") + ) }; + executor.send(QueryMessage::other(statement, arguments))?; + + Ok(()) + }; + match entity { Ty::Struct(s) => { - update_members(&s.children, &mut self.query_queue, indexes); + update_members(&s.children, &mut self.executor, indexes)?; for member in s.children.iter() { let mut path_clone = path.clone(); @@ -692,7 +704,7 @@ impl Sql { (&member.ty, is_store_update_member), block_timestamp, indexes, - ); + )?; } } Ty::Enum(e) => { @@ -701,7 +713,7 @@ impl Sql { if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false } }, ) { - return; + return Ok(()); } let option = e.options[e.option.unwrap() as usize].clone(); @@ -711,9 +723,9 @@ impl Sql { Member { name: "option".to_string(), ty: Ty::Enum(e.clone()), key: false }, Member { name: option.name.clone(), ty: option.ty.clone(), key: false }, ], - &mut self.query_queue, + &mut self.executor, indexes, - ); + )?; match &option.ty { // Skip enum options that have no type / member @@ -728,7 +740,7 @@ impl Sql { (&option.ty, is_store_update_member), block_timestamp, indexes, - ); + )?; } } } @@ -743,9 +755,9 @@ impl Sql { }) .collect::>() .as_slice(), - &mut self.query_queue, + &mut self.executor, indexes, - ); + )?; for (idx, member) in t.iter().enumerate() { let mut path_clone = path.clone(); @@ -757,7 +769,7 @@ impl Sql { (member, is_store_update_member), block_timestamp, indexes, - ); + )?; } } Ty::Array(array) => { @@ -773,7 +785,7 @@ impl Sql { let mut arguments = vec![Argument::String(entity_id.to_string())]; arguments.extend(indexes.iter().map(|idx| Argument::Int(*idx))); - self.query_queue.enqueue(query, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(query, arguments))?; // insert the new array elements for (idx, member) in array.iter().enumerate() { @@ -782,9 +794,9 @@ impl Sql { update_members( &[Member { name: "data".to_string(), ty: member.clone(), key: false }], - &mut self.query_queue, + &mut self.executor, &indexes, - ); + )?; let mut path_clone = path.clone(); path_clone.push("data".to_string()); @@ -795,11 +807,13 @@ impl Sql { (member, is_store_update_member), block_timestamp, &indexes, - ); + )?; } } _ => {} } + + Ok(()) } fn build_delete_entity_queries_recursive( @@ -807,20 +821,19 @@ impl Sql { path: Vec, entity_id: &str, entity: &Ty, - ) { + ) -> Result<()> { match entity { Ty::Struct(s) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for member in s.children.iter() { let mut path_clone = path.clone(); path_clone.push(member.name.clone()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, &member.ty); + self.build_delete_entity_queries_recursive(path_clone, entity_id, &member.ty)?; } } Ty::Enum(e) => { @@ -828,16 +841,15 @@ impl Sql { .iter() .all(|o| if let Ty::Tuple(t) = &o.ty { t.is_empty() } else { false }) { - return; + return Ok(()); } let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + 
self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for child in e.options.iter() { if let Ty::Tuple(t) = &child.ty { @@ -848,41 +860,41 @@ impl Sql { let mut path_clone = path.clone(); path_clone.push(child.name.clone()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, &child.ty); + self.build_delete_entity_queries_recursive(path_clone, entity_id, &child.ty)?; } } Ty::Array(array) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for member in array.iter() { let mut path_clone = path.clone(); path_clone.push("data".to_string()); - self.build_delete_entity_queries_recursive(path_clone, entity_id, member); + self.build_delete_entity_queries_recursive(path_clone, entity_id, member)?; } } Ty::Tuple(t) => { let table_id = path.join("$"); let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?"); - self.query_queue.enqueue( + self.executor.send(QueryMessage::other( statement, vec![Argument::String(entity_id.to_string())], - QueryType::Other, - ); + ))?; for (idx, member) in t.iter().enumerate() { let mut path_clone = path.clone(); path_clone.push(format!("_{}", idx)); - self.build_delete_entity_queries_recursive(path_clone, entity_id, member); + self.build_delete_entity_queries_recursive(path_clone, entity_id, member)?; } } _ => {} } + + Ok(()) } #[allow(clippy::too_many_arguments)] @@ -895,7 +907,7 @@ impl Sql { block_timestamp: u64, array_idx: usize, parent_array_idx: usize, - ) { + ) -> Result<()> { let table_id = path.join("$"); let mut indices = Vec::new(); @@ -987,7 +999,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } Ty::Tuple(tuple) => { @@ -1015,7 +1027,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } Ty::Array(array) => { @@ -1040,7 +1052,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } Ty::Enum(e) => { for (idx, child) in e @@ -1079,7 +1091,7 @@ impl Sql { Argument::String(utc_dt_string_from_timestamp(block_timestamp)), ]; - self.query_queue.enqueue(statement, arguments, QueryType::Other); + self.executor.send(QueryMessage::other(statement.to_string(), arguments))?; } } _ => {} @@ -1118,20 +1130,20 @@ impl Sql { create_table_query .push_str("FOREIGN KEY (event_message_id) REFERENCES event_messages(id));"); - self.query_queue.enqueue(create_table_query, vec![], QueryType::Other); + self.executor.send(QueryMessage::other(create_table_query, vec![]))?; - indices.iter().for_each(|s| { - self.query_queue.enqueue(s, vec![], QueryType::Other); - }); - } - - /// Execute all queries in the queue - pub async fn execute(&mut self) -> Result<()> { - debug!("Executing {} queries from the queue", self.query_queue.queue.len()); - self.query_queue.execute_all().await?; + for s in indices.iter() { + 
self.executor.send(QueryMessage::other(s.to_string(), vec![]))?; + } Ok(()) } + + pub async fn execute(&self) -> Result<()> { + let (execute, recv) = QueryMessage::execute_recv(); + self.executor.send(execute)?; + recv.await? + } } pub fn felts_sql_string(felts: &[Felt]) -> String { diff --git a/crates/torii/core/src/sql_test.rs b/crates/torii/core/src/sql_test.rs index f31cacbb66..499fd0adf8 100644 --- a/crates/torii/core/src/sql_test.rs +++ b/crates/torii/core/src/sql_test.rs @@ -17,9 +17,11 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name}; use starknet::providers::jsonrpc::HttpTransport; use starknet::providers::{JsonRpcClient, Provider}; use starknet_crypto::poseidon_hash_many; +use tempfile::NamedTempFile; use tokio::sync::broadcast; use crate::engine::{Engine, EngineConfig, Processors}; +use crate::executor::Executor; use crate::processors::generate_event_processors_map; use crate::processors::register_model::RegisterModelProcessor; use crate::processors::store_del_record::StoreDelRecordProcessor; @@ -40,7 +42,7 @@ where let to = provider.block_hash_and_number().await?.block_number; let mut engine = Engine::new( world, - db, + db.clone(), provider, Processors { event: generate_event_processors_map(vec![ @@ -60,17 +62,14 @@ where let data = engine.fetch_range(0, to, None).await.unwrap(); engine.process_range(data).await.unwrap(); + db.execute().await.unwrap(); + Ok(engine) } #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_load_from_remote(sequencer: &RunnerCtx) { - let options = - SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -121,9 +120,34 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + // move + let tx = &account + .execute_v1(vec![Call { + to: actions_address, + selector: get_selector_from_name("move").unwrap(), + calldata: vec![Felt::ONE], + }]) + .send() + .await + .unwrap(); + + TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap(); + let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await.unwrap(); @@ -177,6 +201,7 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { assert_eq!(unpacked_size, 0); assert_eq!(count_table("entities", &pool).await, 2); + assert_eq!(count_table("event_messages", &pool).await, 2); let (id, keys): 
(String, String) = sqlx::query_as( format!( @@ -191,18 +216,11 @@ async fn test_load_from_remote(sequencer: &RunnerCtx) { assert_eq!(id, format!("{:#x}", poseidon_hash_many(&[account.address()]))); assert_eq!(keys, format!("{:#x}/", account.address())); - - db.execute().await.unwrap(); } #[tokio::test(flavor = "multi_thread")] #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())] async fn test_load_from_remote_del(sequencer: &RunnerCtx) { - let options = - SqliteConnectOptions::from_str("sqlite::memory:").unwrap().create_if_missing(true); - let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap(); - sqlx::migrate!("../migrations").run(&pool).await.unwrap(); - let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/"); let config = setup.build_test_config("spawn-and-move", Profile::DEV); @@ -280,7 +298,19 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { let world_reader = WorldContractReader::new(strat.world_address, Arc::clone(&provider)); - let mut db = Sql::new(pool.clone(), world_reader.address).await.unwrap(); + let tempfile = NamedTempFile::new().unwrap(); + let path = tempfile.path().to_string_lossy(); + let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true); + let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap(); + sqlx::migrate!("../migrations").run(&pool).await.unwrap(); + + let (shutdown_tx, _) = broadcast::channel(1); + let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap(); + tokio::spawn(async move { + executor.run().await.unwrap(); + }); + + let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap(); let _ = bootstrap_engine(world_reader, db.clone(), provider).await; @@ -290,18 +320,11 @@ async fn test_load_from_remote_del(sequencer: &RunnerCtx) { // TODO: check how we can have a test that is more chronological with Torii re-syncing // to ensure we can test intermediate states. 
diff --git a/crates/torii/graphql/Cargo.toml b/crates/torii/graphql/Cargo.toml
index fd7c03236e..651a076a80 100644
--- a/crates/torii/graphql/Cargo.toml
+++ b/crates/torii/graphql/Cargo.toml
@@ -47,4 +47,5 @@ scarb.workspace = true
 serial_test = "2.0.0"
 sozo-ops.workspace = true
 starknet.workspace = true
+tempfile.workspace = true
 starknet-crypto.workspace = true
diff --git a/crates/torii/graphql/src/tests/entities_test.rs b/crates/torii/graphql/src/tests/entities_test.rs
index 6138aac846..efd74ab723 100644
--- a/crates/torii/graphql/src/tests/entities_test.rs
+++ b/crates/torii/graphql/src/tests/entities_test.rs
@@ -5,6 +5,7 @@ mod tests {
     use serde_json::Value;
     use starknet::core::types::Felt;
     use starknet_crypto::poseidon_hash_many;
+    use tempfile::NamedTempFile;
 
     use crate::schema::build_schema;
     use crate::tests::{
@@ -90,7 +91,9 @@ mod tests {
     // to run so combine all related tests into one
     #[tokio::test(flavor = "multi_thread")]
     async fn entities_test() -> Result<()> {
-        let pool = spinup_types_test().await?;
+        let tempfile = NamedTempFile::new().unwrap();
+        let path = tempfile.path().to_string_lossy();
+        let pool = spinup_types_test(&path).await?;
         let schema = build_schema(&pool).await.unwrap();
 
         // default without params
diff --git a/crates/torii/graphql/src/tests/metadata_test.rs b/crates/torii/graphql/src/tests/metadata_test.rs
index 53ff0367ff..8c5e3dff63 100644
--- a/crates/torii/graphql/src/tests/metadata_test.rs
+++ b/crates/torii/graphql/src/tests/metadata_test.rs
@@ -4,6 +4,8 @@ mod tests {
     use dojo_world::metadata::WorldMetadata;
     use sqlx::SqlitePool;
     use starknet::core::types::Felt;
+    use tokio::sync::broadcast;
+    use torii_core::executor::Executor;
     use torii_core::sql::Sql;
 
     use crate::schema::build_schema;
@@ -48,7 +50,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     async fn test_metadata(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
         let schema = build_schema(&pool).await.unwrap();
         let cover_img = "QWxsIHlvdXIgYmFzZSBiZWxvbmcgdG8gdXM=";
@@ -70,9 +78,8 @@ mod tests {
         // TODO: we may want to store here the namespace and the seed. Check the
         // implementation to actually add those to the metadata table.
         let world_metadata: WorldMetadata = profile_config.world.into();
-        db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP);
+        db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP).unwrap();
         db.update_metadata(&RESOURCE, URI, &world_metadata, &None, &Some(cover_img.to_string()))
-            .await
             .unwrap();
         db.execute().await.unwrap();
 
@@ -101,10 +108,16 @@ mod tests {
 
     #[sqlx::test(migrations = "../migrations")]
     async fn test_empty_content(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
         let schema = build_schema(&pool).await.unwrap();
 
-        db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP);
+        db.set_metadata(&RESOURCE, URI, BLOCK_TIMESTAMP).unwrap();
         db.execute().await.unwrap();
 
         let result = run_graphql_query(&schema, QUERY).await;
diff --git a/crates/torii/graphql/src/tests/mod.rs b/crates/torii/graphql/src/tests/mod.rs
index 133b46075e..7fe949881c 100644
--- a/crates/torii/graphql/src/tests/mod.rs
+++ b/crates/torii/graphql/src/tests/mod.rs
@@ -27,6 +27,7 @@ use starknet::providers::{JsonRpcClient, Provider};
 use tokio::sync::broadcast;
 use tokio_stream::StreamExt;
 use torii_core::engine::{Engine, EngineConfig, Processors};
+use torii_core::executor::Executor;
 use torii_core::processors::generate_event_processors_map;
 use torii_core::processors::register_model::RegisterModelProcessor;
 use torii_core::processors::store_del_record::StoreDelRecordProcessor;
@@ -274,11 +275,10 @@ pub async fn model_fixtures(db: &mut Sql) {
     db.execute().await.unwrap();
 }
 
-pub async fn spinup_types_test() -> Result<SqlitePool> {
-    // change sqlite::memory: to sqlite:~/.test.db to dump database to disk
+pub async fn spinup_types_test(path: &str) -> Result<SqlitePool> {
     let options =
-        SqliteConnectOptions::from_str("sqlite::memory:")?.create_if_missing(true).with_regexp();
-    let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
+        SqliteConnectOptions::from_str(path).unwrap().create_if_missing(true).with_regexp();
+    let pool = SqlitePoolOptions::new().connect_with(options).await.unwrap();
     sqlx::migrate!("../migrations").run(&pool).await.unwrap();
 
     let setup = CompilerTestSetup::from_paths("../../dojo-core", &["../types-test"]);
@@ -350,12 +350,17 @@
 
     let world = WorldContractReader::new(strat.world_address, Arc::clone(&provider));
 
-    let db = Sql::new(pool.clone(), strat.world_address).await.unwrap();
+    let (shutdown_tx, _) = broadcast::channel(1);
+    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+    tokio::spawn(async move {
+        executor.run().await.unwrap();
+    });
+    let db = Sql::new(pool.clone(), strat.world_address, sender).await.unwrap();
 
     let (shutdown_tx, _) = broadcast::channel(1);
     let mut engine = Engine::new(
         world,
-        db,
+        db.clone(),
         Arc::clone(&provider),
         Processors {
             event: generate_event_processors_map(vec![
@@ -374,6 +379,6 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
     let to = account.provider().block_hash_and_number().await?.block_number;
     let data = engine.fetch_range(0, to, None).await.unwrap();
     engine.process_range(data).await.unwrap();
-
+    db.execute().await.unwrap();
     Ok(pool)
 }
diff --git a/crates/torii/graphql/src/tests/models_ordering_test.rs b/crates/torii/graphql/src/tests/models_ordering_test.rs
index 9b4abdf26e..42182182fa 100644
--- a/crates/torii/graphql/src/tests/models_ordering_test.rs
+++ b/crates/torii/graphql/src/tests/models_ordering_test.rs
@@ -3,6 +3,7 @@ mod tests {
     use anyhow::Result;
     use async_graphql::dynamic::Schema;
     use serde_json::Value;
+    use tempfile::NamedTempFile;
 
     use crate::schema::build_schema;
     use crate::tests::{run_graphql_query, spinup_types_test, Connection, WorldModel};
@@ -44,7 +45,9 @@ mod tests {
     // to run so combine all related tests into one
     #[tokio::test(flavor = "multi_thread")]
     async fn models_ordering_test() -> Result<()> {
-        let pool = spinup_types_test().await?;
+        let tempfile = NamedTempFile::new().unwrap();
+        let path = tempfile.path().to_string_lossy();
+        let pool = spinup_types_test(&path).await?;
         let schema = build_schema(&pool).await.unwrap();
 
         // default params, test entity relationship, test nested types
diff --git a/crates/torii/graphql/src/tests/models_test.rs b/crates/torii/graphql/src/tests/models_test.rs
index 163d9afc41..78cd6f5458 100644
--- a/crates/torii/graphql/src/tests/models_test.rs
+++ b/crates/torii/graphql/src/tests/models_test.rs
@@ -6,6 +6,7 @@ mod tests {
     use async_graphql::dynamic::Schema;
     use serde_json::Value;
     use starknet::core::types::Felt;
+    use tempfile::NamedTempFile;
 
     use crate::schema::build_schema;
     use crate::tests::{
@@ -166,7 +167,9 @@ mod tests {
     #[allow(clippy::get_first)]
     #[tokio::test(flavor = "multi_thread")]
     async fn models_test() -> Result<()> {
-        let pool = spinup_types_test().await?;
+        let tempfile = NamedTempFile::new().unwrap();
+        let path = tempfile.path().to_string_lossy();
+        let pool = spinup_types_test(&path).await?;
         let schema = build_schema(&pool).await.unwrap();
 
         // we need to order all the records because insertions are done in parallel
diff --git a/crates/torii/graphql/src/tests/subscription_test.rs b/crates/torii/graphql/src/tests/subscription_test.rs
index 363082878a..2e32e0c194 100644
--- a/crates/torii/graphql/src/tests/subscription_test.rs
+++ b/crates/torii/graphql/src/tests/subscription_test.rs
@@ -12,7 +12,8 @@ mod tests {
     use sqlx::SqlitePool;
     use starknet::core::types::Event;
     use starknet_crypto::{poseidon_hash_many, Felt};
-    use tokio::sync::mpsc;
+    use tokio::sync::{broadcast, mpsc};
+    use torii_core::executor::Executor;
     use torii_core::sql::{felts_sql_string, Sql};
 
     use crate::tests::{model_fixtures, run_graphql_subscription};
@@ -21,7 +22,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     #[serial]
     async fn test_entity_subscription(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
 
         model_fixtures(&mut db).await;
         // 0. Preprocess expected entity value
@@ -156,7 +163,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     #[serial]
     async fn test_entity_subscription_with_id(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
 
         model_fixtures(&mut db).await;
         // 0. Preprocess expected entity value
@@ -271,7 +284,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     #[serial]
     async fn test_model_subscription(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
         // 0. Preprocess model value
         let namespace = "types_test".to_string();
         let model_name = "Subrecord".to_string();
@@ -336,7 +355,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     #[serial]
     async fn test_model_subscription_with_id(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
         // 0. Preprocess model value
         let namespace = "types_test".to_string();
         let model_name = "Subrecord".to_string();
@@ -402,7 +427,13 @@ mod tests {
     #[sqlx::test(migrations = "../migrations")]
     #[serial]
     async fn test_event_emitted(pool: SqlitePool) {
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
         let block_timestamp: u64 = 1710754478_u64;
         let (tx, mut rx) = mpsc::channel(7);
         tokio::spawn(async move {
@@ -423,7 +454,8 @@ mod tests {
                 },
                 Felt::ZERO,
                 block_timestamp,
-            );
+            )
+            .unwrap();
 
             db.execute().await.unwrap();
             tx.send(()).await.unwrap();
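Every test touched above now repeats the same four steps: create a shutdown channel, build an `Executor`, spawn its `run()` loop, then hand the sender to `Sql::new`. Assuming the torii-core signatures are as they appear in this diff, that boilerplate could be collapsed into a helper along these lines (hypothetical, not part of the crate; error-type details may differ):

```rust
use sqlx::SqlitePool;
use starknet::core::types::Felt;
use tokio::sync::broadcast;
use torii_core::executor::Executor;
use torii_core::sql::Sql;

// Hypothetical consolidation of the per-test setup repeated in these files.
async fn new_test_db(pool: &SqlitePool, world_address: Felt) -> anyhow::Result<Sql> {
    let (shutdown_tx, _) = broadcast::channel(1);
    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await?;
    // The executor loop must stay alive in the background, or queued
    // queries (and `db.execute()`) would never make progress.
    tokio::spawn(async move {
        executor.run().await.unwrap();
    });
    Ok(Sql::new(pool.clone(), world_address, sender).await?)
}
```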
diff --git a/crates/torii/grpc/Cargo.toml b/crates/torii/grpc/Cargo.toml
index 492cc9da34..a9d4b00102 100644
--- a/crates/torii/grpc/Cargo.toml
+++ b/crates/torii/grpc/Cargo.toml
@@ -36,6 +36,7 @@ dojo-test-utils.workspace = true
 dojo-utils.workspace = true
 katana-runner.workspace = true
 scarb.workspace = true
+tempfile.workspace = true
 
 [target.'cfg(target_arch = "wasm32")'.dependencies]
 tonic-web-wasm-client.workspace = true
diff --git a/crates/torii/grpc/src/server/tests/entities_test.rs b/crates/torii/grpc/src/server/tests/entities_test.rs
index 0b04574a03..d8b7b759d2 100644
--- a/crates/torii/grpc/src/server/tests/entities_test.rs
+++ b/crates/torii/grpc/src/server/tests/entities_test.rs
@@ -18,8 +18,10 @@ use starknet::core::utils::{get_contract_address, get_selector_from_name};
 use starknet::providers::jsonrpc::HttpTransport;
 use starknet::providers::{JsonRpcClient, Provider};
 use starknet_crypto::poseidon_hash_many;
+use tempfile::NamedTempFile;
 use tokio::sync::broadcast;
 use torii_core::engine::{Engine, EngineConfig, Processors};
+use torii_core::executor::Executor;
 use torii_core::processors::generate_event_processors_map;
 use torii_core::processors::register_model::RegisterModelProcessor;
 use torii_core::processors::store_set_record::StoreSetRecordProcessor;
@@ -32,11 +34,17 @@ use crate::types::schema::Entity;
 #[tokio::test(flavor = "multi_thread")]
 #[katana_runner::test(accounts = 10, db_dir = copy_spawn_and_move_db().as_str())]
 async fn test_entities_queries(sequencer: &RunnerCtx) {
-    let options = SqliteConnectOptions::from_str("sqlite::memory:")
-        .unwrap()
-        .create_if_missing(true)
-        .with_regexp();
-    let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
+    let tempfile = NamedTempFile::new().unwrap();
+    let path = tempfile.path().to_string_lossy();
+    let options =
+        SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true).with_regexp();
+    let pool = SqlitePoolOptions::new()
+        .min_connections(1)
+        .idle_timeout(None)
+        .max_lifetime(None)
+        .connect_with(options)
+        .await
+        .unwrap();
     sqlx::migrate!("../migrations").run(&pool).await.unwrap();
 
     let setup = CompilerTestSetup::from_examples("../../dojo-core", "../../../examples/");
@@ -92,7 +100,12 @@ async fn test_entities_queries(sequencer: &RunnerCtx) {
 
     TransactionWaiter::new(tx.transaction_hash, &provider).await.unwrap();
 
-    let db = Sql::new(pool.clone(), strat.world_address).await.unwrap();
+    let (shutdown_tx, _) = broadcast::channel(1);
+    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+    tokio::spawn(async move {
+        executor.run().await.unwrap();
+    });
+    let db = Sql::new(pool.clone(), strat.world_address, sender).await.unwrap();
 
     let (shutdown_tx, _) = broadcast::channel(1);
     let mut engine = Engine::new(
@@ -116,6 +129,8 @@ async fn test_entities_queries(sequencer: &RunnerCtx) {
     let data = engine.fetch_range(0, to, None).await.unwrap();
     engine.process_range(data).await.unwrap();
 
+    db.execute().await.unwrap();
+
     let (_, receiver) = tokio::sync::mpsc::channel(1);
     let grpc = DojoWorld::new(db.pool, receiver, strat.world_address, provider.clone());
diff --git a/crates/torii/libp2p/src/server/mod.rs b/crates/torii/libp2p/src/server/mod.rs
index 9bc1e25ce3..11f0489687 100644
--- a/crates/torii/libp2p/src/server/mod.rs
+++ b/crates/torii/libp2p/src/server/mod.rs
@@ -25,6 +25,7 @@ use starknet::core::types::{BlockId, BlockTag, Felt, FunctionCall};
 use starknet::core::utils::get_selector_from_name;
 use starknet::providers::Provider;
 use starknet_crypto::poseidon_hash_many;
+use torii_core::executor::QueryMessage;
 use torii_core::sql::{felts_sql_string, Sql};
 use tracing::{info, warn};
 use webrtc::tokio::Certificate;
@@ -529,7 +530,7 @@ async fn set_entity(
     keys: &str,
 ) -> anyhow::Result<()> {
     db.set_entity(ty, message_id, block_timestamp, entity_id, model_id, Some(keys)).await?;
-    db.execute().await?;
+    db.executor.send(QueryMessage::execute())?;
 
     Ok(())
 }
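Note the asymmetry this last hunk introduces: the tests call `db.execute().await`, which blocks until the executor confirms the flush, while the relay server now sends `QueryMessage::execute()` and returns immediately, presumably so handling a peer message never waits on the database. Side by side (a sketch assuming the `executor` field and `QueryMessage` API shown in this diff):

```rust
use torii_core::executor::QueryMessage;
use torii_core::sql::Sql;

// Two flush styles for queries queued on the executor.
async fn flush(db: &Sql) -> anyhow::Result<()> {
    // Awaited flush: resolves once the executor has committed everything.
    db.execute().await?;
    // Fire-and-forget flush: enqueue the request and move on.
    db.executor.send(QueryMessage::execute())?;
    Ok(())
}
```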
diff --git a/crates/torii/libp2p/src/tests.rs b/crates/torii/libp2p/src/tests.rs
index 7be8d30092..069f82997b 100644
--- a/crates/torii/libp2p/src/tests.rs
+++ b/crates/torii/libp2p/src/tests.rs
@@ -12,15 +12,9 @@ mod test {
     use crypto_bigint::U256;
     use dojo_types::primitive::Primitive;
     use dojo_types::schema::{Enum, EnumOption, Member, Struct, Ty};
-    use dojo_world::contracts::abi::model::Layout;
-    use futures::StreamExt;
     use katana_runner::KatanaRunner;
     use serde_json::Number;
-    use sqlx::sqlite::{SqliteConnectOptions, SqlitePoolOptions};
     use starknet::core::types::Felt;
-    use torii_core::simple_broker::SimpleBroker;
-    use torii_core::sql::Sql;
-    use torii_core::types::EventMessage;
 
     #[cfg(target_arch = "wasm32")]
     use wasm_bindgen_test::*;
@@ -540,8 +534,11 @@
     use starknet::providers::JsonRpcClient;
     use starknet::signers::SigningKey;
     use starknet_crypto::Felt;
+    use tempfile::NamedTempFile;
     use tokio::select;
+    use tokio::sync::broadcast;
     use tokio::time::sleep;
+    use torii_core::executor::Executor;
     use torii_core::sql::Sql;
 
     use crate::server::Relay;
@@ -553,10 +550,18 @@
             .try_init();
 
         // Database
-        let options = <SqliteConnectOptions as std::str::FromStr>::from_str("sqlite::memory:")
+        let tempfile = NamedTempFile::new().unwrap();
+        let path = tempfile.path().to_string_lossy();
+        let options = <SqliteConnectOptions as std::str::FromStr>::from_str(&path)
             .unwrap()
             .create_if_missing(true);
-        let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
+        let pool = SqlitePoolOptions::new()
+            .min_connections(1)
+            .idle_timeout(None)
+            .max_lifetime(None)
+            .connect_with(options)
+            .await
+            .unwrap();
         sqlx::migrate!("../migrations").run(&pool).await.unwrap();
 
         let sequencer = KatanaRunner::new().expect("Failed to create Katana sequencer");
@@ -565,7 +570,13 @@
 
         let account = sequencer.account_data(0);
 
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await?;
+        let (shutdown_tx, _) = broadcast::channel(1);
+        let (mut executor, sender) =
+            Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+        tokio::spawn(async move {
+            executor.run().await.unwrap();
+        });
+        let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
 
         // Register the model of our Message
         db.register_model(
@@ -686,57 +697,6 @@
         }
     }
 
-    // Test to verify that setting an entity message in the SQL database
-    // triggers a publish event on the broker
-    #[tokio::test]
-    async fn test_entity_message_trigger_publish() -> Result<(), Box<dyn std::error::Error>> {
-        let _ = tracing_subscriber::fmt()
-            .with_env_filter("torii::relay::client=debug,torii::relay::server=debug")
-            .try_init();
-
-        let options = <SqliteConnectOptions as std::str::FromStr>::from_str("sqlite::memory:")
-            .unwrap()
-            .create_if_missing(true);
-        let pool = SqlitePoolOptions::new().max_connections(5).connect_with(options).await.unwrap();
-        sqlx::migrate!("../migrations").run(&pool).await.unwrap();
-
-        let mut db = Sql::new(pool.clone(), Felt::ZERO).await.unwrap();
-        let mut broker = SimpleBroker::<EventMessage>::subscribe();
-
-        let entity = Ty::Struct(Struct { name: "Message".to_string(), children: vec![] });
-        db.register_model(
-            "test_namespace",
-            entity.clone(),
-            Layout::Fixed(vec![]),
-            Felt::ZERO,
-            Felt::ZERO,
-            0,
-            0,
-            0,
-        )
-        .await?;
-
-        // FIXME: register_model and set_event_message handle the name and namespace of entity type
-        // differently.
-        let entity =
-            Ty::Struct(Struct { name: "test_namespace-Message".to_string(), children: vec![] });
-
-        // Set the event message in the database
-        db.set_event_message(entity, "some_entity_id", 0).await?;
-        db.query_queue.execute_all().await?;
-
-        // Check if a message was published to the broker
-        tokio::select! {
-            Some(message) = broker.next() => {
-                println!("Received message: {:?}", message);
-                Ok(())
-            },
-            _ = tokio::time::sleep(std::time::Duration::from_secs(5)) => {
-                Err("Timeout: No message received".into())
-            }
-        }
-    }
-
     #[cfg(target_arch = "wasm32")]
     #[wasm_bindgen_test]
     async fn test_client_connection_wasm() -> Result<(), Box<dyn std::error::Error>> {
diff --git a/examples/spawn-and-move/manifests/dev/deployment/manifest.json b/examples/spawn-and-move/manifests/dev/deployment/manifest.json
index c3d4991c46..01e006eb03 100644
--- a/examples/spawn-and-move/manifests/dev/deployment/manifest.json
+++ b/examples/spawn-and-move/manifests/dev/deployment/manifest.json
@@ -1234,9 +1234,9 @@
       ]
     }
   ],
-  "address": "0x46c1fd10836a8426197bf412fc5f26ea10f11a8d5c61474407f03f82c096593",
-  "transaction_hash": "0x7f540b040b1638b76a7f2a8fc13a33050d1c0556a63814f319a01d022b172cf",
-  "block_number": 3,
+  "address": "0x5fedbace16902d9ca4cdc1522f9fe156cd8c69a5d25e1436ee4b7b9933ad997",
+  "transaction_hash": "0x4c8e0d28e32c21f29f33ff68e245b65fcc91763abf53f284cce8c2274ff6115",
+  "block_number": 6,
   "seed": "dojo_examples",
   "metadata": {
     "profile_name": "dev",
diff --git a/examples/spawn-and-move/manifests/dev/deployment/manifest.toml b/examples/spawn-and-move/manifests/dev/deployment/manifest.toml
index 4e1ffec809..0835e3cb36 100644
--- a/examples/spawn-and-move/manifests/dev/deployment/manifest.toml
+++ b/examples/spawn-and-move/manifests/dev/deployment/manifest.toml
@@ -3,9 +3,9 @@ kind = "WorldContract"
 class_hash = "0x6f38d5d9507c5d9546290e1a27e309efe5a9af3770b6cc1627db4a1b90a7dce"
 original_class_hash = "0x6f38d5d9507c5d9546290e1a27e309efe5a9af3770b6cc1627db4a1b90a7dce"
 abi = "manifests/dev/deployment/abis/dojo-world.json"
-address = "0x46c1fd10836a8426197bf412fc5f26ea10f11a8d5c61474407f03f82c096593"
-transaction_hash = "0x7f540b040b1638b76a7f2a8fc13a33050d1c0556a63814f319a01d022b172cf"
-block_number = 3
+address = "0x5fedbace16902d9ca4cdc1522f9fe156cd8c69a5d25e1436ee4b7b9933ad997"
+transaction_hash = "0x4c8e0d28e32c21f29f33ff68e245b65fcc91763abf53f284cce8c2274ff6115"
+block_number = 6
 seed = "dojo_examples"
 manifest_name = "dojo-world"