Skip to content

Commit

Permalink
- fix: async log collection and execution timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
agallardol committed Nov 18, 2024
1 parent b44d051 commit 746476a
Show file tree
Hide file tree
Showing 5 changed files with 137 additions and 94 deletions.
16 changes: 10 additions & 6 deletions libs/shinkai-tools-runner/src/lib.test.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::env;
use std::time::Duration;

use serde_json::json;

Expand Down Expand Up @@ -161,24 +162,28 @@ async fn max_execution_time() {
.is_test(true)
.try_init();
let js_code = r#"
async function run(configurations, parameters) {
async function run() {
let startedAt = Date.now();
const sleepMs = 100;
while (true) {
const elapse = Date.now() - startedAt;
console.log(`while true every ${500}ms, elapse ${elapse} ms`);
console.log(`while true sleeping ${sleepMs}ms, elapse ${elapse} ms`);
await new Promise(async (resolve) => {
setTimeout(() => {
resolve();
}, parameters.timeoutMs);
}, sleepMs);
});
}
return { data: true };
}
"#;
let tool = Tool::new(js_code.to_string(), serde_json::Value::Null, None);
let run_result = tool
.run(None, serde_json::json!({ "timeoutMs": 3200 }), Some(3000))
.run(
None,
serde_json::json!({ "timeoutMs": 100 }),
Some(Duration::from_secs(2)),
)
.await;
assert!(run_result.is_err());
assert!(run_result.err().unwrap().message().contains("timed out"));
Expand Down Expand Up @@ -360,7 +365,6 @@ async fn shinkai_tool_duckduckgo_search() {

#[tokio::test]
async fn shinkai_tool_playwright_example() {

let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.is_test(true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use nanoid::nanoid;

use super::execution_context::ExecutionContext;

#[derive(Default)]
#[derive(Default, Clone)]
pub struct DenoExecutionStorage {
pub context: ExecutionContext,
pub code_id: String,
Expand Down
201 changes: 118 additions & 83 deletions libs/shinkai-tools-runner/src/tools/deno_runner.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use tokio::{
io::{AsyncBufReadExt, BufReader},
select,
sync::Mutex,
};

use crate::tools::deno_execution_storage::DenoExecutionStorage;

use super::{container_utils::DockerStatus, deno_runner_options::DenoRunnerOptions};
use std::{collections::HashMap, time::Duration};
use std::{collections::HashMap, sync::Arc, time::Duration};

#[derive(Default)]
pub struct DenoRunner {
Expand Down Expand Up @@ -50,24 +51,24 @@ impl DenoRunner {
&mut self,
code: &str,
envs: Option<HashMap<String, String>>,
max_execution_time_ms: Option<u64>,
max_execution_timeout: Option<Duration>,
) -> anyhow::Result<Vec<String>> {
let force_deno_in_host =
std::env::var("CI_FORCE_DENO_IN_HOST").unwrap_or(String::from("false")) == *"true";
if !force_deno_in_host
&& super::container_utils::is_docker_available() == DockerStatus::Running
{
self.run_in_docker(code, envs, max_execution_time_ms).await
self.run_in_docker(code, envs, max_execution_timeout).await
} else {
self.run_in_host(code, envs, max_execution_time_ms).await
self.run_in_host(code, envs, max_execution_timeout).await
}
}

async fn run_in_docker(
&mut self,
code: &str,
envs: Option<HashMap<String, String>>,
max_execution_time_ms: Option<u64>,
max_execution_timeout: Option<Duration>,
) -> anyhow::Result<Vec<String>> {
log::info!(
"using deno from container image:{:?}",
Expand Down Expand Up @@ -127,45 +128,54 @@ impl DenoRunner {
e
})?;

let stdout = child.stdout.take().expect("failed to get stdout");
let stdout = child.stdout.take().expect("Failed to get stdout");
let mut stdout_stream = BufReader::new(stdout).lines();

let stderr = child.stderr.take().expect("failed to get stderr");
let stderr = child.stderr.take().expect("Failed to get stderr");
let mut stderr_stream = BufReader::new(stderr).lines();

let mut stdout_lines = Vec::new();
let mut stderr_lines = Vec::new();

loop {
select! {
Ok(line) = stdout_stream.next_line() => match line {
Some(line) => {
stdout_lines.push(line.clone());
let _ = execution_storage.append_log(line.as_str());
},
None => break,
},
Ok(line) = stderr_stream.next_line() => match line {
Some(line) => {
stderr_lines.push(line.clone());
let _ = execution_storage.append_log(line.as_str());
},
None => break,
},
else => break,
}
}
let stdout_lines = Arc::new(Mutex::new(Vec::<String>::new()));
let stderr_lines = Arc::new(Mutex::new(Vec::<String>::new()));
let execution_storage_clone = execution_storage.clone();

let output = if let Some(timeout) = max_execution_time_ms {
let timeout_duration = std::time::Duration::from_millis(timeout);
log::info!("executing command with {}ms timeout", timeout);
match tokio::time::timeout(timeout_duration, child.wait_with_output()).await {
let stdout_lines_clone = stdout_lines.clone();
let stderr_lines_clone = stderr_lines.clone();
let execution_storage_clone2 = execution_storage_clone.clone();

let stdout_task = tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Ok(Some(line)) = stdout_stream.next_line().await {
println!("{}", line);
stdout_lines_clone.lock().await.push(line.clone());
let _ = execution_storage_clone.append_log(line.as_str());
}
});
});

let stderr_task = tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Ok(Some(line)) = stderr_stream.next_line().await {
println!("{}", line);
stderr_lines_clone.lock().await.push(line.clone());
let _ = execution_storage_clone2.append_log(line.as_str());
}
});
});

#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(async move {
let _ = futures::future::join_all(vec![stdout_task, stderr_task]).await;
});

let output = if let Some(timeout) = max_execution_timeout {
log::info!("executing command with {}[s] timeout", timeout.as_secs());
match tokio::time::timeout(timeout, child.wait_with_output()).await {
Ok(result) => result?,
Err(_) => {
log::error!("command execution timed out after {}ms", timeout);
log::error!("command execution timed out after {}[s]", timeout.as_secs());
return Err(anyhow::anyhow!(
"process timed out after {} seconds",
timeout
"process timed out after {}[s]",
timeout.as_secs()
));
}
}
Expand All @@ -174,23 +184,24 @@ impl DenoRunner {
child.wait_with_output().await?
};
if !output.status.success() {
let error = String::from_utf8_lossy(&output.stdout);
log::error!("command execution failed: {}", error);
return Err(anyhow::anyhow!("command execution failed: {}", error));
let stderr = stderr_lines.lock().await.to_vec().join("\n");
log::error!("command execution failed: {}", stderr);
return Err(anyhow::Error::new(std::io::Error::new(
std::io::ErrorKind::Other,
stderr.to_string(),
)));
}

log::debug!(
"command completed successfully with output: {:?}",
stdout_lines
);
Ok(stdout_lines)
let stdout: Vec<String> = stdout_lines.lock().await.to_vec();
log::info!("command completed successfully with output: {:?}", stdout);
Ok(stdout)
}

async fn run_in_host(
&mut self,
code: &str,
envs: Option<HashMap<String, String>>,
max_execution_time_ms: Option<u64>,
max_execution_timeout: Option<Duration>,
) -> anyhow::Result<Vec<String>> {
log::info!(
"using deno from host at path: {:?}",
Expand All @@ -204,14 +215,20 @@ impl DenoRunner {
let home_permissions =
format!("--allow-write={}", execution_storage.home.to_string_lossy());
let deno_permissions_host: Vec<&str> = vec![
// Basically all non-file related permissions
"--allow-env",
"--allow-run",
"--allow-net",
"--allow-sys",
"--allow-scripts",
"--allow-ffi",
"--allow-import",

// Engine folders
"--allow-read=.",
"--allow-write=./home",

// Playwright/Chrome folders
"--allow-write=/var/folders",
"--allow-read=/var/folders",
"--allow-read=/Applications/Google Chrome.app/Contents/MacOS/Google Chrome",
Expand All @@ -223,11 +240,18 @@ impl DenoRunner {
home_permissions.as_str(),
];

let mut command = tokio::process::Command::new(binary_path);
let mut command = tokio::process::Command::new(binary_path.canonicalize().unwrap());
let command = command
.args(["run", "--ext", "ts"])
.args(deno_permissions_host)
.arg(execution_storage.code_entrypoint.clone())
.arg(
execution_storage
.code_entrypoint
.canonicalize()
.unwrap()
.clone(),
)
.current_dir(execution_storage.root.canonicalize().unwrap().clone())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.kill_on_drop(true);
Expand All @@ -237,46 +261,59 @@ impl DenoRunner {
command.envs(envs);
}
log::info!("prepared command with arguments: {:?}", command);
let mut child = command.spawn()?;
let mut child = command.spawn().map_err(|e| {
log::error!("failed to spawn command: {}", e);
e
})?;

let stdout = child.stdout.take().expect("Failed to get stdout");
let mut stdout_stream = BufReader::new(stdout).lines();

let stderr = child.stderr.take().expect("Failed to get stderr");
let mut stderr_stream = BufReader::new(stderr).lines();

let mut stdout_lines = Vec::new();
let mut stderr_lines = Vec::new();
loop {
select! {
Ok(line) = stdout_stream.next_line() => match line {
Some(line) => {
stdout_lines.push(line.clone());
let _ = execution_storage.append_log(line.as_str());
},
None => break,
},
Ok(line) = stderr_stream.next_line() => match line {
Some(line) => {
stderr_lines.push(line.clone());
let _ = execution_storage.append_log(line.as_str());
},
None => break,
},
else => break,
}
}
let stdout_lines = Arc::new(Mutex::new(Vec::<String>::new()));
let stderr_lines = Arc::new(Mutex::new(Vec::<String>::new()));
let execution_storage_clone = execution_storage.clone();

let stdout_lines_clone = stdout_lines.clone();
let stderr_lines_clone = stderr_lines.clone();
let execution_storage_clone2 = execution_storage_clone.clone();

let output = if let Some(timeout) = max_execution_time_ms {
let timeout_duration = std::time::Duration::from_millis(timeout);
log::info!("executing command with {}ms timeout", timeout);
match tokio::time::timeout(timeout_duration, child.wait_with_output()).await {
let stdout_task = tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Ok(Some(line)) = stdout_stream.next_line().await {
println!("{}", line);
stdout_lines_clone.lock().await.push(line.clone());
let _ = execution_storage_clone.append_log(line.as_str());
}
});
});

let stderr_task = tokio::task::spawn_blocking(move || {
tokio::runtime::Runtime::new().unwrap().block_on(async {
while let Ok(Some(line)) = stderr_stream.next_line().await {
println!("{}", line);
stderr_lines_clone.lock().await.push(line.clone());
let _ = execution_storage_clone2.append_log(line.as_str());
}
});
});

#[allow(clippy::let_underscore_future)]
let _ = tokio::spawn(async move {
let _ = futures::future::join_all(vec![stdout_task, stderr_task]).await;
});

let output = if let Some(timeout) = max_execution_timeout {
log::info!("executing command with {}[s] timeout", timeout.as_secs());
match tokio::time::timeout(timeout, child.wait_with_output()).await {
Ok(result) => result?,
Err(_) => {
log::error!("command execution timed out after {}ms", timeout);
log::error!("command execution timed out after {}[s]", timeout.as_secs());
return Err(anyhow::Error::new(std::io::Error::new(
std::io::ErrorKind::TimedOut,
format!("process timed out after {} seconds", timeout),
format!("process timed out after {}[s]", timeout.as_secs()),
)));
}
}
Expand All @@ -285,18 +322,16 @@ impl DenoRunner {
child.wait_with_output().await?
};
if !output.status.success() {
let error = String::from_utf8_lossy(&output.stderr);
log::error!("command execution failed: {}", error);
let stderr = stderr_lines.lock().await.to_vec().join("\n");
log::error!("command execution failed: {}", stderr);
return Err(anyhow::Error::new(std::io::Error::new(
std::io::ErrorKind::Other,
error.to_string(),
stderr.to_string(),
)));
}
log::info!(
"command completed successfully with output: {:?}",
stdout_lines
);
Ok(stdout_lines)
let stdout: Vec<String> = stdout_lines.lock().await.to_vec();
log::info!("command completed successfully with output: {:?}", stdout);
Ok(stdout)
}
}

Expand Down
6 changes: 3 additions & 3 deletions libs/shinkai-tools-runner/src/tools/tool.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, time::Duration};

use serde_json::Value;

Expand Down Expand Up @@ -76,7 +76,7 @@ impl Tool {
&self,
envs: Option<HashMap<String, String>>,
parameters: Value,
max_execution_time_ms: Option<u64>,
max_execution_timeout: Option<Duration>,
) -> Result<RunResult, ExecutionError> {
log::info!("preparing to run tool");
log::info!("configurations: {}", self.configurations.to_string());
Expand All @@ -103,7 +103,7 @@ impl Tool {
.replace("\\", "\\\\"),
);
let result = deno_runner
.run(&code, envs, max_execution_time_ms)
.run(&code, envs, max_execution_timeout)
.await
.map_err(|e| ExecutionError::new(format!("failed to run deno: {}", e), None))?;

Expand Down
Loading

0 comments on commit 746476a

Please sign in to comment.