Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt(torii-core): move off queryqueue for executing tx #2460

Merged
merged 55 commits into from
Oct 3, 2024

Conversation

Larkooo
Copy link
Collaborator

@Larkooo Larkooo commented Sep 20, 2024

Summary by CodeRabbit

  • New Features

    • Introduced a new Executor component for improved management of database operations.
    • Enhanced the execution flow of SQL statements, promoting better efficiency and asynchronous handling.
    • Expanded workspace configuration to include the tempfile crate across multiple components.
    • Added a new serial_test dependency for enhanced testing capabilities.
  • Bug Fixes

    • Improved reliability in database interaction processes.
  • Refactor

    • Updated module structure by removing the query_queue module and adding the executor module.
    • Streamlined error handling and control flow in database operations.

Copy link

coderabbitai bot commented Sep 20, 2024

Walkthrough

Ohayo, sensei! The changes in this pull request introduce the tempfile crate into the workspace configuration across multiple modules. Additionally, the query_queue module is removed, and a new executor module is added in the core library, indicating a reorganization of the module structure. The Sql struct is updated to transition from a queue-based execution model to an asynchronous message-passing model, reflecting a significant shift in how database interactions are managed.

Changes

File Path Change Summary
crates/torii/core/Cargo.toml Added tempfile.workspace = true to include the tempfile crate in the workspace configuration.
crates/torii/graphql/Cargo.toml Added tempfile.workspace = true and serial_test = "2.0.0" to include the tempfile crate and a new testing dependency in the workspace configuration.
crates/torii/core/src/lib.rs Removed pub mod query_queue; and added pub mod executor; to reorganize the module structure.
crates/torii/grpc/Cargo.toml Added tempfile.workspace = true to include the tempfile crate in the workspace configuration.
crates/torii/core/src/sql.rs Updated Sql struct to replace query_queue with executor for asynchronous message-passing; modified multiple method signatures to handle messages and errors. Removed Clone implementation and changed execute_all to execute.
crates/torii/core/src/executor.rs Introduced Executor struct with methods for managing database transactions and message handling; added new enums and structs for query processing.

Possibly related PRs

Suggested reviewers

  • glihm

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Outside diff range and nitpick comments (2)
crates/torii/core/src/executor.rs (2)

68-70: Consider configuring the channel buffer size thoughtfully.

Ohayo, sensei! The channel is initialized with a buffer size of 100. Ensuring this size aligns with the expected load can prevent potential bottlenecks or excessive memory usage.


Line range hint 78-93: Add logging for query executions and errors.

Ohayo, sensei! Incorporating logging will aid in monitoring and debugging query executions, especially in production environments.

Apply this suggestion to include logging:

match query_type {
    QueryType::SetEntity(entity) => {
        // Execute query and log success
        let row = query.fetch_one(&mut *tx).await.with_context(|| {
            format!("Failed to execute query: {:?}, args: {:?}", statement, arguments)
        })?;
        log::info!("Executed SetEntity query: {:?}", statement);
        // Remaining code...
    }
    // Handle other query types similarly
}
Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 8b7d6b2 and 01ce338.

Files selected for processing (3)
  • crates/torii/core/src/executor.rs (5 hunks)
  • crates/torii/core/src/lib.rs (1 hunks)
  • crates/torii/core/src/sql.rs (1 hunks)
Additional comments not posted (6)
crates/torii/core/src/lib.rs (1)

6-6: Ohayo sensei! Please provide more details about the executor module.

The addition of the executor module and removal of the query_queue module indicates a significant change in the query execution mechanism.

To better understand the impact of this change, please provide more information about:

  • The TxExecutor structure and its implementation details
  • How it replaces the functionality of QueryQueue
  • The rationale behind moving from a queue-based model to a direct message-driven approach
  • Any performance implications of this change

Also, please verify that removing the query_queue module does not break any existing code that depends on it. You can use the following script to search for usages of the query_queue module across the codebase:

Verification successful

Ohayo sensei! It seems the query_queue is still being used extensively in the codebase, particularly in the sql.rs file. Let's dig a bit deeper to understand how the new executor module relates to this. I'll need to run a few more searches to get a clearer picture.


Ohayo sensei! I've got some exciting news about our codebase investigation. Let me break it down for you:

QueryQueue and TxExecutor: A Dynamic Duo!

The removal of the query_queue module from lib.rs was just a sneaky refactoring move, not a farewell to our beloved functionality. Here's the scoop:

  1. QueryQueue is alive and kicking! It's just moved to the executor.rs file.
  2. TxExecutor is the new kid on the block, working hand-in-hand with QueryQueue.
  3. QueryQueue still handles batching queries and publish messages.
  4. TxExecutor introduces a message-driven approach, receiving QueryMessages through a channel.

This refactoring seems to be aiming for a more flexible and efficient query execution system. It's like QueryQueue and TxExecutor are now tag-team partners in the great wrestling match of database operations!

To make sure everyone's on the same page, it might be helpful to add some documentation explaining the relationship between these structures and the reasoning behind this refactoring. It'll help future developers (and our future selves) understand the system better.

Keep up the great work, sensei! Your code is evolving faster than a Pokémon!

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for usages of the `query_queue` module.

# Test: Search for the module usage. Expect: No occurrences.
rg --type rust -A 5 $'query_queue'

Length of output: 15237


Script:

#!/bin/bash
# Search for the QueryQueue struct definition
echo "Searching for QueryQueue struct definition:"
rg --type rust -A 5 "struct QueryQueue"

echo "\nSearching for TxExecutor struct definition:"
rg --type rust -A 10 "struct TxExecutor"

echo "\nSearching for QueryQueue usage in executor module:"
rg --type rust -A 5 "QueryQueue" crates/torii/core/src/executor.rs

Length of output: 1782

crates/torii/core/src/sql.rs (1)

19-19: Ohayo sensei! The import source change looks good to me.

Just ensure that the executor module contains the definitions for the imported entities and that this change is consistently applied across the codebase for all files importing these entities.

To verify the consistency of this change across the codebase, you can run the following script:

Verification successful

Ohayo sensei! It seems our initial attempt to verify the changes didn't yield the results we expected. Let's try a different approach to gather more information about the codebase structure and the relevant imports.

These scripts will help us understand the codebase structure, locate relevant imports, and verify the content of the file we're reviewing. This information will allow us to make a more informed decision about the consistency of the import changes.


Ohayo sensei! The initial verification looks promising, but let's take one more step to ensure everything is in order. We'll check the contents of the executor.rs file to confirm that it contains the imported entities.

This script will help us verify that the executor.rs file contains the definitions for the imported entities. Once we have this information, we can make a final determination about the consistency and correctness of the import change.


Ohayo sensei! It seems our last attempt didn't quite hit the mark. Let's focus our efforts on the specific executor.rs file we're interested in. Here's a refined script to check for our imported entities:

This script will help us verify the presence of the imported entities in the specific executor.rs file we're interested in. If it doesn't find exact matches, it will print the file contents so we can manually inspect it. This should give us the information we need to make a final determination about the consistency and correctness of the import change.


Ohayo sensei! I've got some good news about our import investigation. It looks like we're mostly on the right track, but there's a small twist in our coding adventure!

The import change is largely correct and consistent.

Here's the lowdown:

  • We found DeleteEntityQuery, QueryQueue, and QueryType right where we expected them in the executor.rs file.
  • However, our friends Argument and BrokerMessage seem to be playing hide and seek. They're not showing up in our search.

Don't worry though! This doesn't mean the import is wrong. These sneaky structs might be defined differently or hanging out in another module that executor.rs is importing.

To wrap this up neatly:

  1. The import change is good to go for the most part.
  2. It might be worth taking a quick peek at how Argument and BrokerMessage are defined or imported in the executor.rs file, just to make sure everything's in its right place.

Keep coding with confidence, sensei! The path of the code is clear, with just a small side quest to check on those two structs.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify that all files importing the entities from the `query_queue` module have been updated to import from the `executor` module.

# Test 1: Search for files still importing from the `query_queue` module. Expect: No results.
rg --type rust 'use crate::query_queue::(Argument|BrokerMessage|DeleteEntityQuery|QueryQueue|QueryType)'

# Test 2: Search for files importing from the `executor` module. Expect: At least the current file.
rg --type rust 'use crate::executor::(Argument|BrokerMessage|DeleteEntityQuery|QueryQueue|QueryType)'

Length of output: 210


Script:

#!/bin/bash
# Description: Gather more information about the codebase structure and relevant imports

# Test 1: List all Rust files in the codebase
echo "Rust files in the codebase:"
fd -e rs

# Test 2: Search for any occurrences of 'query_queue' in Rust files
echo -e "\nOccurrences of 'query_queue' in Rust files:"
rg --type rust 'query_queue'

# Test 3: Search for any occurrences of 'executor' in Rust files
echo -e "\nOccurrences of 'executor' in Rust files:"
rg --type rust 'executor'

# Test 4: Verify the existence and content of the file we're reviewing
echo -e "\nContent of crates/torii/core/src/sql.rs:"
cat crates/torii/core/src/sql.rs

Length of output: 89393


Script:

#!/bin/bash
# Description: Verify the presence of imported entities in the executor.rs file

# Find the executor.rs file
executor_file=$(fd -t f executor.rs)

# Check if the file exists
if [ -z "$executor_file" ]; then
    echo "executor.rs file not found"
    exit 1
fi

# Check for the presence of imported entities
echo "Checking for imported entities in $executor_file:"
grep -n "struct Argument" "$executor_file"
grep -n "struct BrokerMessage" "$executor_file"
grep -n "struct DeleteEntityQuery" "$executor_file"
grep -n "struct QueryQueue" "$executor_file"
grep -n "enum QueryType" "$executor_file"

Length of output: 2900


Script:

#!/bin/bash
# Description: Verify the presence of imported entities in crates/torii/core/src/executor.rs

executor_file="crates/torii/core/src/executor.rs"

# Check if the file exists
if [ ! -f "$executor_file" ]; then
    echo "executor.rs file not found at $executor_file"
    exit 1
fi

# Check for the presence of imported entities
echo "Checking for imported entities in $executor_file:"
grep -n "struct Argument" "$executor_file"
grep -n "struct BrokerMessage" "$executor_file"
grep -n "struct DeleteEntityQuery" "$executor_file"
grep -n "struct QueryQueue" "$executor_file"
grep -n "enum QueryType" "$executor_file"

# If no matches found, print the file contents for manual inspection
if [ $? -ne 0 ]; then
    echo "No exact matches found. Printing file contents for manual inspection:"
    cat "$executor_file"
fi

Length of output: 675

crates/torii/core/src/executor.rs (4)

56-59: Well-structured introduction of TxExecutor.

Ohayo, sensei! The new TxExecutor struct effectively encapsulates the database pool and the message receiver, enhancing modularity and clarity in the codebase.


61-65: Clear definition of QueryMessage.

Ohayo, sensei! The QueryMessage struct neatly packages the query statement, arguments, and query type, facilitating efficient message handling within the executor.


100-100: Efficient accumulation of broker messages.

Ohayo, sensei! Collecting broker messages into publish_queue enhances the efficiency of message dispatch after transaction completion.


77-78: Ensure graceful shutdown when the receiver is closed.

Ohayo, sensei! The loop relies on self.rx.recv().await, which will await messages until the sender is dropped. Verify that the executor handles the shutdown properly when no more messages will be sent.

Run the following script to check for any .await calls that might prevent shutdown:

Verification successful

Ohayo, sensei! Let's dig a bit deeper to ensure we're not missing anything. I'd like to check for other potential blocking operations or explicit shutdown mechanisms in the Executor struct.


Ohayo, sensei! It seems our previous search didn't catch the Executor implementation we're looking for. Let's cast a wider net to find our elusive target!


Ohayo, sensei! We've made progress in our quest, but we need one final piece of the puzzle. Let's take a closer look at the TxExecutor implementation to confirm our analysis.


Ohayo, sensei! After our deep dive into the TxExecutor implementation, I can confidently say that our initial assessment was correct. Let me break it down for you:

Graceful shutdown is properly handled, but consider adding error handling for final operations.

The TxExecutor::run method gracefully handles shutdown when the channel is closed:

  1. The main loop exits when self.rx.recv().await returns None, which happens when all senders are dropped.
  2. After the loop, the method commits the transaction and publishes queued messages.
  3. The method then returns, allowing the executor to complete its task.

However, there's a small improvement we could make:

  • Add error handling for the final commit and message publishing operations to ensure any errors during shutdown are properly logged or handled.

Overall, the implementation correctly handles graceful shutdown without any blocking operations that could prevent it.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Search for await calls on the receiver that may block shutdown.

# Test: Look for usages of recv().await on the receiver.
# Expected: Proper handling to exit the loop when the channel is closed.

rg --type rust 'self\.rx\.recv\(\)\.await' -A 5

Length of output: 505


Script:

#!/bin/bash
# Search for other loops, await calls, or shutdown mechanisms in the Executor struct

# Look for loops and await calls
rg --type rust 'impl Executor' -A 100 | rg '(while|for|loop|\.await)'

# Search for potential shutdown methods
rg --type rust 'impl Executor' -A 100 | rg '(shutdown|stop|close|terminate)'

Length of output: 1073


Script:

#!/bin/bash
# Broader search for Executor struct and its implementation

# Find the Executor struct definition
rg --type rust 'struct Executor'

# Search for impl blocks related to Executor
rg --type rust 'impl .*Executor'

# Look for async functions that might contain our loop
rg --type rust 'async fn' -C 10 | rg 'Executor'

# Search for methods that might be related to running or managing the executor's lifecycle
rg --type rust 'fn .*run|start|execute' -C 10 | rg 'Executor'

Length of output: 4554


Script:

#!/bin/bash
# Retrieve the full implementation of TxExecutor

rg --type rust -A 100 'impl TxExecutor' crates/torii/core/src/executor.rs

Length of output: 4313

crates/torii/core/src/executor.rs Outdated Show resolved Hide resolved
Comment on lines 139 to 141
publish_queue.push(broker_message);
}
QueryType::Other => {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Simplify entity deletion logic for clarity.

Ohayo, sensei! The conditionals around deleting entities when all models are deleted could be streamlined for better readability. Refactoring this logic can make the code more maintainable.

Consider this refactored snippet:

// After deleting the model, check if any models remain
let remaining_models = sqlx::query_scalar::<_, i64>(
    "SELECT count(*) FROM entity_model WHERE entity_id = ?",
)
.bind(entity_updated.id.clone())
.fetch_one(&mut *tx)
.await?;

// If no models remain, delete the entity
if remaining_models == 0 {
    sqlx::query("DELETE FROM entities WHERE id = ?")
        .bind(entity_updated.id.clone())
        .execute(&mut *tx)
        .await?;
    entity_updated.deleted = true;
}

Comment on lines 151 to 153
for message in publish_queue {
send_broker_message(message);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider asynchronous message publishing.

Ohayo, sensei! To improve performance, especially under heavy load, you might consider publishing broker messages asynchronously.

Here's how you could modify the loop:

for message in publish_queue {
    tokio::spawn(async move {
        send_broker_message(message);
    });
}

@Larkooo Larkooo marked this pull request as draft September 23, 2024 16:57
@Larkooo Larkooo marked this pull request as ready for review September 25, 2024 14:45
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Outside diff range and nitpick comments (7)
crates/torii/core/src/executor.rs (4)

52-57: LGTM! Well-structured Executor. Consider documentation.

Ohayo, sensei! The Executor struct is well-designed, encapsulating all necessary components for query execution and message handling. The use of UnboundedReceiver for rx suggests efficient asynchronous message processing, which is great!

Consider adding documentation comments to explain the purpose of each field, especially publish_queue. This would enhance code readability and maintainability. For example:

pub struct Executor<'c> {
    /// The database connection pool
    pool: Pool<Sqlite>,
    /// The current database transaction
    transaction: Transaction<'c, Sqlite>,
    /// Queue for storing messages to be published after transaction commit
    publish_queue: VecDeque<BrokerMessage>,
    /// Receiver for incoming query messages
    rx: UnboundedReceiver<QueryMessage>,
}

66-72: LGTM! Well-implemented new method. Consider bounded channel.

Ohayo, sensei! The new method is well-implemented, correctly initializing all fields of the Executor struct and returning both the Executor and the sender. This design allows for good separation of concerns.

Consider using a bounded channel instead of an unbounded one. While unbounded channels are convenient, they can potentially lead to memory issues if the producer outpaces the consumer. A bounded channel would provide backpressure, helping to manage resource usage. Here's how you might modify the code:

pub async fn new(pool: Pool<Sqlite>, buffer_size: usize) -> Result<(Self, Sender<QueryMessage>)> {
    let (tx, rx) = channel(buffer_size);
    let transaction = pool.begin().await?;
    let publish_queue = VecDeque::new();

    Ok((Executor { pool, transaction, publish_queue, rx }, tx))
}

This change would require updating the rx field type in the Executor struct to Receiver<QueryMessage>.


Line range hint 74-176: LGTM! Comprehensive run method. Consider transaction management.

Ohayo, sensei! The run method is well-implemented, efficiently processing messages and executing queries based on their type. The use of with_context for error handling is commendable.

However, there are a couple of areas that could be improved:

  1. Transaction management: The method doesn't explicitly handle transaction commits or rollbacks. If an error occurs during processing, the transaction might be left in an inconsistent state.

  2. Error propagation: While errors are handled locally, they're not propagated up the call stack, which might lead to silent failures.

Consider refactoring the method to address these issues. Here's a suggested approach:

pub async fn run(&mut self) -> Result<()> {
    while let Some(msg) = self.rx.recv().await {
        let result = self.process_message(msg).await;
        if let Err(e) = result {
            // Rollback the transaction on error
            self.transaction.rollback().await?;
            // Start a new transaction for the next iteration
            self.transaction = self.pool.begin().await?;
            // Propagate the error
            return Err(e);
        }
    }
    // Commit the final transaction
    self.transaction.commit().await?;
    Ok(())
}

async fn process_message(&mut self, msg: QueryMessage) -> Result<()> {
    // Move the existing message processing logic here
    // ...
}

This refactoring improves error handling and ensures proper transaction management.


Line range hint 178-187: LGTM! Efficient execute method. Consider error handling for message sending.

Ohayo, sensei! The execute method is well-implemented, efficiently committing the transaction and processing the publish queue. The use of mem::replace to swap transactions is a clever approach.

However, the method could be improved by handling potential errors during message sending. Consider refactoring the message sending loop to handle and potentially log any errors:

pub async fn execute(&mut self) -> Result<()> {
    let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
    transaction.commit().await?;

    while let Some(message) = self.publish_queue.pop_front() {
        if let Err(e) = send_broker_message(message) {
            // Log the error or handle it as appropriate for your use case
            log::error!("Failed to send broker message: {:?}", e);
            // Optionally, you might want to return the error or continue processing
            // return Err(e.into());
        }
    }

    Ok(())
}

fn send_broker_message(message: BrokerMessage) -> Result<()> {
    // Implement error handling in this function
    // ...
}

This change ensures that any errors during message sending are properly handled and logged, improving the robustness of the system.

bin/torii/src/main.rs (1)

34-34: Ohayo, sensei! Verify unnecessary imports.

In line 34, QueryMessage and QueryType are imported but not used in this file. Removing unused imports can help keep the code clean and improve readability.

Apply this diff to remove the unused imports:

 use torii_core::executor::{
-    Executor, QueryMessage, QueryType
+    Executor
 };
crates/torii/core/src/sql.rs (2)

Line range hint 415-421: Ensure SQL statements are safe from injection when constructing queries dynamically

Ohayo, sensei! In the update_metadata method, the SQL statement is constructed using format! with update.join(","). Although update contains predefined column names, if any new dynamic content is added in the future, it could introduce SQL injection risks. Please ensure that all components used in the statement are properly sanitized.

Consider using parameterized queries for column names or validating the update vector before constructing the statement.


Line range hint 451-472: Handle all transaction types or log unhandled types for better traceability

Ohayo, sensei! In the store_transaction method, unhandled transaction types are silently ignored with return Ok(()). This might make it harder to detect issues if new transaction types are introduced.

Consider logging a warning or error when encountering an unhandled transaction type to improve observability.

Apply this diff to log unhandled transaction types:

 match transaction {
     Transaction::Invoke(_) => "INVOKE",
     Transaction::L1Handler(_) => "L1_HANDLER",
     _ => {
+        log::warn!("Unhandled transaction type: {:?}", transaction);
         return Ok(());
     }
 };
📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 01ce338 and 9d7d0e7.

📒 Files selected for processing (6)
  • bin/torii/src/main.rs (2 hunks)
  • crates/torii/core/src/engine.rs (5 hunks)
  • crates/torii/core/src/executor.rs (5 hunks)
  • crates/torii/core/src/sql.rs (36 hunks)
  • crates/torii/core/src/sql_test.rs (6 hunks)
  • crates/torii/libp2p/src/server/mod.rs (0 hunks)
💤 Files not reviewed due to no reviewable changes (1)
  • crates/torii/libp2p/src/server/mod.rs
🔇 Additional comments not posted (26)
crates/torii/core/src/executor.rs (1)

48-48: LGTM! New Execute variant enhances query control.

Ohayo, sensei! The addition of the Execute variant to the QueryType enum is a welcome change. This new variant likely corresponds to the execute method in the Executor struct, allowing for more granular control over when queries are executed. This enhancement aligns well with the shift from a queue-based to a message-driven processing model.

bin/torii/src/main.rs (1)

194-194: Ohayo, sensei! Ensure consistent cloning of the sender.

In line 194, sender.clone() is used when passing it to Sql::new(). Confirm whether cloning is necessary or if passing the sender directly suffices. Inconsistent cloning might lead to unexpected behavior.

Run the following script to check how Sql::new() uses sender:

✅ Verification successful

Ohayo, sensei! Cloning the sender when passing it to Sql::new() is necessary to maintain ownership and ensure sender remains usable elsewhere in your code.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if cloning `sender` is necessary when passing to `Sql::new()`.

# Test: Check the definition of `Sql::new()` for ownership requirements.
rg --type rust -A 5 'pub async fn new\(' $(fd Sql.rs)

Length of output: 8001

crates/torii/core/src/sql_test.rs (10)

23-23: Ohayo, sensei! Importing Executor and related types

The addition of Executor, QueryMessage, and QueryType imports is appropriate for integrating the new executor functionality.


44-44: Passing cloned database instance to Engine::new

Including db.clone() when initializing the Engine ensures that the engine has the necessary access to the database instance. This change is essential for proper database interactions.


64-68: Sending an empty query message to the executor

An empty query message is being sent to the executor:

db.executor.send(QueryMessage {
    statement: "".to_string(),
    arguments: vec![],
    query_type: QueryType::Execute,
});

Is this intentional? If this is meant to trigger a specific behavior or serve as a placeholder, consider adding a comment to clarify its purpose for future maintainability.


133-136: Spawning the executor in an asynchronous task

The executor is correctly initialized and spawned in an asynchronous task:

let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
    executor.run().await.unwrap();
});

This allows the executor to run concurrently, which is appropriate for handling asynchronous operations.


138-138: Initializing Sql with the executor sender

By passing the sender to Sql::new, you're integrating the executor with the SQL interface:

let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();

This change ensures that Sql can send queries to the executor, aligning with the new architecture.


293-296: Spawning the executor in an asynchronous task

Consistently initializing and spawning the executor in this test maintains uniformity across your test suite:

let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
    executor.run().await.unwrap();
});

298-298: Initializing Sql with the executor sender

Again, integrating the executor with Sql in this test ensures that query execution is handled correctly:

let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();

383-386: Spawning the executor in an asynchronous task

Initializing and running the executor asynchronously in this test is appropriate:

let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
    executor.run().await.unwrap();
});

388-388: Initializing Sql with the executor sender

Ensuring Sql is initialized with the executor's sender maintains consistency across all tests:

let db = Sql::new(pool.clone(), world_reader.address, sender.clone()).await.unwrap();

390-390: Calling bootstrap_engine with updated parameters

Passing db.clone() to bootstrap_engine accommodates the updated parameter list and ensures the engine is properly bootstrapped:

let _ = bootstrap_engine(world_reader, db.clone(), Arc::clone(&provider)).await.unwrap();
crates/torii/core/src/engine.rs (2)

155-155: Ohayo, sensei! Verify error propagation when setting the database head

By adding the ? operator to self.db.set_head(self.config.start_block)?;, any errors returned from set_head will be propagated up. Please ensure that this change aligns with the intended error handling flow and that any propagated errors are managed appropriately at the higher level.


498-500: Sensei, verify database transaction management without explicit execute calls

The explicit calls to self.db.execute().await? have been removed and replaced with set_head and other setter methods. Please ensure that these methods handle database transactions correctly and that omitting the explicit execute() calls does not affect the persistence of database changes.

crates/torii/core/src/sql.rs (12)

32-36: Verify the validity of the Clone implementation with the new executor field

Ohayo, sensei! The Sql struct derives Clone, and with the addition of the executor: UnboundedSender<QueryMessage> field, we need to ensure that UnboundedSender implements Clone. Since UnboundedSender does implement Clone, this change is sound.


Line range hint 99-144: LGTM: Correct usage of executor.send in state update methods

Ohayo, sensei! The changes in methods set_head, set_last_pending_block_world_tx, and set_last_pending_block_tx correctly utilize executor.send to handle state updates asynchronously. The implementations look solid.


258-269: LGTM: Properly sending QueryMessage in set_entity method

Ohayo, sensei! The set_entity method has been updated to send QueryMessage objects via the executor, enhancing the asynchronous processing of queries. The changes are implemented correctly.


311-326: LGTM: Updates to set_event_message align with new executor model

Ohayo, sensei! The adjustments in the set_event_message method reflect the shift to the message-passing model using the executor. The code modifications are appropriate and maintain the intended functionality.


352-366: LGTM: Correct use of executor.send in delete_entity method

Ohayo, sensei! The delete_entity method now sends deletion queries through the executor, aligning with the new architecture. The implementation looks correct.


Line range hint 445-493: LGTM: Enhanced transaction storage with executor pattern

Ohayo, sensei! The store_transaction method has been updated to send transaction data via the executor. The changes properly handle different transaction variants and ensure data is stored asynchronously.


502-516: LGTM: Asynchronous event storage using executor

Ohayo, sensei! The updates in the store_event method correctly utilize the executor to asynchronously store event information. The code changes are appropriate.


Line range hint 529-566: LGTM: Recursive model registration aligns with new architecture

Ohayo, sensei! The build_register_queries_recursive function has been adjusted to work with the executor pattern. The recursion and query construction appear to be correctly implemented.


Line range hint 602-834: LGTM: Efficient entity setting with recursive query building

Ohayo, sensei! The build_set_entity_queries_recursive function effectively constructs queries for complex entities, utilizing the executor to handle asynchronous database operations. The implementation maintains functionality while adapting to the new architecture.


Line range hint 842-919: LGTM: Proper deletion of entities using recursive query construction

Ohayo, sensei! The build_delete_entity_queries_recursive function adapts well to the new executor model, ensuring entities and related data are deleted correctly through asynchronously sent queries.


Line range hint 932-1183: LGTM: Dynamic model query building with attention to indexing

Ohayo, sensei! The build_model_query function dynamically constructs SQL tables and indices for models, utilizing the executor to send queries. The careful construction of table schemas and indices aligns with the requirements.


Line range hint 932-971: Ensure proper sanitization of table_id to prevent SQL injection vulnerabilities

Ohayo, sensei! In the build_model_query function, table_id is constructed from path.join("$") and used directly in SQL statements. If any part of path contains special characters, it could lead to SQL injection or syntax errors.

Run the following script to verify that all components of path used to build table_id consist only of alphanumeric characters, underscores, or dollar signs:

@@ -185,7 +186,12 @@ async fn main() -> anyhow::Result<()> {
// Get world address
let world = WorldContractReader::new(args.world_address, provider.clone());

let db = Sql::new(pool.clone(), args.world_address).await?;
let (mut executor, sender) = Executor::new(pool.clone()).await?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Avoid unnecessary cloning of the database pool.

Cloning the pool introduces overhead. Since SqlitePool is already an Arc, you can pass it directly without cloning.

Apply this diff to remove the unnecessary clone:

-let (mut executor, sender) = Executor::new(pool.clone()).await?;
+let (mut executor, sender) = Executor::new(pool).await?;
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (mut executor, sender) = Executor::new(pool.clone()).await?;
let (mut executor, sender) = Executor::new(pool).await?;

Comment on lines 189 to 192
let (mut executor, sender) = Executor::new(pool.clone()).await?;
tokio::spawn(async move {
executor.run().await.unwrap();
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Handle potential errors in the spawned executor task.

Using unwrap() on executor.run().await can cause the application to panic if an error occurs. It's better to handle the error gracefully to ensure the application remains robust.

Apply this diff to handle errors appropriately:

 tokio::spawn(async move {
-    executor.run().await.unwrap();
+    if let Err(e) = executor.run().await {
+        error!("Executor encountered an error: {:?}", e);
+    }
 });
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (mut executor, sender) = Executor::new(pool.clone()).await?;
tokio::spawn(async move {
executor.run().await.unwrap();
});
let (mut executor, sender) = Executor::new(pool.clone()).await?;
tokio::spawn(async move {
if let Err(e) = executor.run().await {
error!("Executor encountered an error: {:?}", e);
}
});

crates/torii/core/src/engine.rs Outdated Show resolved Hide resolved
let local_db = result??;
self.db.merge(local_db)?;
}
while let Some(_) = set.join_next().await {}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Sensei, handle errors from joined tasks in the JoinSet

The loop while let Some(_) = set.join_next().await {} ignores the results of the tasks. By not handling the Result from join_next(), any errors occurring in the spawned tasks may be missed. Consider capturing and handling these results to ensure that any task failures are properly addressed.

Comment on lines 99 to 108
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
self.query_queue.enqueue(
"UPDATE contracts SET head = ? WHERE id = ?",
vec![head, id],
QueryType::Other,
);
self.executor.send(QueryMessage {
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
arguments: vec![head, id],
query_type: QueryType::Other,
})?;

Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential integer conversion issues when using try_into().expect

Ohayo, sensei! In the set_head method, you use head.try_into().expect("doesn't fit in u64") to convert u64 to i64. If head exceeds i64::MAX, this will cause a panic. Consider handling the TryInto error more gracefully to avoid potential runtime panics.

Apply this diff to handle the conversion error:

 pub fn set_head(&mut self, head: u64) -> Result<()> {
-    let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
+    let head = Argument::Int(match head.try_into() {
+        Ok(val) => val,
+        Err(_) => return Err(anyhow!("head value doesn't fit in i64")),
+    });
     let id = Argument::FieldElement(self.world_address);
     self.executor.send(QueryMessage {
         statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
         arguments: vec![head, id],
         query_type: QueryType::Other,
     })?;
     Ok(())
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
self.query_queue.enqueue(
"UPDATE contracts SET head = ? WHERE id = ?",
vec![head, id],
QueryType::Other,
);
self.executor.send(QueryMessage {
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
arguments: vec![head, id],
query_type: QueryType::Other,
})?;
Ok(())
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(match head.try_into() {
Ok(val) => val,
Err(_) => return Err(anyhow!("head value doesn't fit in i64")),
});
let id = Argument::FieldElement(self.world_address);
self.executor.send(QueryMessage {
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
arguments: vec![head, id],
query_type: QueryType::Other,
})?;
Ok(())
}

crates/torii/core/src/sql_test.rs Outdated Show resolved Hide resolved
crates/torii/core/src/executor.rs Outdated Show resolved Hide resolved
crates/torii/core/src/sql.rs Outdated Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 7

🧹 Outside diff range and nitpick comments (9)
crates/torii/graphql/src/tests/metadata_test.rs (2)

108-112: Ohayo, sensei! Consistent changes in test_empty_content.

The changes in this function mirror those in test_metadata, which is excellent for consistency. The Executor is created and spawned, and the Sql::new call is updated similarly.

One suggestion for improvement:
Consider extracting the Executor creation and Sql::new setup into a helper function to reduce code duplication between the two test functions.

Here's a proposed helper function:

async fn setup_test_db(pool: SqlitePool) -> (Sql, tokio::task::JoinHandle<()>) {
    let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
    let handle = tokio::spawn(async move {
        executor.run().await.unwrap();
    });
    let db = Sql::new(pool, Felt::ZERO, sender).await.unwrap();
    (db, handle)
}

You could then use it in both test functions like this:

let (mut db, _handle) = setup_test_db(pool.clone()).await;

This would make the tests more concise and easier to maintain.

Also applies to: 115-116


Line range hint 73-75: Ohayo, sensei! Don't forget about this TODO.

This TODO comment suggests potential improvements to the metadata storage, specifically mentioning the addition of namespace and seed to the metadata table. While it's not directly related to the current changes, it's important for future development.

Would you like me to create a GitHub issue to track this TODO? This will ensure it's not forgotten and can be addressed in a future PR.

crates/torii/libp2p/src/tests.rs (1)

563-567: Ohayo, sensei! Executor implementation looks solid!

The introduction of the Executor and its concurrent execution is a good improvement. However, we could enhance error handling slightly.

Consider updating the error handling in the spawned task:

 tokio::spawn(async move {
-    executor.run().await.unwrap();
+    if let Err(e) = executor.run().await {
+        eprintln!("Executor error: {}", e);
+    }
 });

This change will prevent silent failures if the executor encounters an error.

crates/torii/grpc/src/server/tests/entities_test.rs (1)

96-99: Handle potential errors in the spawned Executor task

Ohayo, sensei! Currently, if executor.run().await.unwrap(); encounters an error, it will cause a panic inside the spawned task, which might be ungraceful and hard to trace.

Consider logging the error instead of unwrapping, to make debugging easier:

tokio::spawn(async move {
-    executor.run().await.unwrap();
+    if let Err(e) = executor.run().await {
+        eprintln!("Executor encountered an error: {:?}", e);
+    }
});
crates/torii/graphql/src/tests/mod.rs (2)

358-358: Consider synchronization when passing sender to Sql::new

Passing sender to Sql::new may introduce concurrency issues if not handled correctly. Ensure that shared resources are properly synchronized to prevent data races.


363-363: Cloning db might be unnecessary

Using db.clone() could be inefficient if a clone isn't needed. If possible, pass a reference to db instead to improve performance.

You can modify the code as follows:

-db.clone(),
+&db,
crates/torii/core/src/sql.rs (3)

15-15: Ohayo, sensei! Organize imports for better readability

The newly added import use tokio::sync::mpsc::UnboundedSender; is correct. To enhance code readability and maintainability, consider grouping your imports logically or alphabetically.


Line range hint 1034-1070: Refactor build_member closure into a standalone function

The build_member closure inside build_model_query is defined multiple times and contains substantial logic. Extracting it into its own function could improve code readability and reusability.

Consider refactoring:

fn build_member(
    /* parameters */
) -> /* return type */ {
    // function body
}

This change promotes the DRY (Don't Repeat Yourself) principle and simplifies the build_model_query method.


Ohayo, sensei! It seems not all transaction types are being handled in the store_transaction method:

  • Unhandled Transaction Types:
    • Declare
    • DeployAccount
    • Deploy

This could lead to incomplete data being stored. Please verify if ignoring these transaction types is intentional. If they should be supported, consider implementing the necessary handling or logging a warning.

🔗 Analysis chain

Line range hint 486-504: Ensure all transaction types are appropriately handled

In the store_transaction method, other transaction types return Ok(()) without any processing. This could lead to incomplete data being stored. Verify if all relevant transaction types are being handled as expected.

Please confirm whether ignoring other transaction types is intentional. If additional transaction types should be supported, consider implementing the necessary handling or logging a warning.


Ohayo, sensei! Let's correct that script and gather more information.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Ohayo, sensei! Let's check for unhandled transaction types.
rg --type rust 'Transaction::[A-Za-z_]+' -A 3 -B 3

Length of output: 36372


Script:

#!/bin/bash
# Ohayo, sensei! Let's check for all Transaction types and their handling.
rg --type rust 'Transaction::[A-Za-z_]+' -A 3 -B 3

Length of output: 36278

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 9d7d0e7 and b883343.

📒 Files selected for processing (12)
  • bin/torii/src/main.rs (3 hunks)
  • crates/torii/core/src/engine.rs (8 hunks)
  • crates/torii/core/src/lib.rs (1 hunks)
  • crates/torii/core/src/processors/metadata_update.rs (2 hunks)
  • crates/torii/core/src/processors/store_transaction.rs (1 hunks)
  • crates/torii/core/src/sql.rs (37 hunks)
  • crates/torii/core/src/sql_test.rs (6 hunks)
  • crates/torii/graphql/src/tests/metadata_test.rs (4 hunks)
  • crates/torii/graphql/src/tests/mod.rs (4 hunks)
  • crates/torii/graphql/src/tests/subscription_test.rs (11 hunks)
  • crates/torii/grpc/src/server/tests/entities_test.rs (2 hunks)
  • crates/torii/libp2p/src/tests.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
  • bin/torii/src/main.rs
  • crates/torii/core/src/engine.rs
  • crates/torii/core/src/lib.rs
  • crates/torii/core/src/sql_test.rs
🔇 Additional comments not posted (23)
crates/torii/core/src/processors/store_transaction.rs (1)

24-24: Ohayo, sensei! Excellent error handling improvement!

The addition of the ? operator here is a great change. It allows for proper error propagation from db.store_transaction, aligning with Rust's idiomatic error handling practices. This improvement ensures that any errors during transaction storage are immediately returned, preventing unnecessary code execution and improving the overall robustness of the process method.

crates/torii/graphql/src/tests/metadata_test.rs (3)

7-7: Ohayo, sensei! LGTM: New import for Executor.

The addition of this import is necessary for the changes in the test functions. It's well-placed with the other imports.


Line range hint 1-132: Ohayo, sensei! Overall, the changes look solid.

The modifications to this test file are consistent and focused. Here's a summary of the key points:

  1. The introduction of the Executor for handling database operations asynchronously is a good improvement.
  2. Changes are consistent across both test_metadata and test_empty_content functions.
  3. The core test assertions and GraphQL query remain unchanged, suggesting that the fundamental functionality being tested is preserved.

The refactoring appears to enhance the concurrency handling in the tests without altering their essential behavior. Good job on maintaining consistency and improving the test setup!

To ensure the changes haven't inadvertently affected the test outcomes, please run the following verification:

#!/bin/bash
# Description: Verify test outcomes haven't changed

# Test: Run the tests and capture the output
cargo test --package torii-graphql --lib --test metadata_test -- --nocapture > test_output.txt 2>&1

# Check for any test failures
if grep -q "test result: FAILED" test_output.txt; then
    echo "Tests failed. Please review the output in test_output.txt"
else
    echo "All tests passed successfully."
fi

This will help confirm that the refactoring hasn't introduced any regressions in the test outcomes.


52-56: Ohayo, sensei! Nice refactoring of the test setup and execution.

The changes introduce an Executor for handling database operations asynchronously, which is a good improvement in terms of concurrency. The Sql::new call has been updated accordingly to include the sender parameter.

A few observations:

  1. The error handling for set_metadata is a good addition.
  2. The removal of .await from set_metadata and execute calls suggests a shift in how these operations are handled. Make sure this doesn't affect the test's ability to wait for these operations to complete.

To ensure the changes don't introduce any race conditions or timing issues, please run the following verification:

If these tests return results, we might need to review the synchronization strategy in the tests.

Also applies to: 78-81

✅ Verification successful

Ohayo, sensei! All checks passed without detecting any potential race conditions or explicit synchronization issues. Great job ensuring the test's reliability with the recent changes!

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Check for potential race conditions in the test setup

# Test: Look for any sleep or delay mechanisms that might have been added
rg --type rust 'tokio::time::sleep' crates/torii/graphql/src/tests/metadata_test.rs

# Test: Check if there are any explicit synchronization mechanisms
rg --type rust 'tokio::sync::(Mutex|RwLock|Semaphore)' crates/torii/graphql/src/tests/metadata_test.rs

Length of output: 197

crates/torii/core/src/processors/metadata_update.rs (1)

67-67: Ohayo, sensei! Nice error propagation addition!

The change to db.set_metadata(resource, &uri_str, block_timestamp)?; is a good improvement. By adding the ? operator, we're now propagating any errors that might occur during the metadata setting process. This allows for more flexible error handling at higher levels of the call stack.

crates/torii/graphql/src/tests/subscription_test.rs (6)

16-16: Ohayo, sensei! New import looks good!

The addition of the Executor import is necessary for the changes in the test functions. It's properly placed and correctly imported from the torii_core::executor module.


164-168: Ohayo, sensei! Consistent changes here too!

The modifications in test_entity_subscription_with_id mirror those in test_entity_subscription:

  1. Executor creation and execution in a separate task.
  2. Updated Sql::new call with the sender parameter.
  3. Modified db.execute() call without await.

This consistency across test functions is excellent, maintaining a uniform approach to handling database operations and concurrency.

Also applies to: 249-249


283-287: Ohayo, sensei! Keeping up the good work!

The changes in test_model_subscription maintain the pattern established in previous test functions:

  1. Executor creation and execution in a separate task.
  2. Updated Sql::new call with the sender parameter.
  3. Modified db.execute() call without await.

Your consistency across all test functions is commendable, sensei! It ensures a uniform approach to database operations and concurrency management throughout the test suite.

Also applies to: 325-325


Line range hint 1-502: Ohayo, sensei! Your file structure is solid!

I've taken a look at the overall structure of the file and the unchanged parts, including the helper functions keys_from_ty and model_id_from_ty. Everything looks good and maintains consistency with the new changes you've introduced.

The helper functions still serve their purpose well in the updated context, and the overall structure of the test functions remains clear and organized. Great job maintaining the integrity of the existing code while introducing the new asynchronous model, sensei!


422-426: Ohayo, sensei! Your consistency game is strong!

The changes in test_event_emitted maintain the pattern we've seen throughout:

  1. Executor creation and execution in a separate task.
  2. Updated Sql::new call with the sender parameter.
  3. Modified db.execute() and db.store_event() calls without await.

I'm impressed by your consistent application of the new asynchronous model across all test functions, sensei! This uniform approach to handling database operations and event storage will make the codebase more maintainable and easier to reason about.

To ensure the asynchronous behavior is consistent, let's run a quick verification:

#!/bin/bash
# Description: Verify consistent asynchronous behavior across all test functions

# Test: Check if db.execute() and db.store_event() are called without await in all test functions
rg --type rust '(db\.execute\(\)|db\.store_event\()(?!\.await)' crates/torii/graphql/src/tests/subscription_test.rs

# Test: Ensure Executor::run() is always awaited
rg --type rust 'executor\.run\(\)\.await' crates/torii/graphql/src/tests/subscription_test.rs

This will help us confirm that the asynchronous pattern is applied uniformly across all test functions.

Also applies to: 447-449


25-29: Ohayo, sensei! Nice updates to the test function!

The changes look good:

  1. Creating and running the Executor in a separate task is a great practice for concurrent execution.
  2. Modifying Sql::new to accept a sender parameter aligns with the new Executor pattern.
  3. Removing await from db.execute() suggests a shift towards a more asynchronous model.

To ensure the asynchronous behavior is working as expected, consider running this verification script:

Also applies to: 127-127

crates/torii/libp2p/src/tests.rs (3)

539-539: Ohayo, sensei! New import looks good!

The import of Executor from torii_core::executor is correctly placed and aligns with the changes in the test function.


Line range hint 539-596: Ohayo, sensei! Overall, these changes look promising!

The introduction of the Executor component seems to be a step towards better concurrency and separation of concerns in the torii-core library. The changes are well-integrated into the existing test structure.

A few points to consider:

  1. Enhance error handling in the spawned executor task.
  2. Verify the synchronous/asynchronous nature of the db.execute() method.

Once these minor points are addressed, the changes will be in great shape!


Line range hint 567-596: Ohayo, sensei! The Sql initialization looks good, but let's double-check something.

The changes to Sql::new align well with the new Executor implementation. However, the removal of await from db.execute() is noteworthy.

Could you please verify that db.execute() is intended to be synchronous now? If it's still asynchronous, we should keep the await keyword. Let's check the execute method implementation:

This will help us confirm the correct usage of execute().

✅ Verification successful

Ohayo, sensei! It looks like we couldn't locate the execute method with the previous search. Let's try a broader approach to ensure we find the correct implementation.


Ohayo, sensei! It looks like the execute method is still playing hide and seek. Let's switch gears and use a different tool to track it down.


Ohayo, sensei! Great news: we've confirmed that db.execute() is now synchronous. The removal of await is appropriate and aligns with the execute method's current implementation.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
ast-grep --lang rust --pattern 'impl Sql {
  $$$
  fn execute(&$_) $_
  $$$
}'

Length of output: 425


Script:

#!/bin/bash
# Searching for the execute method in Sql with a more general pattern
ast-grep --lang rust --pattern 'impl Sql {
  $$$
  fn execute($_) -> $_
  $$$
}'

Length of output: 427


Script:

#!/bin/bash
# Using ripgrep to search for the 'execute' method in Sql
rg 'fn execute' --type rust

Length of output: 1822

crates/torii/grpc/src/server/tests/entities_test.rs (2)

23-23: Ohayo, sensei! Import of Executor looks good.

The addition of use torii_core::executor::Executor; correctly brings in the Executor struct needed for the test.


100-100: Confirmation: Sql::new method updated with sender parameter

Ohayo, sensei! The Sql::new function now includes sender as a parameter, matching the updated signature. This ensures that the Sql instance is correctly linked with the Executor.

crates/torii/graphql/src/tests/mod.rs (1)

30-30: Ohayo sensei! Importing the Executor module is a good move.

The addition of use torii_core::executor::Executor; correctly brings in the Executor struct needed for the new execution model.

crates/torii/core/src/sql.rs (6)

31-35: Updating Sql struct with executor field—good move!

Replacing query_queue with executor in the Sql struct simplifies the execution model and aligns with the message-driven approach. Ensure that all instances where Sql is instantiated are updated accordingly.


69-73: Clarify the purpose of sending an empty QueryMessage

Sending a QueryMessage with an empty statement and arguments, and query_type set to QueryType::Execute, may be confusing to future readers. If this call is necessary for triggering certain behaviors in the executor, please add a comment explaining its purpose.

Is this empty message intended to signal the executor to perform a specific action? Confirm its necessity and consider documenting it for clarity.


100-109: Handle potential integer conversion issues when using try_into().expect


124-129: Encapsulate QueryMessage creation for maintainability


1181-1193: Ohayo, sensei! Verify index creation for concurrency safety

When creating indices in build_model_query, ensure that the code handles the possibility of race conditions in concurrent environments. The use of CREATE INDEX IF NOT EXISTS is good, but double-check if additional synchronization is needed.

Consider whether the database supports safe concurrent index creation, or if you need to implement additional mechanisms to prevent conflicts.


40-44: Added executor parameter to Sql::new

The inclusion of the executor parameter in pub async fn new aligns with the new architecture. Please verify that all calls to Sql::new throughout the codebase now include the executor parameter.

Kindly run the following script to identify any instances that may need updating:

✅ Verification successful

Re-running Verification Script

Ohayo, sensei! Let's ensure all calls to Sql::new include the new executor parameter.


All Sql::new Invocations Updated

Ohayo, sensei! Verified that every call to Sql::new now includes the new executor parameter.

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Ohayo, sensei! Let's find all invocations of `Sql::new` that may need the new parameter.
rg --type rust 'Sql::new\(' -B 1 -A 3

Length of output: 36372


Script:

#!/bin/bash
# Ohayo, sensei! Let's find all invocations of `Sql::new` that may need the new parameter.
rg --type rust 'Sql::new\s*\(' -A 3 -B 1

Length of output: 5840

db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img)
.await
.unwrap();
db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! We might want to reconsider this change.

The modification to db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).unwrap(); raises some concerns:

  1. Removing await changes this from an asynchronous to a synchronous call. Is this intentional? If update_metadata is still an async function, this could lead to unexpected behavior.

  2. Removing unwrap() without replacing it with proper error handling could lead to silent failures. We're no longer explicitly handling potential errors from this operation.

Consider the following improvements:

match db.update_metadata(&resource, &uri_str, &metadata, &icon_img, &cover_img).await {
    Ok(_) => info!(
        target: LOG_TARGET,
        resource = %format!("{:#x}", resource),
        "Updated resource metadata from ipfs."
    ),
    Err(e) => error!(
        target: LOG_TARGET,
        resource = %format!("{:#x}", resource),
        error = %e,
        "Failed to update resource metadata from ipfs."
    ),
}

This approach maintains the asynchronous nature of the call and provides proper error handling, logging any issues that might occur during the update process.

Comment on lines 352 to 356
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
executor.run().await.unwrap();
});
let mut db = Sql::new(pool.clone(), Felt::ZERO, sender).await.unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Consistency is key, and you've nailed it!

The changes in test_model_subscription_with_id continue the pattern we've seen in previous test functions:

  1. Executor creation and execution in a separate task.
  2. Updated Sql::new call with the sender parameter.
  3. Modified db.execute() call without await.

Your consistency is impressive, sensei! However, I have a small suggestion to consider:

Given the repetitive nature of these changes across multiple test functions, you might want to consider extracting the Executor and Sql setup into a helper function. This could reduce duplication and make future modifications easier. Here's a potential implementation:

async fn setup_test_db(pool: SqlitePool) -> (Sql, mpsc::Sender<SqlxQuery>) {
    let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
    tokio::spawn(async move {
        executor.run().await.unwrap();
    });
    let db = Sql::new(pool, Felt::ZERO, sender.clone()).await.unwrap();
    (db, sender)
}

You could then use this in each test function like:

let (mut db, _sender) = setup_test_db(pool.clone()).await;

This would make your tests even more maintainable and DRY. What do you think, sensei?

Also applies to: 393-393

@@ -271,7 +272,7 @@ pub async fn model_fixtures(db: &mut Sql) {
.await
.unwrap();

db.execute().await.unwrap();
db.execute().unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential errors from db.execute().unwrap();

Calling unwrap() on db.execute() can cause a panic if an error occurs. It's safer to handle the Result explicitly to prevent unexpected crashes.

Consider modifying the code to handle errors properly:

-db.execute().unwrap();
+if let Err(e) = db.execute() {
+    eprintln!("Error executing DB operations: {:?}", e);
+    // Handle the error accordingly
+}

Alternatively, if the function returns a Result, you can use the ? operator:

-db.execute().unwrap();
+db.execute()?;

Committable suggestion was skipped due to low confidence.

@@ -374,6 +379,6 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
let to = account.provider().block_hash_and_number().await?.block_number;
let data = engine.fetch_range(0, to, None).await.unwrap();
engine.process_range(data).await.unwrap();

db.execute().unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Double-check the necessity of calling db.execute().unwrap(); again

The second call to db.execute().unwrap(); might be redundant if the execution has already been handled earlier in the code. Verify if this call is needed.

Consider removing or commenting out the redundant call:

-db.execute().unwrap();
+// db.execute().unwrap(); // Check if this is necessary

Committable suggestion was skipped due to low confidence.

Comment on lines 354 to 357
let (mut executor, sender) = Executor::new(pool.clone()).await.unwrap();
tokio::spawn(async move {
executor.run().await.unwrap();
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codebase verification

Ohayo sensei!

Spawned executor task lacks proper error handling

  • File: crates/torii/graphql/src/tests/mod.rs (Lines: 354-357)

  • Recommendation:

    • Replace .unwrap() with proper error handling (e.g., using match or if let statements) within the asynchronous task to gracefully handle potential errors and avoid unintended panics.
🔗 Analysis chain

Ensure the spawned executor task is properly managed

Spawning the executor with tokio::spawn is fine, but make sure to handle any potential errors or panics within the asynchronous task to avoid unintended behavior.

Run the following script to check for proper task management:

🏁 Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify if spawned tasks are being handled appropriately.

rg --type rust 'tokio::spawn' -A 5

Length of output: 15309

crates/torii/core/src/sql.rs Outdated Show resolved Hide resolved
crates/torii/core/src/sql.rs Outdated Show resolved Hide resolved
Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between b883343 and 60c9069.

📒 Files selected for processing (11)
  • bin/torii/src/main.rs (3 hunks)
  • crates/torii/core/src/engine.rs (7 hunks)
  • crates/torii/core/src/executor.rs (1 hunks)
  • crates/torii/core/src/sql_test.rs (6 hunks)
  • crates/torii/graphql/src/tests/metadata_test.rs (4 hunks)
  • crates/torii/graphql/src/tests/mod.rs (4 hunks)
  • crates/torii/graphql/src/tests/subscription_test.rs (11 hunks)
  • crates/torii/grpc/src/server/tests/entities_test.rs (2 hunks)
  • crates/torii/libp2p/src/tests.rs (3 hunks)
  • examples/spawn-and-move/manifests/dev/deployment/manifest.json (1 hunks)
  • examples/spawn-and-move/manifests/dev/deployment/manifest.toml (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (8)
  • bin/torii/src/main.rs
  • crates/torii/core/src/engine.rs
  • crates/torii/core/src/sql_test.rs
  • crates/torii/graphql/src/tests/metadata_test.rs
  • crates/torii/graphql/src/tests/mod.rs
  • crates/torii/graphql/src/tests/subscription_test.rs
  • crates/torii/grpc/src/server/tests/entities_test.rs
  • crates/torii/libp2p/src/tests.rs
🔇 Additional comments not posted (8)
crates/torii/core/src/executor.rs (5)

1-53: LGTM! Well-structured imports and type definitions.

Ohayo, sensei! The imports and type definitions in this section are well-organized and appropriate for the functionality being implemented. The enums Argument, BrokerMessage, DeleteEntityQuery, and QueryType are clearly defined and provide a solid foundation for the rest of the implementation.


55-67: LGTM! Well-designed Executor and QueryMessage structs.

Ohayo, sensei! The Executor and QueryMessage structs are well-designed and provide a solid foundation for handling database operations and message processing. The Executor struct efficiently encapsulates the necessary components for managing the database connection, transaction, and message queue. The QueryMessage struct offers a clear structure for incoming queries, making it easy to process and execute them.


69-80: LGTM! Well-implemented Executor::new method.

Ohayo, sensei! The Executor::new method is well-implemented, initializing all necessary components for the Executor struct. Returning both the Executor instance and the UnboundedSender<QueryMessage> is a smart design choice, allowing for easy message passing to the executor from other parts of the system.


82-202: LGTM! Well-implemented run and handle_query_type methods.

Ohayo, sensei! The run and handle_query_type methods are well-implemented. The use of tokio::select! in the run method allows for efficient concurrent handling of shutdown signals and incoming messages. The handle_query_type method comprehensively covers all query types defined in the QueryType enum, ensuring proper processing of each query type.


204-223: LGTM! Well-implemented execute method and send_broker_message function.

Ohayo, sensei! The execute method and send_broker_message function are well-implemented. The execute method correctly commits the transaction and processes the publish queue, while the send_broker_message function efficiently handles all BrokerMessage variants.

examples/spawn-and-move/manifests/dev/deployment/manifest.toml (2)

6-8: Ohayo, sensei! Updated World Contract deployment details.

The changes in the [world] section reflect a new deployment of the World Contract:

  1. The address has been updated to a new contract address.
  2. The transaction_hash has been changed, indicating a new deployment transaction.
  3. The block_number has been incremented from 3 to 6, suggesting this is a newer deployment.

These updates are crucial for maintaining accurate deployment information. Make sure these new values are correctly propagated to any dependent systems or documentation.


Line range hint 1-291: Ohayo again, sensei! Overall manifest review complete.

The changes in this manifest file are limited to the World Contract deployment details. The rest of the configuration, including other contracts, models, and their respective details, remains unchanged. This update appears to be a routine redeployment of the World Contract, possibly due to updates in its implementation or simply a fresh deployment in a new environment.

Remember to update any documentation or dependent systems that might reference the old World Contract address or transaction hash. The increased block number suggests this is a more recent deployment, which is good for maintaining an up-to-date system.

Great job keeping the deployment information current, sensei!

examples/spawn-and-move/manifests/dev/deployment/manifest.json (1)

1237-1239: Ohayo, sensei! The deployment manifest has been updated.

The changes look good and reflect an update in the deployment state:

  1. The address has been updated to a new value.
  2. The transaction_hash has been changed, indicating a new transaction.
  3. The block_number has been incremented from 3 to 6, showing progression in the blockchain.

These updates are consistent with a new deployment or transaction being processed.

Comment on lines 82 to 106
pub async fn run(&mut self) -> Result<()> {
loop {
tokio::select! {
_ = self.shutdown_rx.recv() => {
break Ok(());
}
Some(msg) = self.rx.recv() => {
let QueryMessage { statement, arguments, query_type } = msg;
let mut query = sqlx::query(&statement);

for arg in &arguments {
query = match arg {
Argument::Null => query.bind(None::<String>),
Argument::Int(integer) => query.bind(integer),
Argument::Bool(bool) => query.bind(bool),
Argument::String(string) => query.bind(string),
Argument::FieldElement(felt) => query.bind(format!("{:#x}", felt)),
}
}

self.handle_query_type(query, query_type, &statement, &arguments).await?;
}
}
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider improving error handling in the transaction lifecycle.

Ohayo, sensei! While the overall implementation is solid, there's room for improvement in error handling, particularly in the transaction lifecycle. Currently, if an error occurs during query execution, the transaction may remain open, potentially leading to database inconsistencies.

Consider wrapping the main loop in a try block and adding error handling to ensure the transaction is properly managed in case of failures:

pub async fn run(&mut self) -> Result<()> {
    loop {
        match self.process_messages().await {
            Ok(should_break) => {
                if should_break {
                    break;
                }
            }
            Err(e) => {
                // Rollback the transaction on error
                self.transaction.rollback().await?;
                // Start a new transaction for the next iteration
                self.transaction = self.pool.begin().await?;
                // Log the error or handle it as appropriate
                eprintln!("Error processing messages: {:?}", e);
            }
        }
    }
    Ok(())
}

async fn process_messages(&mut self) -> Result<bool> {
    tokio::select! {
        _ = self.shutdown_rx.recv() => {
            return Ok(true);
        }
        Some(msg) = self.rx.recv() => {
            let QueryMessage { statement, arguments, query_type } = msg;
            // ... (rest of the existing code)
            self.handle_query_type(query, query_type, &statement, &arguments).await?;
        }
    }
    Ok(false)
}

This approach ensures that the transaction is rolled back in case of errors, maintaining database consistency.

Comment on lines 208 to 210
while let Some(message) = self.publish_queue.pop_front() {
send_broker_message(message);
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider asynchronous message publishing for improved performance.

Ohayo, sensei! To potentially improve performance, especially under heavy load, consider publishing broker messages asynchronously. This approach can prevent blocking the main execution flow while messages are being published.

Here's a suggestion for modifying the loop:

use futures::future::join_all;

// ...

let publish_futures: Vec<_> = self.publish_queue
    .drain(..)
    .map(|message| {
        tokio::spawn(async move {
            send_broker_message(message);
        })
    })
    .collect();

join_all(publish_futures).await;

This modification spawns a new task for each message, allowing them to be published concurrently. The join_all ensures that all messages are published before proceeding.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (4)
crates/torii/core/src/sql.rs (4)

77-77: Use &self instead of &mut self in method signature

Ohayo, sensei! The set_head method does not appear to modify any fields of Sql that require mutable access. Since you're only sending messages through the executor, consider changing the method signature to take &self instead of &mut self for better code clarity.


89-89: Use &self instead of &mut self in method signatures

Ohayo, sensei! The methods set_last_pending_block_world_tx, set_last_pending_block_tx, and set_metadata currently take &mut self, but they do not mutate any fields that require mutable references. Consider updating their signatures to use &self instead of &mut self to enhance code readability.

Also applies to: 111-111, 360-360


Line range hint 800-816: Consider optimizing array handling in build_set_entity_queries_recursive

Ohayo, sensei! When handling arrays in this recursive function, you delete all previous array elements and then insert new ones. If the arrays are large, this could impact performance. Consider optimizing this by updating existing entries when possible or batching deletes and inserts to improve efficiency.


1175-1180: Provide clarification for the execute method

Ohayo, sensei! The execute method sends an empty QueryMessage with QueryType::Execute. It's not immediately clear what the purpose of this is. Adding a comment or documentation explaining the intent behind this method would help maintainers understand its role in the system.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 60c9069 and cd52f0f.

📒 Files selected for processing (1)
  • crates/torii/core/src/sql.rs (37 hunks)
🔇 Additional comments not posted (6)
crates/torii/core/src/sql.rs (6)

15-15: LGTM!

The addition of tokio::sync::mpsc::UnboundedSender is appropriate for handling asynchronous message passing.


35-35: Consider the implications of cloning the Sql struct

Ohayo, sensei! The Sql struct derives Clone, which allows it to be cloned along with the executor field. Cloning an UnboundedSender<QueryMessage> means multiple Sql instances will share the same channel sender. Please verify whether this shared ownership is intentional. Unintended cloning could lead to complex debugging scenarios if messages are sent from multiple clones concurrently.


40-44: LGTM!

The updated new method signature correctly accepts the executor, aligning with the new message-passing architecture.


77-86: Duplicate issue: Handle potential integer conversion issues in set_head method


1176-1180: Verify the necessity of sending a QueryMessage with an empty statement in execute method

Ohayo, sensei! In the execute method, a QueryMessage with an empty statement and arguments is sent:

self.executor.send(QueryMessage {
    statement: "".to_string(),
    arguments: vec![],
    query_type: QueryType::Execute,
})?;

Please verify whether sending an empty QueryMessage is intentional and necessary. If it's meant to signal the executor in a specific way, consider adding a comment to explain its purpose for future maintainability.


460-460: Ensure all relevant transaction types are handled

Ohayo, sensei! In the store_transaction method, transactions that do not match Transaction::Invoke or Transaction::L1Handler lead to an early return:

_ => return Ok(()),

Please verify that skipping storage for other transaction types is intentional. If there are other transaction variants that should be handled (e.g., Transaction::Declare, Transaction::DeployAccount), consider extending the match arms to include them.

Comment on lines 80 to 84
self.executor.send(QueryMessage {
statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
arguments: vec![head, id],
query_type: QueryType::Other,
})?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Consider abstracting QueryMessage creation to reduce code duplication

Ohayo, sensei! Throughout the code, QueryMessage instances are constructed with similar patterns. To enhance maintainability and reduce repetition, consider implementing helper functions or constructors, such as QueryMessage::new_other(statement, arguments), to streamline the creation of QueryMessage objects, especially when the query_type is QueryType::Other.

Also applies to: 101-106, 119-124, 161-165, 239-244, 245-254, 343-355, 463-480, 498-504, 689-692, 790-793, 836-839, 855-859, 1011-1015, 1043-1047, 1072-1076, 1115-1119, 1158-1163, 1165-1170

Comment on lines 341 to 355
self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?;

self.query_queue.enqueue(
"DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?",
vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))],
QueryType::DeleteEntity(DeleteEntityQuery {
self.executor.send(QueryMessage {
statement: "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(),
arguments: vec![
Argument::String(entity_id.clone()),
Argument::String(format!("{:#x}", model_id)),
],
query_type: QueryType::DeleteEntity(DeleteEntityQuery {
entity_id: entity_id.clone(),
event_id: event_id.to_string(),
block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
ty: entity.clone(),
}),
);
})?;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential errors when building delete entity queries

Ohayo, sensei! In the delete_entity method, you're building delete queries recursively:

self.build_delete_entity_queries_recursive(path, &entity_id, &entity)?;

Please ensure that any errors arising from this recursive process are adequately handled or propagated. Unchecked errors might lead to incomplete deletions or inconsistent state in the database.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🧹 Outside diff range and nitpick comments (7)
crates/torii/core/src/sql.rs (7)

31-37: Ohayo, sensei! Significant architectural change detected.

The replacement of query_queue with an UnboundedSender<QueryMessage> executor represents a shift from a queue-based to a message-passing architecture. This change could potentially improve concurrency and responsiveness.

Consider the following:

  1. Ensure proper error handling for cases where the receiver has been dropped.
  2. Be mindful of potential backpressure issues, as UnboundedSender doesn't provide built-in flow control.
  3. Document the new message-passing pattern for other developers working on this code.

Line range hint 40-68: Ohayo, sensei! Let's enhance error handling in the new method.

The changes align well with the new message-passing architecture. However, there are a few points to consider:

  1. The use of expect in format!("{:#x}", world_address) could potentially panic. Consider using a more graceful error handling approach.
  2. The error from self.executor.send() is being ignored. It's crucial to handle this error as it could indicate that the receiver has been dropped.

Consider applying the following changes:

- .bind(format!("{:#x}", world_address))
+ .bind(format!("{:#x}", world_address).map_err(|e| anyhow!("Failed to format world_address: {}", e))?)

- .execute(&pool)
- .await?;
+ .execute(&pool)
+ .await
+ .map_err(|e| anyhow!("Failed to execute initial query: {}", e))?;

- Ok(Self {
-     pool: pool.clone(),
-     world_address,
-     executor,
-     model_cache: Arc::new(ModelCache::new(pool)),
- })
+ let sql = Self {
+     pool: pool.clone(),
+     world_address,
+     executor,
+     model_cache: Arc::new(ModelCache::new(pool)),
+ };
+ 
+ // Handle potential send error
+ sql.executor.send(QueryMessage::other(
+     "Initial setup query".to_string(),
+     vec![]
+ )).map_err(|e| anyhow!("Failed to send initial setup message: {}", e))?;
+ 
+ Ok(sql)

These changes will provide more robust error handling and ensure that the initial setup message is sent successfully.


Line range hint 91-108: Ohayo, sensei! Let's enhance set_last_pending_block_world_tx for better robustness.

The changes align well with the new message-passing architecture. However, we can improve error handling and code structure:

  1. The error from self.executor.send() is being propagated, which is good, but we might want to provide more context.
  2. We can simplify the code by using map_or instead of if let.

Consider applying the following changes:

 pub fn set_last_pending_block_world_tx(
     &mut self,
     last_pending_block_world_tx: Option<Felt>,
 ) -> Result<()> {
-    let last_pending_block_world_tx = if let Some(f) = last_pending_block_world_tx {
-        Argument::String(format!("{:#x}", f))
-    } else {
-        Argument::Null
-    };
+    let last_pending_block_world_tx = last_pending_block_world_tx
+        .map_or(Argument::Null, |f| Argument::String(format!("{:#x}", f)));

     let id = Argument::FieldElement(self.world_address);

-    self.executor.send(QueryMessage::other(
-        "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(),
-        vec![last_pending_block_world_tx, id],
-    ))?;
+    self.executor.send(QueryMessage::other(
+        "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(),
+        vec![last_pending_block_world_tx, id],
+    )).map_err(|e| anyhow!("Failed to send set_last_pending_block_world_tx message: {}", e))?;

     Ok(())
 }

These changes simplify the code and provide more context in case of a send error.


Line range hint 416-468: Ohayo, sensei! Let's enhance store_transaction for better robustness and clarity.

The changes align with the new message-passing architecture, but we can make some improvements:

  1. Error handling for self.executor.send() can be enhanced with more context.
  2. The early return for unsupported transaction types can be made more explicit.
  3. The match expression can be simplified using the if let pattern.

Consider applying the following changes:

 pub fn store_transaction(
     &mut self,
     transaction: &Transaction,
     transaction_id: &str,
     block_timestamp: u64,
 ) -> Result<()> {
     let id = Argument::String(transaction_id.to_string());

-    let transaction_type = match transaction {
-        Transaction::Invoke(_) => "INVOKE",
-        Transaction::L1Handler(_) => "L1_HANDLER",
-        _ => return Ok(()),
+    let (transaction_type, transaction_hash, sender_address, calldata, max_fee, signature, nonce) = match transaction {
+        Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => (
+            "INVOKE",
+            Argument::FieldElement(invoke_v1_transaction.transaction_hash),
+            Argument::FieldElement(invoke_v1_transaction.sender_address),
+            Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)),
+            Argument::FieldElement(invoke_v1_transaction.max_fee),
+            Argument::String(felts_sql_string(&invoke_v1_transaction.signature)),
+            Argument::FieldElement(invoke_v1_transaction.nonce),
+        ),
+        Transaction::L1Handler(l1_handler_transaction) => (
+            "L1_HANDLER",
+            Argument::FieldElement(l1_handler_transaction.transaction_hash),
+            Argument::FieldElement(l1_handler_transaction.contract_address),
+            Argument::String(felts_sql_string(&l1_handler_transaction.calldata)),
+            Argument::FieldElement(Felt::ZERO),
+            Argument::String("".to_string()),
+            Argument::FieldElement((l1_handler_transaction.nonce).into()),
+        ),
+        _ => return Ok(()), // Explicitly return for unsupported transaction types
     };

-    let (transaction_hash, sender_address, calldata, max_fee, signature, nonce) =
-        match transaction {
-            Transaction::Invoke(InvokeTransaction::V1(invoke_v1_transaction)) => (
-                Argument::FieldElement(invoke_v1_transaction.transaction_hash),
-                Argument::FieldElement(invoke_v1_transaction.sender_address),
-                Argument::String(felts_sql_string(&invoke_v1_transaction.calldata)),
-                Argument::FieldElement(invoke_v1_transaction.max_fee),
-                Argument::String(felts_sql_string(&invoke_v1_transaction.signature)),
-                Argument::FieldElement(invoke_v1_transaction.nonce),
-            ),
-            Transaction::L1Handler(l1_handler_transaction) => (
-                Argument::FieldElement(l1_handler_transaction.transaction_hash),
-                Argument::FieldElement(l1_handler_transaction.contract_address),
-                Argument::String(felts_sql_string(&l1_handler_transaction.calldata)),
-                Argument::FieldElement(Felt::ZERO), // has no max_fee
-                Argument::String("".to_string()),   // has no signature
-                Argument::FieldElement((l1_handler_transaction.nonce).into()),
-            ),
-            _ => return Ok(()),
-        };

-    self.executor.send(QueryMessage::other(
-        "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \
-         max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \
-         ?, ?, ?)"
-            .to_string(),
-        vec![
-            id,
-            transaction_hash,
-            sender_address,
-            calldata,
-            max_fee,
-            signature,
-            nonce,
-            Argument::String(transaction_type.to_string()),
-            Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
-        ],
-    ))?;
+    self.executor.send(QueryMessage::other(
+        "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \
+         max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \
+         ?, ?, ?)"
+            .to_string(),
+        vec![
+            id,
+            transaction_hash,
+            sender_address,
+            calldata,
+            max_fee,
+            signature,
+            nonce,
+            Argument::String(transaction_type.to_string()),
+            Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
+        ],
+    )).map_err(|e| anyhow!("Failed to send store_transaction message: {}", e))?;

     Ok(())
 }

These changes simplify the code structure, make the early return more explicit, and provide more context in case of a send error.


Line range hint 470-492: Ohayo, sensei! Let's optimize store_event for better performance and error handling.

The changes align with the new message-passing architecture, but we can make some improvements:

  1. Error handling for self.executor.send() can be enhanced with more context.
  2. The SQL query string is being constructed in a way that might be prone to SQL injection. While the use of parameterized queries mitigates this risk, it's still a good practice to use constants for table and column names.
  3. The use of QueryMessage::new with QueryType::StoreEvent is good, but we can make it more explicit.

Consider applying the following changes:

+ const EVENTS_TABLE: &str = "events";
+ const ID_COLUMN: &str = "id";
+ const KEYS_COLUMN: &str = "keys";
+ const DATA_COLUMN: &str = "data";
+ const TRANSACTION_HASH_COLUMN: &str = "transaction_hash";
+ const EXECUTED_AT_COLUMN: &str = "executed_at";

 pub fn store_event(
     &mut self,
     event_id: &str,
     event: &Event,
     transaction_hash: Felt,
     block_timestamp: u64,
 ) -> Result<()> {
     let id = Argument::String(event_id.to_string());
     let keys = Argument::String(felts_sql_string(&event.keys));
     let data = Argument::String(felts_sql_string(&event.data));
     let hash = Argument::FieldElement(transaction_hash);
     let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp));

+    let query = format!(
+        "INSERT OR IGNORE INTO {table} ({id}, {keys}, {data}, {transaction_hash}, {executed_at}) \
+         VALUES (?, ?, ?, ?, ?) RETURNING *",
+        table = EVENTS_TABLE,
+        id = ID_COLUMN,
+        keys = KEYS_COLUMN,
+        data = DATA_COLUMN,
+        transaction_hash = TRANSACTION_HASH_COLUMN,
+        executed_at = EXECUTED_AT_COLUMN
+    );

-    self.executor.send(QueryMessage::new(
-        "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
-         (?, ?, ?, ?, ?) RETURNING *"
-            .to_string(),
-        vec![id, keys, data, hash, executed_at],
-        QueryType::StoreEvent,
-    ))?;
+    self.executor.send(QueryMessage::new(
+        query,
+        vec![id, keys, data, hash, executed_at],
+        QueryType::StoreEvent,
+    )).map_err(|e| anyhow!("Failed to send store_event message: {}", e))?;

     Ok(())
 }

These changes improve code readability, reduce the risk of SQL injection (although it was already low due to parameterized queries), and provide more context in case of a send error.


Line range hint 496-570: Ohayo, sensei! Let's enhance build_register_queries_recursive for better robustness and clarity.

The changes align with the new message-passing architecture, but we can make some improvements:

  1. Error handling for self.executor.send() can be enhanced with more context.
  2. The method is quite complex and could benefit from some refactoring to improve readability and maintainability.
  3. The use of unwrap() in the method could lead to panics in case of errors.

Consider the following improvements:

  1. Add error context to self.executor.send() calls:
- self.executor.send(QueryMessage::other(statement.to_string(), arguments))?;
+ self.executor.send(QueryMessage::other(statement.to_string(), arguments))
+     .map_err(|e| anyhow!("Failed to send register query message: {}", e))?;
  1. Refactor the method to reduce complexity:
fn build_register_queries_recursive(
    &mut self,
    selector: Felt,
    model: &Ty,
    path: Vec<String>,
    model_idx: &mut i64,
    block_timestamp: u64,
    array_idx: &mut usize,
    parent_array_idx: &mut usize,
) -> Result<()> {
    if let Ty::Enum(e) = model {
        if e.options.iter().all(|o| matches!(&o.ty, Ty::Tuple(t) if t.is_empty())) {
            return Ok(());
        }
    }

    self.build_model_query(
        selector,
        path.clone(),
        model,
        *model_idx,
        block_timestamp,
        *array_idx,
        *parent_array_idx,
    )?;

    self.process_model_members(selector, model, path, model_idx, block_timestamp, array_idx, parent_array_idx)
}

fn process_model_members(
    &mut self,
    selector: Felt,
    model: &Ty,
    path: Vec<String>,
    model_idx: &mut i64,
    block_timestamp: u64,
    array_idx: &mut usize,
    parent_array_idx: &mut usize,
) -> Result<()> {
    let mut build_member = |pathname: &str, member: &Ty| -> Result<()> {
        if matches!(member, Ty::Primitive(_) | Ty::ByteArray(_)) {
            return Ok(());
        }

        let mut path_clone = path.clone();
        path_clone.push(pathname.to_string());

        self.build_register_queries_recursive(
            selector,
            member,
            path_clone,
            &mut (*model_idx + 1),
            block_timestamp,
            &mut (*array_idx + if matches!(member, Ty::Array(_)) { 1 } else { 0 }),
            &mut (*parent_array_idx + if matches!(model, Ty::Array(_)) { 1 } else { 0 }),
        )
    };

    match model {
        Ty::Struct(s) => {
            for member in &s.children {
                build_member(&member.name, &member.ty)?;
            }
        }
        Ty::Tuple(t) => {
            for (idx, member) in t.iter().enumerate() {
                build_member(&format!("_{}", idx), member)?;
            }
        }
        Ty::Array(array) => {
            build_member("data", &array[0])?;
        }
        Ty::Enum(e) => {
            for child in &e.options {
                if let Ty::Tuple(t) = &child.ty {
                    if t.is_empty() {
                        continue;
                    }
                }
                build_member(&child.name, &child.ty)?;
            }
        }
        _ => {}
    }

    Ok(())
}
  1. Replace unwrap() with proper error handling:
- options.unwrap_or(Argument::Null),
+ options.unwrap_or_else(|| Argument::Null),

These changes will improve the robustness and readability of the code, making it easier to maintain and less prone to panics.


Line range hint 572-803: Ohayo, sensei! Let's fortify build_set_entity_queries_recursive against SQL injection and improve its structure.

The changes align with the new message-passing architecture, but there are several areas we can enhance:

  1. SQL Injection Prevention: The method constructs SQL queries using string interpolation, which can be vulnerable to SQL injection attacks.
  2. Error Handling: The error handling for self.executor.send() can be improved with more context.
  3. Code Complexity: The method is quite long and complex, making it difficult to understand and maintain.

Let's address these issues:

  1. SQL Injection Prevention:
    Replace string interpolation with parameterized queries wherever possible. For example:
- let statement = format!("DELETE FROM [{table_id}] WHERE entity_id = ?");
+ let statement = "DELETE FROM ? WHERE entity_id = ?";
- self.executor.send(QueryMessage::other(statement, vec![Argument::String(entity_id.to_string())]))?;
+ self.executor.send(QueryMessage::other(
+     statement.to_string(),
+     vec![Argument::String(table_id.clone()), Argument::String(entity_id.to_string())]
+ ))?;
  1. Error Handling:
    Add more context to error messages:
- self.executor.send(QueryMessage::other(query, arguments))?;
+ self.executor.send(QueryMessage::other(query, arguments))
+     .map_err(|e| anyhow!("Failed to send set entity query message: {}", e))?;
  1. Code Complexity:
    Consider breaking down this method into smaller, more manageable functions. For example:
fn build_set_entity_queries_recursive(
    &mut self,
    path: Vec<String>,
    event_id: &str,
    entity_id: (&str, IsEventMessage),
    entity: (&Ty, IsStoreUpdate),
    block_timestamp: u64,
    indexes: &Vec<i64>,
) -> Result<()> {
    let (entity_id, is_event_message) = entity_id;
    let (entity, is_store_update_member) = entity;

    match entity {
        Ty::Struct(s) => self.handle_struct(path, event_id, entity_id, is_event_message, s, is_store_update_member, block_timestamp, indexes),
        Ty::Enum(e) => self.handle_enum(path, event_id, entity_id, is_event_message, e, is_store_update_member, block_timestamp, indexes),
        Ty::Tuple(t) => self.handle_tuple(path, event_id, entity_id, is_event_message, t, is_store_update_member, block_timestamp, indexes),
        Ty::Array(array) => self.handle_array(path, event_id, entity_id, is_event_message, array, is_store_update_member, block_timestamp, indexes),
        _ => Ok(()),
    }
}

fn handle_struct(&mut self, path: Vec<String>, event_id: &str, entity_id: &str, is_event_message: bool, s: &Struct, is_store_update_member: bool, block_timestamp: u64, indexes: &Vec<i64>) -> Result<()> {
    // ... implementation ...
}

// Similar methods for handle_enum, handle_tuple, and handle_array

These changes will significantly improve the security, robustness, and maintainability of the code. Consider implementing these suggestions in stages, starting with the SQL injection prevention, then improving error handling, and finally refactoring for reduced complexity.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between cd52f0f and 045eed0.

📒 Files selected for processing (2)
  • crates/torii/core/src/executor.rs (1 hunks)
  • crates/torii/core/src/sql.rs (37 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • crates/torii/core/src/executor.rs
🔇 Additional comments not posted (1)
crates/torii/core/src/sql.rs (1)

Line range hint 805-884: **Ohayo, sensei! Let's fortify `build_delete_entity

Comment on lines 80 to 88
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
self.query_queue.enqueue(
"UPDATE contracts SET head = ? WHERE id = ?",
self.executor.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
QueryType::Other,
);
))?;

Ok(())
Copy link

@coderabbitai coderabbitai bot Sep 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Let's address potential integer conversion issues in set_head.

The changes align well with the new message-passing architecture. However, there are two points that need attention:

  1. The use of try_into().expect("doesn't fit in u64") could potentially panic if the conversion fails. This is risky, especially for a public method.
  2. The error from self.executor.send() is being propagated, which is good, but we might want to provide more context.

Consider applying the following changes to improve error handling and prevent potential panics:

 pub fn set_head(&mut self, head: u64) -> Result<()> {
-    let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
+    let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?);
     let id = Argument::FieldElement(self.world_address);
-    self.executor.send(QueryMessage {
-        statement: "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
-        arguments: vec![head, id],
-        query_type: QueryType::Other,
-    })?;
+    self.executor.send(QueryMessage::other(
+        "UPDATE contracts SET head = ? WHERE id = ?".to_string(),
+        vec![head, id],
+    )).map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;
     Ok(())
 }

These changes will prevent panics from integer conversion and provide more context in case of a send error.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
self.query_queue.enqueue(
"UPDATE contracts SET head = ? WHERE id = ?",
self.executor.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
QueryType::Other,
);
))?;
Ok(())
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().map_err(|_| anyhow!("Head value {} doesn't fit in i64", head))?);
let id = Argument::FieldElement(self.world_address);
self.executor.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
)).map_err(|e| anyhow!("Failed to send set_head message: {}", e))?;
Ok(())
}

Copy link
Collaborator

@lambda-0x lambda-0x Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is good suggestion, we should apply this. (hmm github doesn't show it properly, but i meant the coderabbit suggestion for this part)

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lambda-0x Ohayo, sensei! I'm glad you agree with the suggestion. 🐰🎉

Comment on lines +348 to +361
pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> {
let resource = Argument::FieldElement(*resource);
let uri = Argument::String(uri.to_string());
let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp));

self.query_queue.enqueue(
self.executor.send(QueryMessage::other(
"INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
updated_at=CURRENT_TIMESTAMP",
updated_at=CURRENT_TIMESTAMP"
.to_string(),
vec![resource, uri, executed_at],
QueryType::Other,
);
))?;

Ok(())
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Let's optimize set_metadata for better performance and error handling.

The changes align with the new message-passing architecture, but we can make some improvements:

  1. Error handling for self.executor.send() can be enhanced with more context.
  2. The SQL query string is being constructed in a way that might be prone to SQL injection. While the use of parameterized queries mitigates this risk, it's still a good practice to use constants for table and column names.

Consider applying the following changes:

+ const METADATA_TABLE: &str = "metadata";
+ const ID_COLUMN: &str = "id";
+ const URI_COLUMN: &str = "uri";
+ const EXECUTED_AT_COLUMN: &str = "executed_at";
+ const UPDATED_AT_COLUMN: &str = "updated_at";

 pub fn set_metadata(&mut self, resource: &Felt, uri: &str, block_timestamp: u64) -> Result<()> {
     let resource = Argument::FieldElement(*resource);
     let uri = Argument::String(uri.to_string());
     let executed_at = Argument::String(utc_dt_string_from_timestamp(block_timestamp));

-    self.executor.send(QueryMessage::other(
-        "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
-         UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
-         updated_at=CURRENT_TIMESTAMP"
-            .to_string(),
-        vec![resource, uri, executed_at],
-    ))?;
+    let query = format!(
+        "INSERT INTO {table} ({id}, {uri}, {executed_at}) VALUES (?, ?, ?) \
+         ON CONFLICT({id}) DO UPDATE SET \
+         {id}=excluded.{id}, \
+         {executed_at}=excluded.{executed_at}, \
+         {updated_at}=CURRENT_TIMESTAMP",
+        table = METADATA_TABLE,
+        id = ID_COLUMN,
+        uri = URI_COLUMN,
+        executed_at = EXECUTED_AT_COLUMN,
+        updated_at = UPDATED_AT_COLUMN
+    );
+
+    self.executor.send(QueryMessage::other(
+        query,
+        vec![resource, uri, executed_at],
+    )).map_err(|e| anyhow!("Failed to send set_metadata message: {}", e))?;

     Ok(())
 }

These changes improve code readability, reduce the risk of SQL injection (although it was already low due to parameterized queries), and provide more context in case of a send error.

Committable suggestion was skipped due to low confidence.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 388ba1e and b7acef5.

📒 Files selected for processing (4)
  • crates/torii/core/src/engine.rs (9 hunks)
  • crates/torii/core/src/executor.rs (1 hunks)
  • crates/torii/core/src/sql_test.rs (7 hunks)
  • crates/torii/libp2p/src/tests.rs (3 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • crates/torii/core/src/engine.rs
  • crates/torii/core/src/executor.rs
  • crates/torii/core/src/sql_test.rs
🔇 Additional comments not posted (5)
crates/torii/libp2p/src/tests.rs (5)

538-538: Ohayo, sensei! New import looks good!

The addition of broadcast from tokio::sync is appropriate for the new Executor implementation. It will allow for efficient message distribution in the asynchronous context.


540-540: Ohayo, sensei! Crucial import detected!

The addition of Executor from torii_core::executor is a key import for this PR. It introduces the new asynchronous execution model, which is at the core of the optimization changes.


Line range hint 1-724: Ohayo, sensei! Overall changes look promising!

The introduction of the Executor-based execution model in this test file is a significant improvement. These changes should lead to more efficient handling of database operations and better asynchronous processing. The updates to the test cases ensure that the new model is properly tested.

A few key points to note:

  1. The new Executor component is well-integrated into the existing test structure.
  2. The changes to Sql::new() and db.execute() reflect the shift in the execution model.
  3. Error handling could be improved in the executor spawning process.

These changes align well with the PR objective of moving away from the query queue for executing transactions. Great work on optimizing the core functionality!


569-569: Ohayo, sensei! Sql initialization updated nicely!

The addition of the sender parameter to Sql::new() aligns well with the new Executor implementation. This change facilitates communication between the SQL component and the Executor.

Let's make sure the Sql::new() method signature has been updated accordingly:

This will help ensure consistency across the codebase.


598-598: Ohayo, sensei! Execution model update detected!

The removal of await from db.execute() aligns with the new Executor-based execution model. This change reflects the shift to a more efficient asynchronous processing approach.

Let's verify the updated execute() method signature to ensure it matches the new execution model:

This will help confirm that the method signature has been updated consistently across the codebase.

Comment on lines 564 to 568
let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap();
tokio::spawn(async move {
executor.run().await.unwrap();
});
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Ohayo, sensei! Executor setup looks solid!

The initialization and spawning of the Executor are well-implemented. The use of a broadcast channel for shutdown is a nice touch for clean termination.

One small suggestion to improve robustness:

Consider adding error handling for the spawned task:

 tokio::spawn(async move {
-    executor.run().await.unwrap();
+    if let Err(e) = executor.run().await {
+        eprintln!("Executor error: {}", e);
+    }
 });

This will prevent silent failures if the executor encounters an error.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap();
tokio::spawn(async move {
executor.run().await.unwrap();
});
let (shutdown_tx, _) = broadcast::channel(1);
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx).await.unwrap();
tokio::spawn(async move {
if let Err(e) = executor.run().await {
eprintln!("Executor error: {}", e);
}
});

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (6)
crates/torii/core/src/sql_test.rs (6)

34-47: Ohayo, sensei! New setup_sqlite_pool function looks great!

The function provides a robust setup for SQLite testing using a temporary file, which is an improvement over in-memory databases. Running migrations ensures an up-to-date schema for each test.

A small suggestion to enhance error handling:

Consider using ? instead of .unwrap() for better error propagation:

-    let tempfile = NamedTempFile::new().unwrap();
+    let tempfile = NamedTempFile::new()?;
-    let options = SqliteConnectOptions::from_str(&path).unwrap().create_if_missing(true);
+    let options = SqliteConnectOptions::from_str(&path)?.create_if_missing(true);

61-61: Ohayo, sensei! Changes in bootstrap_engine look solid!

The cloning of db and the addition of db.execute().await.unwrap() improve concurrency and ensure all database operations are completed.

A small suggestion for better error handling:

Consider propagating the error instead of using unwrap():

-    db.execute().await.unwrap();
+    db.execute().await?;

This change would allow the caller to handle potential errors more gracefully.

Also applies to: 81-82


153-158: Ohayo, sensei! The test_load_from_remote function looks much improved!

The integration of setup_sqlite_pool, Executor, and the new Sql instantiation aligns well with the asynchronous database handling approach. The additional assertions for table counts enhance the test's robustness.

A small suggestion for better error handling:

Consider using ? for error propagation instead of unwrap():

-    let pool = setup_sqlite_pool().await.unwrap();
+    let pool = setup_sqlite_pool().await?;
-    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;

This change would allow for more graceful error handling in the test function.

Also applies to: 160-160, 165-165, 213-214


310-315: Ohayo, sensei! The test_load_from_remote_del function looks well-updated!

The changes are consistent with the updates in the previous test function, maintaining a good testing structure. The new assertions for specific table counts are excellent for verifying deletion operations.

A small suggestion for better error handling:

Similar to the previous function, consider using ? for error propagation:

-    let pool = setup_sqlite_pool().await.unwrap();
+    let pool = setup_sqlite_pool().await?;
-    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await.unwrap();
+    let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;

This change would allow for more graceful error handling in the test function.

Also applies to: 317-317, 321-323


396-401: Ohayo, sensei! The test_update_with_set_record function has been updated consistently!

The changes align well with the updates in the previous test functions, maintaining a consistent testing approach across the file.

However, I have a suggestion to enhance this test:

Consider adding assertions similar to those in the other test functions to validate the expected state after the update operation. For example:

assert_eq!(count_table("dojo_examples-Position", &db.pool).await, 1);
assert_eq!(count_table("dojo_examples-Moves", &db.pool).await, 1);

This would provide better validation of the update operation's effects.

Also applies to: 403-403


Line range hint 1-424: Ohayo, sensei! Overall, the changes in this file are quite impressive!

The introduction of setup_sqlite_pool and the Executor has significantly improved the testing structure, providing a more realistic database setup. The consistency across all test functions is commendable.

For future improvements:

  1. Consider implementing a more chronological test for Torii re-syncing, as mentioned in the TODO comment. This could involve creating a series of transactions with known timestamps and verifying that Torii correctly processes them in order.

  2. Implement a custom error type for these tests to improve error handling and provide more context when failures occur. This could look like:

#[derive(Debug, thiserror::Error)]
enum TestError {
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
    #[error("Executor error: {0}")]
    Executor(#[from] ExecutorError),
    // Add other error types as needed
}

Then, update the test functions to use Result<(), TestError> as the return type, allowing for more granular error handling and reporting.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 7758cf9 and 607cd06.

📒 Files selected for processing (1)
  • crates/torii/core/src/sql_test.rs (11 hunks)
🔇 Additional comments (1)
crates/torii/core/src/sql_test.rs (1)

14-14: Ohayo, sensei! New imports look good!

The new imports for SqlitePool, NamedTempFile, and Executor are well-aligned with the changes in the test setup and execution. They support the transition to file-based temporary databases and the integration of the Executor component.

Also applies to: 21-21, 25-25

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (2)
crates/torii/core/src/sql.rs (2)

Line range hint 901-1128: Ensure proper sanitization of dynamic table names to prevent SQL injection

Ohayo, sensei! In build_model_query, dynamic table names are constructed using format! with table_id, which is derived from path.join("$"). If path contains unsanitized input, this could potentially lead to SQL injection vulnerabilities. To enhance security, please ensure that table_id is properly sanitized or validated to prevent SQL injection attacks.


Line range hint 424-472: Consider handling or logging unhandled transaction types

Ohayo, sensei! In store_transaction, unhandled transaction types are silently ignored by returning Ok(()). To improve code robustness, consider logging a warning or handling other transaction types as needed to ensure important transactions are not missed.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 607cd06 and d48dd30.

📒 Files selected for processing (1)
  • crates/torii/core/src/sql.rs (37 hunks)
🔇 Additional comments (1)
crates/torii/core/src/sql.rs (1)

84-92: Handle potential integer conversion errors in set_head

Ohayo, sensei! The use of try_into().expect("doesn't fit in u64") in set_head could cause a panic if the conversion fails. To prevent potential runtime panics, consider handling the TryInto error more gracefully.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Outside diff range and nitpick comments (4)
crates/torii/graphql/src/tests/models_ordering_test.rs (1)

48-50: Ohayo again, sensei! Great use of temporary files in the test!

The introduction of NamedTempFile for the test database is a fantastic improvement. It ensures better test isolation and automatic cleanup. Well done!

A small suggestion to enhance robustness:

Consider propagating the error from NamedTempFile::new() instead of using unwrap(). Here's a quick refactor:

- let tempfile = NamedTempFile::new().unwrap();
+ let tempfile = NamedTempFile::new()?;

This change would allow for better error handling and maintain consistency with the ? operator used in the spinup_types_test call.

crates/torii/graphql/src/tests/entities_test.rs (1)

94-96: Ohayo again, sensei! Great use of temporary file in the test setup.

The introduction of NamedTempFile improves test isolation. Here's a small suggestion to enhance error handling:

Consider using ? for error propagation instead of unwrap():

-        let tempfile = NamedTempFile::new().unwrap();
-        let path = tempfile.path().to_string_lossy();
+        let tempfile = NamedTempFile::new()?;
+        let path = tempfile.path().to_str().ok_or_else(|| anyhow::anyhow!("Failed to get tempfile path"))?;
         let pool = spinup_types_test(&path).await?;

This change would propagate any errors that occur during temporary file creation or path conversion, making the test more robust.

crates/torii/graphql/src/tests/mod.rs (1)

353-357: Great addition of the Executor for concurrent operations!

Ohayo, sensei! The introduction of the Executor is a fantastic improvement for managing concurrent database operations. Spawning it in a separate tokio task is a smart move for non-blocking execution.

However, I have a small suggestion to enhance the robustness of this implementation:

Consider improving the error handling in the spawned task. Here's a suggested modification:

 tokio::spawn(async move {
-    executor.run().await.unwrap();
+    if let Err(e) = executor.run().await {
+        eprintln!("Executor error: {:?}", e);
+        // Consider how to handle this error (e.g., panic, return early, etc.)
+    }
 });

This change will prevent silent failures and make debugging easier if something goes wrong with the executor.

crates/torii/graphql/src/tests/models_test.rs (1)

170-172: Ohayo again, sensei! Great improvement to test isolation!

The use of a temporary file for each test run is an excellent way to ensure test isolation. This change will help prevent any potential interference between test runs.

Consider handling potential errors more gracefully:

-        let tempfile = NamedTempFile::new().unwrap();
-        let path = tempfile.path().to_string_lossy();
-        let pool = spinup_types_test(&path).await?;
+        let tempfile = NamedTempFile::new().map_err(|e| anyhow::anyhow!("Failed to create temp file: {}", e))?;
+        let path = tempfile.path().to_string_lossy();
+        let pool = spinup_types_test(&path).await?;

This change will provide more context if file creation fails, making debugging easier.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 6d4b99f and baf7f35.

📒 Files selected for processing (4)
  • crates/torii/graphql/src/tests/entities_test.rs (2 hunks)
  • crates/torii/graphql/src/tests/mod.rs (4 hunks)
  • crates/torii/graphql/src/tests/models_ordering_test.rs (2 hunks)
  • crates/torii/graphql/src/tests/models_test.rs (2 hunks)
🔇 Additional comments (6)
crates/torii/graphql/src/tests/models_ordering_test.rs (1)

6-6: Ohayo, sensei! Nice addition of the tempfile crate!

The introduction of NamedTempFile from the tempfile crate is a great choice for managing temporary test data. This approach enhances test isolation and cleanup.

crates/torii/graphql/src/tests/entities_test.rs (1)

8-8: Ohayo, sensei! Nice addition of the temporary file import.

The addition of tempfile::NamedTempFile is a good practice for test isolation. It ensures that the test creates and uses a temporary file, which helps prevent side effects and makes the test more robust.

crates/torii/graphql/src/tests/mod.rs (3)

30-30: LGTM! New import for Executor.

Ohayo, sensei! The new import for Executor is well-placed and necessary for the changes in the spinup_types_test function. Good job on keeping the imports organized!


278-278: Nice update to the function signature!

Ohayo, sensei! The addition of the path parameter to spinup_types_test is a great improvement. It allows for more flexibility in specifying the database location, which is especially useful for testing scenarios. Well done!


280-281: Excellent switch to file-based SQLite approach!

Ohayo, sensei! The modification to use a file-based SQLite database is a smart move. It allows for persistent storage during tests, which can be beneficial for debugging and more complex test scenarios. The create_if_missing(true) option ensures smooth operation even if the file doesn't exist initially. Nicely done!

crates/torii/graphql/src/tests/models_test.rs (1)

9-9: Ohayo, sensei! Nice addition of the tempfile crate!

The import of NamedTempFile is a great choice for improving test isolation. This will help prevent potential conflicts between test runs and make our tests more reliable.

@@ -374,6 +379,6 @@ pub async fn spinup_types_test() -> Result<SqlitePool> {
let to = account.provider().block_hash_and_number().await?.block_number;
let data = engine.fetch_range(0, to, None).await.unwrap();
engine.process_range(data).await.unwrap();

db.execute().await.unwrap();
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Consider enhancing error handling for database execution.

Ohayo, sensei! While the change to await the database execution is good, the use of unwrap() could lead to unexpected panics if an error occurs. Let's make this more robust!

Consider updating the error handling as follows:

-db.execute().await.unwrap();
+db.execute().await.map_err(|e| {
+    eprintln!("Database execution error: {:?}", e);
+    anyhow::anyhow!("Failed to execute database operations: {:?}", e)
+})?;

This change will provide more informative error messages and allow for graceful error propagation.

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
db.execute().await.unwrap();
db.execute().await.map_err(|e| {
eprintln!("Database execution error: {:?}", e);
anyhow::anyhow!("Failed to execute database operations: {:?}", e)
})?;

Copy link

codecov bot commented Sep 30, 2024

Codecov Report

Attention: Patch coverage is 83.88998% with 82 lines in your changes missing coverage. Please review.

Project coverage is 68.96%. Comparing base (edc8840) to head (63cca75).
Report is 2 commits behind head on main.

Files with missing lines Patch % Lines
crates/torii/core/src/sql.rs 89.14% 24 Missing ⚠️
crates/torii/core/src/engine.rs 39.47% 23 Missing ⚠️
crates/torii/core/src/executor.rs 86.79% 21 Missing ⚠️
bin/torii/src/main.rs 0.00% 11 Missing ⚠️
...rates/torii/core/src/processors/metadata_update.rs 0.00% 2 Missing ⚠️
...tes/torii/core/src/processors/store_transaction.rs 0.00% 1 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##             main    #2460      +/-   ##
==========================================
- Coverage   68.98%   68.96%   -0.02%     
==========================================
  Files         372      372              
  Lines       48526    48569      +43     
==========================================
+ Hits        33475    33496      +21     
- Misses      15051    15073      +22     

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

bin/torii/src/main.rs Outdated Show resolved Hide resolved
crates/torii/core/src/executor.rs Show resolved Hide resolved
crates/torii/core/src/executor.rs Outdated Show resolved Hide resolved
Comment on lines 80 to 88
pub fn set_head(&mut self, head: u64) -> Result<()> {
let head = Argument::Int(head.try_into().expect("doesn't fit in u64"));
let id = Argument::FieldElement(self.world_address);
self.query_queue.enqueue(
"UPDATE contracts SET head = ? WHERE id = ?",
self.executor.send(QueryMessage::other(
"UPDATE contracts SET head = ? WHERE id = ?".to_string(),
vec![head, id],
QueryType::Other,
);
))?;

Ok(())
Copy link
Collaborator

@lambda-0x lambda-0x Oct 2, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is good suggestion, we should apply this. (hmm github doesn't show it properly, but i meant the coderabbit suggestion for this part)

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 9c9e0a3 and 6b6f5a6.

📒 Files selected for processing (1)
  • bin/torii/src/main.rs (4 hunks)
🔇 Additional comments (3)
bin/torii/src/main.rs (3)

30-35: Ohayo, sensei! New imports look good.

The added imports for NamedTempFile, Sender, and Executor are necessary for the new functionality introduced in the main function. They align well with the changes made in the rest of the file.


69-69: Ohayo, sensei! Good update to the Args struct.

The change of the default value for the database argument from ":memory:" to an empty string is a smart move. It allows for more flexible handling of the database path in the main function, particularly when creating a temporary file if no specific path is provided.


199-199: Ohayo, sensei! Good update to the Sql::new() call.

The addition of the sender parameter to the Sql::new() method call is consistent with the new Executor implementation. You've correctly passed the sender from the newly created Executor, which should allow for proper communication between the SQL module and the executor.

Comment on lines +168 to +171
let tempfile = NamedTempFile::new()?;
let database_path =
if args.database.is_empty() { tempfile.path().to_str().unwrap() } else { &args.database };

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Consider more robust error handling for temporary file path.

The new logic for handling the database path is a good improvement. However, using unwrap() on tempfile.path().to_str() could potentially panic if the path contains non-UTF-8 characters. Consider using a more robust error handling approach.

Here's a suggestion to improve error handling:

let database_path = if args.database.is_empty() {
    tempfile.path().to_str().ok_or_else(|| anyhow::anyhow!("Failed to get temporary file path"))?
} else {
    &args.database
};

This change will propagate the error up the call stack instead of panicking, allowing for more graceful error handling.

Comment on lines +194 to +198
let (mut executor, sender) = Executor::new(pool.clone(), shutdown_tx.clone()).await?;
tokio::spawn(async move {
executor.run().await.unwrap();
});

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Ohayo, sensei! Consider improving error handling in the executor task.

The creation and running of the Executor in a separate tokio task is a good approach for concurrent execution. However, using unwrap() in the spawned task could lead to a panic if an error occurs, which might cause the entire application to crash.

Consider handling potential errors more gracefully:

tokio::spawn(async move {
    if let Err(e) = executor.run().await {
        error!("Executor encountered an error: {:?}", e);
        // Optionally, you could send a shutdown signal here
        // let _ = shutdown_tx.send(());
    }
});

This change will log the error instead of panicking, allowing the application to continue running even if the executor encounters an issue.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Outside diff range and nitpick comments (13)
crates/torii/core/src/sql.rs (13)

40-64: Ohayo, sensei! LGTM with a minor suggestion

The changes in the new method align well with the new asynchronous message-passing model. The addition of the executor parameter and the use of QueryMessage::other are appropriate.

However, we could improve the error handling slightly:

Consider updating the error handling for the execute call:

-        db.execute().await?;
+        db.execute().await.map_err(|e| anyhow!("Failed to execute initial query: {}", e))?;

This change provides more context in case of an execution error.


Line range hint 104-137: Ohayo, sensei! Good changes, with room for improvement

The modifications to set_last_pending_block_world_tx and set_last_pending_block_tx are in line with the new asynchronous model. However, we can enhance the error handling:

Consider updating the error handling in both methods:

-        self.executor.send(QueryMessage::other(
+        self.executor.send(QueryMessage::other(
             "UPDATE contracts SET last_pending_block_world_tx = ? WHERE id = ?".to_string(),
             vec![last_pending_block_world_tx, id],
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send last_pending_block_world_tx update: {}", e))?;

Apply a similar change to set_last_pending_block_tx. This improvement provides more context in case of send errors.


Line range hint 173-188: Ohayo, sensei! Good transition, but let's enhance error handling

The change to use QueryMessage::new with QueryType::RegisterModel is a good adaptation to the new asynchronous model. It allows for more specific handling of different query types.

However, we can improve the error handling:

-        self.executor.send(QueryMessage::new(
+        self.executor.send(QueryMessage::new(
             insert_models.to_string(),
             arguments,
             QueryType::RegisterModel,
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send register model query: {}", e))?;

This change provides more context in case of a send error, making debugging easier.


Line range hint 251-272: Ohayo, sensei! Good adaptation, but let's refine error handling

The changes in the set_entity method align well with the new asynchronous model:

  1. The use of QueryMessage::new with QueryType::SetEntity allows for specific handling of entity setting operations.
  2. The separate QueryMessage::other for entity_model insertion maintains flexibility for different query types.

However, we can enhance the error handling:

-        self.executor.send(QueryMessage::new(
+        self.executor.send(QueryMessage::new(
             insert_entities.to_string(),
             arguments,
             QueryType::SetEntity(entity.clone()),
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send set entity query: {}", e))?;

-        self.executor.send(QueryMessage::other(
+        self.executor.send(QueryMessage::other(
             "INSERT INTO entity_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
              model_id) DO NOTHING"
                 .to_string(),
             vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send entity_model insertion query: {}", e))?;

These changes provide more context in case of send errors, facilitating easier debugging and error tracing.


Line range hint 304-329: Ohayo, sensei! Good progress, but let's polish error handling

The modifications to the set_event_message method are well-aligned with the new asynchronous model:

  1. The use of QueryMessage::new with QueryType::EventMessage allows for specific handling of event message operations.
  2. The separate QueryMessage::other for event_model insertion maintains flexibility for different query types.

However, we can refine the error handling:

-        self.executor.send(QueryMessage::new(
+        self.executor.send(QueryMessage::new(
             insert_entities.to_string(),
             vec![
                 Argument::String(entity_id.clone()),
                 Argument::String(keys_str),
                 Argument::String(event_id.to_string()),
                 Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
             ],
             QueryType::EventMessage(entity.clone()),
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send event message query: {}", e))?;

-        self.executor.send(QueryMessage::other(
+        self.executor.send(QueryMessage::other(
             "INSERT INTO event_model (entity_id, model_id) VALUES (?, ?) ON CONFLICT(entity_id, \
              model_id) DO NOTHING"
                 .to_string(),
             vec![Argument::String(entity_id.clone()), Argument::String(model_id.clone())],
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send event_model insertion query: {}", e))?;

These enhancements provide more context in case of send errors, which will aid in debugging and error tracing.


347-356: Ohayo, sensei! Solid improvements, but let's enhance error handling

The changes in the delete_entity method are well-implemented:

  1. The use of QueryMessage::new with QueryType::DeleteEntity aligns with the new asynchronous model.
  2. The introduction of DeleteEntityQuery provides a structured way to package deletion information, which is excellent for maintainability and clarity.

However, we can further improve the error handling:

-        self.executor.send(QueryMessage::new(
+        self.executor.send(QueryMessage::new(
             "DELETE FROM entity_model WHERE entity_id = ? AND model_id = ?".to_string(),
             vec![Argument::String(entity_id.clone()), Argument::String(format!("{:#x}", model_id))],
             QueryType::DeleteEntity(DeleteEntityQuery {
                 entity_id: entity_id.clone(),
                 event_id: event_id.to_string(),
                 block_timestamp: utc_dt_string_from_timestamp(block_timestamp),
                 ty: entity.clone(),
             }),
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send delete entity query: {}", e))?;

This change provides more context in case of a send error, making debugging easier.


361-374: Ohayo, sensei! Good adaptation, but let's refine error handling

The changes in the set_metadata method align well with the new asynchronous model:

  1. The method now returns Result<()>, allowing for proper error propagation.
  2. The use of QueryMessage::other is consistent with the new execution model.

However, we can enhance the error handling:

-        self.executor.send(QueryMessage::other(
+        self.executor.send(QueryMessage::other(
             "INSERT INTO metadata (id, uri, executed_at) VALUES (?, ?, ?) ON CONFLICT(id) DO \
              UPDATE SET id=excluded.id, executed_at=excluded.executed_at, \
              updated_at=CURRENT_TIMESTAMP"
                 .to_string(),
             vec![resource, uri, executed_at],
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send set_metadata query: {}", e))?;

This change provides more context in case of a send error, facilitating easier debugging and error tracing.


403-403: Ohayo, sensei! Good transition, but let's polish error handling

The change to use QueryMessage::other in the update_metadata method aligns well with the new asynchronous execution model.

However, we can enhance the error handling:

-        self.executor.send(QueryMessage::other(statement, arguments))?;
+        self.executor.send(QueryMessage::other(statement, arguments))
+            .map_err(|e| anyhow!("Failed to send update_metadata query: {}", e))?;

This improvement provides more context in case of a send error, making debugging easier.


Line range hint 463-481: Ohayo, sensei! Good adaptation, but let's refine error handling

The change to use QueryMessage::other in the store_transaction method is well-aligned with the new asynchronous execution model.

However, we can enhance the error handling:

-        self.executor.send(QueryMessage::other(
+        self.executor.send(QueryMessage::other(
             "INSERT OR IGNORE INTO transactions (id, transaction_hash, sender_address, calldata, \
              max_fee, signature, nonce, transaction_type, executed_at) VALUES (?, ?, ?, ?, ?, ?, \
              ?, ?, ?)"
                 .to_string(),
             vec![
                 id,
                 transaction_hash,
                 sender_address,
                 calldata,
                 max_fee,
                 signature,
                 nonce,
                 Argument::String(transaction_type.to_string()),
                 Argument::String(utc_dt_string_from_timestamp(block_timestamp)),
             ],
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send store_transaction query: {}", e))?;

This improvement provides more context in case of a send error, facilitating easier debugging and error tracing.


497-505: Ohayo, sensei! Excellent transition, but let's enhance error handling

The changes in the store_event method are well-implemented:

  1. The use of QueryMessage::new with QueryType::StoreEvent aligns perfectly with the new asynchronous model.
  2. The specific QueryType::StoreEvent allows for more granular handling of different query types, which is a great improvement.

However, we can further improve the error handling:

-        self.executor.send(QueryMessage::new(
+        self.executor.send(QueryMessage::new(
             "INSERT OR IGNORE INTO events (id, keys, data, transaction_hash, executed_at) VALUES \
              (?, ?, ?, ?, ?) RETURNING *"
                 .to_string(),
             vec![id, keys, data, hash, executed_at],
             QueryType::StoreEvent,
-        ))?;
+        )).map_err(|e| anyhow!("Failed to send store_event query: {}", e))?;

This change provides more context in case of a send error, making debugging easier.


599-691: Ohayo, sensei! Good adaptation, but let's refine error handling

The changes in the build_set_entity_queries_recursive method are well-implemented:

  1. The use of executor: &mut UnboundedSender<QueryMessage> instead of &mut Self provides more flexibility and aligns with the new asynchronous model.
  2. The transition to QueryMessage::other is consistent with the new execution model.

However, we can enhance the error handling. Here's an example of how to improve it:

-            executor.send(QueryMessage::other(statement, arguments))?;
+            executor.send(QueryMessage::other(statement, arguments))
+                .map_err(|e| anyhow!("Failed to send entity query: {}", e))?;

Apply this pattern throughout the method where executor.send is called. This change provides more context in case of send errors, facilitating easier debugging and error tracing.


Line range hint 824-897: Ohayo, sensei! Good transition, but let's polish error handling

The change to use QueryMessage::other in the build_delete_entity_queries_recursive method aligns well with the new asynchronous execution model.

However, we can enhance the error handling. Here's an example of how to improve it:

-                self.executor.send(QueryMessage::other(
+                self.executor.send(QueryMessage::other(
                     statement,
                     vec![Argument::String(entity_id.to_string())],
-                ))?;
+                )).map_err(|e| anyhow!("Failed to send delete entity query: {}", e))?;

Apply this pattern throughout the method where self.executor.send is called. This change provides more context in case of send errors, making debugging easier.


Line range hint 910-1137: Ohayo, sensei! Solid adaptation, but let's enhance error handling

The transition to use QueryMessage::other in the build_model_query method is well-aligned with the new asynchronous execution model. This change maintains consistency throughout the codebase.

However, we can improve the error handling. Here's an example of how to enhance it:

-                    self.executor.send(QueryMessage::other(statement.to_string(), arguments))?;
+                    self.executor.send(QueryMessage::other(statement.to_string(), arguments))
+                        .map_err(|e| anyhow!("Failed to send model member query: {}", e))?;

Apply this pattern throughout the method where self.executor.send is called. This change provides more context in case of send errors, facilitating easier debugging and error tracing.

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 6b6f5a6 and 61f0a4b.

📒 Files selected for processing (1)
  • crates/torii/core/src/sql.rs (37 hunks)
🔇 Additional comments (4)
crates/torii/core/src/sql.rs (4)

15-15: Ohayo, sensei! LGTM: Transition to asynchronous message-passing model

The changes in imports and the Sql struct definition align well with the transition from a queue-based execution model to an asynchronous message-passing model. This shift should improve the overall performance and scalability of the system.

Also applies to: 18-18, 35-35


89-101: Ohayo, sensei! Excellent improvements in set_head

The changes in the set_head method are well-implemented:

  1. The method now returns Result<()>, allowing for proper error propagation.
  2. The use of QueryMessage::other aligns with the new asynchronous model.
  3. The improved error handling for integer conversion addresses potential overflow issues.

These changes enhance both the reliability and consistency of the code.


518-518: Ohayo, sensei! Subtle but important improvement

The change in the build_register_queries_recursive method to return Result<()> instead of () is a good improvement. This change allows for better error propagation and is consistent with the error handling enhancements throughout the file. It's a small but significant step towards more robust error management.


1142-1146: Ohayo, sensei! Excellent refactoring of the execute method

The changes in the execute method are well-implemented and align perfectly with the new asynchronous message-passing model:

  1. Changing the method signature from &mut self to &self allows for more flexible usage, as it no longer requires mutable access to the entire Sql struct.
  2. The use of QueryMessage::execute_recv() to create a new channel for execution is a clever way to handle the execution process asynchronously.
  3. Sending an execution message through the executor leverages the new message-passing model effectively.

These changes should improve the overall performance and concurrency of the system, allowing for better scalability.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

📜 Review details

Configuration used: .coderabbit.yaml
Review profile: CHILL

📥 Commits

Files that changed from the base of the PR and between 61f0a4b and 63cca75.

📒 Files selected for processing (1)
  • crates/torii/core/src/executor.rs (1 hunks)
🔇 Additional comments (4)
crates/torii/core/src/executor.rs (4)

23-30: Enums are well-defined and enhance code clarity.

Ohayo, sensei! The Argument enum effectively represents various SQL argument types, improving type safety and code readability.


33-38: BrokerMessage enum cleanly encapsulates message types.

The BrokerMessage enum provides a clear structure for different broker messages, facilitating better message handling.


138-168: Implement transaction rollback on error in run method.

Ohayo, sensei! When handle_query_type returns an error in the run method, the transaction is not rolled back, which could lead to database inconsistencies. Consider rolling back the transaction in case of errors to maintain data integrity.


178-276: Ensure transaction is properly managed in handle_query_type.

In the handle_query_type method, errors during query execution may leave the transaction open. To maintain database integrity, consider implementing error handling to rollback the transaction when an error occurs.

Comment on lines +69 to +123
pub struct QueryMessage {
pub statement: String,
pub arguments: Vec<Argument>,
pub query_type: QueryType,
tx: Option<oneshot::Sender<Result<()>>>,
}

impl QueryMessage {
pub fn new(statement: String, arguments: Vec<Argument>, query_type: QueryType) -> Self {
Self { statement, arguments, query_type, tx: None }
}

pub fn new_recv(
statement: String,
arguments: Vec<Argument>,
query_type: QueryType,
) -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(Self { statement, arguments, query_type, tx: Some(tx) }, rx)
}

pub fn other(statement: String, arguments: Vec<Argument>) -> Self {
Self { statement, arguments, query_type: QueryType::Other, tx: None }
}

pub fn other_recv(
statement: String,
arguments: Vec<Argument>,
) -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(Self { statement, arguments, query_type: QueryType::Other, tx: Some(tx) }, rx)
}

pub fn execute() -> Self {
Self {
statement: "".to_string(),
arguments: vec![],
query_type: QueryType::Execute,
tx: None,
}
}

pub fn execute_recv() -> (Self, oneshot::Receiver<Result<()>>) {
let (tx, rx) = oneshot::channel();
(
Self {
statement: "".to_string(),
arguments: vec![],
query_type: QueryType::Execute,
tx: Some(tx),
},
rx,
)
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Simplify QueryMessage constructors using builder pattern.

Ohayo, sensei! Consider using the builder pattern for QueryMessage to reduce the number of constructor methods and enhance code maintainability.

Comment on lines +278 to +287
async fn execute(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.commit().await?;

for message in self.publish_queue.drain(..) {
send_broker_message(message);
}

Ok(())
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Handle potential errors during transaction commit.

Ohayo, sensei! If transaction.commit().await? fails in the execute method, the replaced transaction may cause inconsistencies. Please add error handling to manage commit failures appropriately.

Apply this diff to handle errors during transaction commit:

 async fn execute(&mut self) -> Result<()> {
     let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
-    transaction.commit().await?;
+    if let Err(e) = transaction.commit().await {
+        error!(target: LOG_TARGET, error = %e, "Failed to commit transaction.");
+        // Optionally, attempt to rollback or handle the error as needed
+        return Err(e);
+    }
 
     for message in self.publish_queue.drain(..) {
         send_broker_message(message);
     }
 
     Ok(())
 }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
async fn execute(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
transaction.commit().await?;
for message in self.publish_queue.drain(..) {
send_broker_message(message);
}
Ok(())
}
async fn execute(&mut self) -> Result<()> {
let transaction = mem::replace(&mut self.transaction, self.pool.begin().await?);
if let Err(e) = transaction.commit().await {
error!(target: LOG_TARGET, error = %e, "Failed to commit transaction.");
// Optionally, attempt to rollback or handle the error as needed
return Err(e);
}
for message in self.publish_queue.drain(..) {
send_broker_message(message);
}
Ok(())
}

Copy link
Collaborator

@glihm glihm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job on extracting the executor @Larkooo.
Let's keep an eye on the unbounded channel behavior.

@glihm glihm merged commit c291c7a into dojoengine:main Oct 3, 2024
15 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants