From ffa5db4968fb1bba790663c8071c51d85c30e3c9 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Thu, 1 Feb 2024 13:52:56 +0000 Subject: [PATCH 1/9] [feat/issue10] chat completions status endpoint sends default json response --- crates/edgen_server/src/lib.rs | 12 ++++ crates/edgen_server/src/misc.rs | 2 - crates/edgen_server/src/status.rs | 116 ++++++++++++++++++++++++++++++ 3 files changed, 128 insertions(+), 2 deletions(-) create mode 100644 crates/edgen_server/src/status.rs diff --git a/crates/edgen_server/src/lib.rs b/crates/edgen_server/src/lib.rs index 870d167..f2dfb5f 100644 --- a/crates/edgen_server/src/lib.rs +++ b/crates/edgen_server/src/lib.rs @@ -44,6 +44,7 @@ pub mod graceful_shutdown; mod llm; mod model; pub mod openai_shim; +pub mod status; pub mod util; mod whisper; @@ -173,15 +174,26 @@ async fn start_server(args: &cli::Serve) -> EdgenResult { async fn run_server(args: &cli::Serve) -> bool { let http_app = Router::new() + // -- AI endpoints ----------------------------------------------------- + // ---- Chat ----------------------------------------------------------- .route( "/v1/chat/completions", axum::routing::post(openai_shim::chat_completions), ) + // ---- Audio ---------------------------------------------------------- .route( "/v1/audio/transcriptions", axum::routing::post(openai_shim::create_transcription), ) + // -- AI status endpoints ---------------------------------------------- + // ---- Chat ----------------------------------------------------------- + .route( + "/v1/chat/completions/status", + axum::routing::get(status::chat_completions_status), + ) + // -- Miscellaneous services ------------------------------------------- .route("/v1/misc/version", axum::routing::get(misc::edgen_version)) + .layer(CorsLayer::permissive()); let uri_vector = if !args.uri.is_empty() { diff --git a/crates/edgen_server/src/misc.rs b/crates/edgen_server/src/misc.rs index 30ed001..9c6ce7a 100644 --- a/crates/edgen_server/src/misc.rs +++ 
b/crates/edgen_server/src/misc.rs @@ -37,8 +37,6 @@ pub struct Version { /// GET `/v1/version`: returns the current version of edgend. /// -/// [openai]: https://platform.edgen.io/docs/api-reference/version -/// /// The version is returned as json value with major, minor and patch as integer /// and build as string (which may be empty). /// For any error, the version endpoint returns "internal server error". diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs new file mode 100644 index 0000000..7301bed --- /dev/null +++ b/crates/edgen_server/src/status.rs @@ -0,0 +1,116 @@ +/* Copyright 2023- The Binedge, Lda team. All rights reserved. + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//! Edgen AI service status. + +use axum::http::StatusCode; +use axum::response::{IntoResponse, Json, Response}; +use once_cell::sync::Lazy; +use serde::{Deserialize, Serialize}; +use tokio::sync::{RwLock}; +use utoipa::ToSchema; + +/// Recent Activity on a specific endpoint, e.g. Completions or Download. +#[derive(ToSchema, Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] +pub enum Activity { + /// Last activity was ChatCompletions + ChatCompletions, + /// Last activity was AudioTranscriptions + AudioTranscriptions, + /// Last activity was a model download + Download, + /// No known activity was recently performed + Unknown, +} + +/// Result of the last activity on a specific endpoint, e.g. Success or Failed. 
+#[derive(ToSchema, Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] +pub enum ActivityResult { + /// Last activity finished successfully + Success, + /// Last activity failed + Failed, + /// Result of the activity is unknown or there was no activity + Unknown, +} + +/// Current Endpoint status. +#[derive(ToSchema, Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] +pub struct AIStatus { + active_model: String, + last_activity: Activity, + last_activity_result: ActivityResult, + completions_ongoing: bool, + download_ongoing: bool, + last_errors: Vec, +} + +impl Default for AIStatus { + fn default() -> AIStatus { + AIStatus { + active_model: "unknown".to_string(), + last_activity: Activity::Unknown, + last_activity_result: ActivityResult::Unknown, + completions_ongoing: false, + download_ongoing: false, + last_errors: vec![], + } + } +} + +/// Get a protected chat completions status. +/// Call read() or write() on the returned value to get either read or write access. +pub fn get_chat_completions_status() -> &'static RwLock { + &AISTATES.endpoints[EP_CHAT_COMPLETIONS] +} + +/// Get a protected audio transcriptions status. +/// Call read() or write() on the returned value to get either read or write access. +pub fn get_audio_transcriptions_status() -> &'static RwLock { + &AISTATES.endpoints[EP_AUDIO_TRANSCRIPTIONS] +} + +// axum provides shared state but using this shared state would force us +// to pass the state on to all function that may change the state. +static AISTATES: Lazy = Lazy::new(Default::default); + +const EP_CHAT_COMPLETIONS: usize = 0; +const EP_AUDIO_TRANSCRIPTIONS: usize = 1; + +struct AIStates { + endpoints: Vec>, +} + +impl Default for AIStates { + fn default() -> AIStates { + AIStates { + endpoints: vec![RwLock::new(Default::default()), + RwLock::new(Default::default())] + } + } +} + +/// GET `/v1/chat/completions/status`: returns the current status of the /chat/completions endpoint. 
+/// +/// The status is returned as json value AIStatus. +/// For any error, the version endpoint returns "internal server error". +pub async fn chat_completions_status() -> Response { + let rwstate = get_chat_completions_status(); + let locked = rwstate.read().await; + Json(locked.clone()).into_response() +} + +#[allow(dead_code)] +fn internal_server_error(msg: &str) -> Response { + eprintln!("[ERROR] {}", msg); + StatusCode::INTERNAL_SERVER_ERROR.into_response() +} From 8254dad9ecaf5797098afb5761ff49f44a2778a9 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Thu, 1 Feb 2024 20:14:47 +0000 Subject: [PATCH 2/9] [feat/issue10] progress mechanism working --- crates/edgen_server/src/model.rs | 22 +++++-- crates/edgen_server/src/status.rs | 105 ++++++++++++++++++++++++++---- 2 files changed, 112 insertions(+), 15 deletions(-) diff --git a/crates/edgen_server/src/model.rs b/crates/edgen_server/src/model.rs index 20c65af..0282be6 100644 --- a/crates/edgen_server/src/model.rs +++ b/crates/edgen_server/src/model.rs @@ -82,11 +82,25 @@ impl Model { .build() .map_err(move |e| ModelError::API(e.to_string()))?; let api = api.model(self.repo.to_string()); - let path = api - .get(&self.name) - .map_err(move |e| ModelError::API(e.to_string()))?; - self.path = path; + // progress observer + let progress_handle = crate::status::observe_chat_completions_progress(&self.dir).await; + + let name = self.name.clone(); + let download_handle = tokio::spawn(async move { + crate::status::set_chat_completions_download(true).await; + let path = api + .get(&name) + .map_err(move |e| ModelError::API(e.to_string())); + crate::status::set_chat_completions_progress(100.0).await; + crate::status::set_chat_completions_download(false).await; + return path; + }); + + let _ = progress_handle.await.unwrap(); + let path = download_handle.await.unwrap(); + + self.path = path?; self.preloaded = true; Ok(()) diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs index 
7301bed..2689071 100644 --- a/crates/edgen_server/src/status.rs +++ b/crates/edgen_server/src/status.rs @@ -12,6 +12,8 @@ //! Edgen AI service status. +use std::path::PathBuf; + use axum::http::StatusCode; use axum::response::{IntoResponse, Json, Response}; use once_cell::sync::Lazy; @@ -44,13 +46,14 @@ pub enum ActivityResult { } /// Current Endpoint status. -#[derive(ToSchema, Deserialize, Serialize, Clone, Debug, PartialEq, Eq)] +#[derive(ToSchema, Deserialize, Serialize, Clone, Debug, PartialEq)] pub struct AIStatus { active_model: String, last_activity: Activity, last_activity_result: ActivityResult, completions_ongoing: bool, download_ongoing: bool, + download_progress: f64, last_errors: Vec, } @@ -62,6 +65,7 @@ impl Default for AIStatus { last_activity_result: ActivityResult::Unknown, completions_ongoing: false, download_ongoing: false, + download_progress: 0.0, last_errors: vec![], } } @@ -79,6 +83,36 @@ pub fn get_audio_transcriptions_status() -> &'static RwLock { &AISTATES.endpoints[EP_AUDIO_TRANSCRIPTIONS] } +/// Set download ongoing +pub async fn set_chat_completions_download(ongoing: bool) { + let rwstate = get_chat_completions_status(); + let mut state = rwstate.write().await; + state.download_ongoing = ongoing; +} + +/// Set download progress +pub async fn set_chat_completions_progress(progress: f64) { + let rwstate = get_chat_completions_status(); + let mut state = rwstate.write().await; + state.download_progress = progress; +} + +/// Set download progress +pub async fn observe_chat_completions_progress(tempdir: &PathBuf) -> tokio::task::JoinHandle<()> { + observe_progress(tempdir).await +} + + +/// GET `/v1/chat/completions/status`: returns the current status of the /chat/completions endpoint. +/// +/// The status is returned as json value AIStatus. +/// For any error, the version endpoint returns "internal server error". 
+pub async fn chat_completions_status() -> Response { + let rwstate = get_chat_completions_status(); + let locked = rwstate.read().await; + Json(locked.clone()).into_response() +} + // axum provides shared state but using this shared state would force us // to pass the state on to all function that may change the state. static AISTATES: Lazy = Lazy::new(Default::default); @@ -99,18 +133,67 @@ impl Default for AIStates { } } -/// GET `/v1/chat/completions/status`: returns the current status of the /chat/completions endpoint. -/// -/// The status is returned as json value AIStatus. -/// For any error, the version endpoint returns "internal server error". -pub async fn chat_completions_status() -> Response { - let rwstate = get_chat_completions_status(); - let locked = rwstate.read().await; - Json(locked.clone()).into_response() -} - #[allow(dead_code)] fn internal_server_error(msg: &str) -> Response { eprintln!("[ERROR] {}", msg); StatusCode::INTERNAL_SERVER_ERROR.into_response() } + +// This is a mess +async fn observe_progress(tempfile: &PathBuf) -> tokio::task::JoinHandle<()> { + let tmp = tempfile.join("tmp"); + let progress_handle = tokio::spawn(async move { + let mut d = tokio::fs::metadata(&tmp).await; + for _ in 0 .. 3 { + if d.is_ok() { + break; + }; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + d = tokio::fs::metadata(&tmp).await; + }; + if d.is_err() { + return; + }; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; // sloppy + let mut t = None; + for _ in 0 .. 10 { + let es = std::fs::read_dir(&tmp).unwrap(); // .await.unwrap(); + let mut e = None; + for x in es { + let y = x.unwrap(); + println!("file: {:?}", y.path()); + e = Some(y); + break; + }; + if e.is_some() { + t = e; + break; + } + }; + + if t.is_none() { + return; + } + let f = t.unwrap(); + + let mut m = tokio::fs::metadata(&f.path()).await; + for _ in 0 .. 
3 { + if m.is_ok() { + break; + }; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + m = tokio::fs::metadata(&f.path()).await; + }; + while m.is_ok() { + let z = m.unwrap(); + let s = z.len() as f64; + let t = 1173610336 as f64; + let p = (s * 100.0) / t; + crate::status::set_chat_completions_progress(p).await; + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + m = tokio::fs::metadata(&f.path()).await; + } + }); + + progress_handle +} From 2581dbc5540081ea0a693427c9964b771c7f192a Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Mon, 5 Feb 2024 11:30:08 +0000 Subject: [PATCH 3/9] [feat/issue10] file size and download flag --- Cargo.lock | 1 + crates/edgen_server/Cargo.toml | 1 + crates/edgen_server/src/model.rs | 29 +++++++++++++++++++---- crates/edgen_server/src/status.rs | 39 +++++++++++++++++++++++-------- 4 files changed, 56 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 5c797a5..b9e9c86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1405,6 +1405,7 @@ dependencies = [ "levenshtein", "once_cell", "pin-project", + "reqwest", "rubato", "serde", "serde_derive", diff --git a/crates/edgen_server/Cargo.toml b/crates/edgen_server/Cargo.toml index 3c50598..75b32cb 100644 --- a/crates/edgen_server/Cargo.toml +++ b/crates/edgen_server/Cargo.toml @@ -20,6 +20,7 @@ hyper = { workspace = true } hyper-util = { workspace = true } once_cell = { workspace = true } pin-project = { workspace = true } +reqwest = { workspace = true } rubato = "0.14.1" serde = { workspace = true } serde_derive = { workspace = true } diff --git a/crates/edgen_server/src/model.rs b/crates/edgen_server/src/model.rs index 0282be6..f56aebd 100644 --- a/crates/edgen_server/src/model.rs +++ b/crates/edgen_server/src/model.rs @@ -84,16 +84,27 @@ impl Model { let api = api.model(self.repo.to_string()); // progress observer - let progress_handle = crate::status::observe_chat_completions_progress(&self.dir).await; + let download = 
hf_hub::Cache::new(self.dir.clone()) + .model(self.repo.to_string()) + .get(&self.name) + .is_none(); + let size = self.get_size(&api).await; + let progress_handle = crate::status::observe_chat_completions_progress(&self.dir, size, download).await; let name = self.name.clone(); let download_handle = tokio::spawn(async move { - crate::status::set_chat_completions_download(true).await; + if download { + crate::status::set_chat_completions_download(true).await; + } + let path = api .get(&name) .map_err(move |e| ModelError::API(e.to_string())); - crate::status::set_chat_completions_progress(100.0).await; - crate::status::set_chat_completions_download(false).await; + + if download { + crate::status::set_chat_completions_progress(100).await; + crate::status::set_chat_completions_download(false).await; + } return path; }); @@ -106,6 +117,16 @@ impl Model { Ok(()) } + async fn get_size(&self, api: &hf_hub::api::sync::ApiRepo) -> Option { + let metadata = reqwest::Client::new() + .get(api.url(&self.name)) + .header("Content-Range", "bytes 0-0") + .header("Range", "bytes 0-0") + .send() + .await.unwrap(); + return metadata.content_length(); + } + /// Returns a [`PathBuf`] pointing to the local model file. pub fn file_path(&self) -> Result { if self.preloaded { diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs index 2689071..d1d10b8 100644 --- a/crates/edgen_server/src/status.rs +++ b/crates/edgen_server/src/status.rs @@ -19,6 +19,7 @@ use axum::response::{IntoResponse, Json, Response}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock}; +use tracing::{warn, info}; use utoipa::ToSchema; /// Recent Activity on a specific endpoint, e.g. Completions or Download. 
@@ -53,7 +54,7 @@ pub struct AIStatus { last_activity_result: ActivityResult, completions_ongoing: bool, download_ongoing: bool, - download_progress: f64, + download_progress: u64, last_errors: Vec, } @@ -65,7 +66,7 @@ impl Default for AIStatus { last_activity_result: ActivityResult::Unknown, completions_ongoing: false, download_ongoing: false, - download_progress: 0.0, + download_progress: 0, last_errors: vec![], } } @@ -85,21 +86,26 @@ pub fn get_audio_transcriptions_status() -> &'static RwLock { /// Set download ongoing pub async fn set_chat_completions_download(ongoing: bool) { + if ongoing { + info!("starting model download"); + } else { + info!("model download finished"); + }; let rwstate = get_chat_completions_status(); let mut state = rwstate.write().await; state.download_ongoing = ongoing; } /// Set download progress -pub async fn set_chat_completions_progress(progress: f64) { +pub async fn set_chat_completions_progress(progress: u64) { let rwstate = get_chat_completions_status(); let mut state = rwstate.write().await; state.download_progress = progress; } -/// Set download progress -pub async fn observe_chat_completions_progress(tempdir: &PathBuf) -> tokio::task::JoinHandle<()> { - observe_progress(tempdir).await +/// Observe download progress +pub async fn observe_chat_completions_progress(tempdir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { + observe_progress(tempdir, size, download).await } @@ -140,9 +146,21 @@ fn internal_server_error(msg: &str) -> Response { } // This is a mess -async fn observe_progress(tempfile: &PathBuf) -> tokio::task::JoinHandle<()> { +async fn observe_progress(tempfile: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { let tmp = tempfile.join("tmp"); + let progress_handle = tokio::spawn(async move { + if !download { + info!("progress observer: no download necessary, file is already there"); + return; + } + + if size.is_none() { + warn!("progress observer: unknown file size. 
No progress reported on download"); + return; + } + let size = size.unwrap(); + let mut d = tokio::fs::metadata(&tmp).await; for _ in 0 .. 3 { if d.is_ok() { @@ -186,9 +204,10 @@ async fn observe_progress(tempfile: &PathBuf) -> tokio::task::JoinHandle<()> { }; while m.is_ok() { let z = m.unwrap(); - let s = z.len() as f64; - let t = 1173610336 as f64; - let p = (s * 100.0) / t; + let s = z.len() as u64; + let t = size; + let p = (s * 100) / t; + crate::status::set_chat_completions_progress(p).await; tokio::time::sleep(std::time::Duration::from_secs(1)).await; m = tokio::fs::metadata(&f.path()).await; From 00c08dfaf6584f80c399ff3d7903bcbcefb2f793 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Mon, 5 Feb 2024 13:50:01 +0000 Subject: [PATCH 4/9] [feat/issue10] code improvement --- crates/edgen_server/src/model.rs | 3 + crates/edgen_server/src/status.rs | 134 ++++++++++++++++++++---------- 2 files changed, 91 insertions(+), 46 deletions(-) diff --git a/crates/edgen_server/src/model.rs b/crates/edgen_server/src/model.rs index f56aebd..3c0f120 100644 --- a/crates/edgen_server/src/model.rs +++ b/crates/edgen_server/src/model.rs @@ -66,6 +66,7 @@ impl Model { preloaded: false, } } + /// Checks if a file of the model is already present locally, and if not, downloads it. pub async fn preload(&mut self) -> Result<(), ModelError> { if self.path.is_file() { @@ -105,6 +106,7 @@ impl Model { crate::status::set_chat_completions_progress(100).await; crate::status::set_chat_completions_download(false).await; } + return path; }); @@ -117,6 +119,7 @@ impl Model { Ok(()) } + // get size of the remote file when we download. async fn get_size(&self, api: &hf_hub::api::sync::ApiRepo) -> Option { let metadata = reqwest::Client::new() .get(api.url(&self.name)) diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs index d1d10b8..d85b1cf 100644 --- a/crates/edgen_server/src/status.rs +++ b/crates/edgen_server/src/status.rs @@ -12,14 +12,17 @@ //! 
Edgen AI service status. +use std::collections::VecDeque; +use std::error::Error; use std::path::PathBuf; +use std::time::{Instant, Duration}; use axum::http::StatusCode; use axum::response::{IntoResponse, Json, Response}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; use tokio::sync::{RwLock}; -use tracing::{warn, info}; +use tracing::{error, warn, info}; use utoipa::ToSchema; /// Recent Activity on a specific endpoint, e.g. Completions or Download. @@ -55,7 +58,7 @@ pub struct AIStatus { completions_ongoing: bool, download_ongoing: bool, download_progress: u64, - last_errors: Vec, + last_errors: VecDeque, } impl Default for AIStatus { @@ -67,7 +70,7 @@ impl Default for AIStatus { completions_ongoing: false, download_ongoing: false, download_progress: 0, - last_errors: vec![], + last_errors: VecDeque::from([]), } } } @@ -104,8 +107,20 @@ pub async fn set_chat_completions_progress(progress: u64) { } /// Observe download progress -pub async fn observe_chat_completions_progress(tempdir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { - observe_progress(tempdir, size, download).await +pub async fn observe_chat_completions_progress(datadir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { + observe_progress(datadir, size, download).await +} + +/// Add an error to the last errors +pub async fn add_chat_completions_error(e: E) +where E: Error +{ + let rwstate = get_chat_completions_status(); + let mut state = rwstate.write().await; + if state.last_errors.len() > 32 { + state.last_errors.pop_front(); + } + state.last_errors.push_back(format!("{:?}", e)); } @@ -145,9 +160,17 @@ fn internal_server_error(msg: &str) -> Response { StatusCode::INTERNAL_SERVER_ERROR.into_response() } -// This is a mess -async fn observe_progress(tempfile: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { - let tmp = tempfile.join("tmp"); +// helper function to observe download progress. 
+// It spawns a new tokio task which +// - waits for the tmp directory to appear in dir +// - waits for the tempfile to appear in that directory +// - repeatedly reads the size of this tempfile +// - calculates the percentage relative to size +// - sets the percentage in the status.download_progress +// - until the tempfile disappears or no progress was made for 1 minute. +// TODO: This code should go to the module manager. +async fn observe_progress(datadir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { + let tmp = datadir.join("tmp"); let progress_handle = tokio::spawn(async move { if !download { @@ -161,58 +184,77 @@ async fn observe_progress(tempfile: &PathBuf, size: Option, download: bool) } let size = size.unwrap(); - let mut d = tokio::fs::metadata(&tmp).await; - for _ in 0 .. 3 { - if d.is_ok() { - break; - }; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - d = tokio::fs::metadata(&tmp).await; - }; - if d.is_err() { + if !have_tempdir(&tmp).await { return; - }; - tokio::time::sleep(std::time::Duration::from_millis(500)).await; // sloppy - let mut t = None; - for _ in 0 .. 10 { - let es = std::fs::read_dir(&tmp).unwrap(); // .await.unwrap(); - let mut e = None; - for x in es { - let y = x.unwrap(); - println!("file: {:?}", y.path()); - e = Some(y); - break; - }; - if e.is_some() { - t = e; - break; - } - }; - + } + + let t = wait_for_tempfile(&tmp).await; if t.is_none() { return; } + let f = t.unwrap(); let mut m = tokio::fs::metadata(&f.path()).await; - for _ in 0 .. 
3 { - if m.is_ok() { - break; - }; - tokio::time::sleep(std::time::Duration::from_millis(10)).await; - m = tokio::fs::metadata(&f.path()).await; - }; + let mut last_size = 0; + let mut timestamp = Instant::now(); while m.is_ok() { - let z = m.unwrap(); - let s = z.len() as u64; + let s = m.unwrap().len() as u64; let t = size; let p = (s * 100) / t; - crate::status::set_chat_completions_progress(p).await; - tokio::time::sleep(std::time::Duration::from_secs(1)).await; + if size > last_size { + last_size = size; + timestamp = Instant::now(); + } else if Instant::now().duration_since(timestamp) > Duration::from_secs(60) { + warn!("progress observer: no download progress in a minute. Giving up"); + return; + }; + + set_chat_completions_progress(p).await; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; m = tokio::fs::metadata(&f.path()).await; } }); progress_handle } + +async fn have_tempdir(tmp: &PathBuf) -> bool { + let mut d = tokio::fs::metadata(&tmp).await; + for _ in 0 .. 10 { + if d.is_ok() { + break; + }; + tokio::time::sleep(std::time::Duration::from_millis(10)).await; + d = tokio::fs::metadata(&tmp).await; + }; + if d.is_err() { + error!("progress observer: can't read tmp directory ({:?}). Giving up", d); + add_chat_completions_error(d.unwrap_err()).await; + return false; + }; + + true +} + +// TODO: we use the first file we find in the tmp directory. +// we should instead *know* the name of the file. +async fn wait_for_tempfile(tmp: &PathBuf) -> Option { + for _ in 0 .. 30 { + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + let es = std::fs::read_dir(&tmp); + if es.is_err() { + error!("progress observer: cannot read tmp directory ({:?}). 
Giving up", es); + add_chat_completions_error(es.unwrap_err()).await; + return None; + }; + for e in es.unwrap() { + if e.is_ok() { + return Some(e.unwrap()); + } + }; + }; + + None +} From c6cff0279ce64ebf675ffcc58c2d0042c84368f8 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Mon, 5 Feb 2024 13:59:14 +0000 Subject: [PATCH 5/9] [feat/issue10] code fmt'd --- crates/edgen_server/src/lib.rs | 1 - crates/edgen_server/src/model.rs | 14 ++-- crates/edgen_server/src/status.rs | 132 +++++++++++++++++------------- 3 files changed, 83 insertions(+), 64 deletions(-) diff --git a/crates/edgen_server/src/lib.rs b/crates/edgen_server/src/lib.rs index f2dfb5f..8861f58 100644 --- a/crates/edgen_server/src/lib.rs +++ b/crates/edgen_server/src/lib.rs @@ -193,7 +193,6 @@ async fn run_server(args: &cli::Serve) -> bool { ) // -- Miscellaneous services ------------------------------------------- .route("/v1/misc/version", axum::routing::get(misc::edgen_version)) - .layer(CorsLayer::permissive()); let uri_vector = if !args.uri.is_empty() { diff --git a/crates/edgen_server/src/model.rs b/crates/edgen_server/src/model.rs index 3c0f120..1929115 100644 --- a/crates/edgen_server/src/model.rs +++ b/crates/edgen_server/src/model.rs @@ -16,6 +16,8 @@ use serde_derive::Serialize; use thiserror::Error; use utoipa::ToSchema; +use crate::status; + #[derive(Serialize, Error, ToSchema, Debug)] pub enum ModelError { #[error("the provided model file name does does not exist, or isn't a file: ({0})")] @@ -90,12 +92,13 @@ impl Model { .get(&self.name) .is_none(); let size = self.get_size(&api).await; - let progress_handle = crate::status::observe_chat_completions_progress(&self.dir, size, download).await; + let progress_handle = + status::observe_chat_completions_progress(&self.dir, size, download).await; let name = self.name.clone(); let download_handle = tokio::spawn(async move { if download { - crate::status::set_chat_completions_download(true).await; + 
status::set_chat_completions_download(true).await; } let path = api @@ -103,8 +106,8 @@ impl Model { .map_err(move |e| ModelError::API(e.to_string())); if download { - crate::status::set_chat_completions_progress(100).await; - crate::status::set_chat_completions_download(false).await; + status::set_chat_completions_progress(100).await; + status::set_chat_completions_download(false).await; } return path; @@ -126,7 +129,8 @@ impl Model { .header("Content-Range", "bytes 0-0") .header("Range", "bytes 0-0") .send() - .await.unwrap(); + .await + .unwrap(); return metadata.content_length(); } diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs index d85b1cf..7cd7130 100644 --- a/crates/edgen_server/src/status.rs +++ b/crates/edgen_server/src/status.rs @@ -15,14 +15,14 @@ use std::collections::VecDeque; use std::error::Error; use std::path::PathBuf; -use std::time::{Instant, Duration}; +use std::time::{Duration, Instant}; use axum::http::StatusCode; use axum::response::{IntoResponse, Json, Response}; use once_cell::sync::Lazy; use serde::{Deserialize, Serialize}; -use tokio::sync::{RwLock}; -use tracing::{error, warn, info}; +use tokio::sync::RwLock; +use tracing::{error, info, warn}; use utoipa::ToSchema; /// Recent Activity on a specific endpoint, e.g. Completions or Download. 
@@ -107,13 +107,18 @@ pub async fn set_chat_completions_progress(progress: u64) { } /// Observe download progress -pub async fn observe_chat_completions_progress(datadir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { +pub async fn observe_chat_completions_progress( + datadir: &PathBuf, + size: Option, + download: bool, +) -> tokio::task::JoinHandle<()> { observe_progress(datadir, size, download).await } /// Add an error to the last errors pub async fn add_chat_completions_error(e: E) -where E: Error +where + E: Error, { let rwstate = get_chat_completions_status(); let mut state = rwstate.write().await; @@ -123,7 +128,6 @@ where E: Error state.last_errors.push_back(format!("{:?}", e)); } - /// GET `/v1/chat/completions/status`: returns the current status of the /chat/completions endpoint. /// /// The status is returned as json value AIStatus. @@ -148,8 +152,10 @@ struct AIStates { impl Default for AIStates { fn default() -> AIStates { AIStates { - endpoints: vec![RwLock::new(Default::default()), - RwLock::new(Default::default())] + endpoints: vec![ + RwLock::new(Default::default()), + RwLock::new(Default::default()), + ], } } } @@ -169,83 +175,93 @@ fn internal_server_error(msg: &str) -> Response { // - sets the percentage in the status.download_progress // - until the tempfile disappears or no progress was made for 1 minute. // TODO: This code should go to the module manager. 
-async fn observe_progress(datadir: &PathBuf, size: Option, download: bool) -> tokio::task::JoinHandle<()> { - let tmp = datadir.join("tmp"); +async fn observe_progress( + datadir: &PathBuf, + size: Option, + download: bool, +) -> tokio::task::JoinHandle<()> { + let tmp = datadir.join("tmp"); + + let progress_handle = tokio::spawn(async move { + if !download { + info!("progress observer: no download necessary, file is already there"); + return; + } - let progress_handle = tokio::spawn(async move { - if !download { - info!("progress observer: no download necessary, file is already there"); - return; - } + if size.is_none() { + warn!("progress observer: unknown file size. No progress reported on download"); + return; + } + let size = size.unwrap(); - if size.is_none() { - warn!("progress observer: unknown file size. No progress reported on download"); - return; - } - let size = size.unwrap(); + if !have_tempdir(&tmp).await { + return; + } - if !have_tempdir(&tmp).await { - return; - } + let t = wait_for_tempfile(&tmp).await; + if t.is_none() { + return; + } - let t = wait_for_tempfile(&tmp).await; - if t.is_none() { + let f = t.unwrap(); + + let mut m = tokio::fs::metadata(&f.path()).await; + let mut last_size = 0; + let mut timestamp = Instant::now(); + while m.is_ok() { + let s = m.unwrap().len() as u64; + let t = size; + let p = (s * 100) / t; + + if size > last_size { + last_size = size; + timestamp = Instant::now(); + } else if Instant::now().duration_since(timestamp) > Duration::from_secs(60) { + warn!("progress observer: no download progress in a minute. 
Giving up"); return; - } + }; - let f = t.unwrap(); - - let mut m = tokio::fs::metadata(&f.path()).await; - let mut last_size = 0; - let mut timestamp = Instant::now(); - while m.is_ok() { - let s = m.unwrap().len() as u64; - let t = size; - let p = (s * 100) / t; - - if size > last_size { - last_size = size; - timestamp = Instant::now(); - } else if Instant::now().duration_since(timestamp) > Duration::from_secs(60) { - warn!("progress observer: no download progress in a minute. Giving up"); - return; - }; - - set_chat_completions_progress(p).await; - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - m = tokio::fs::metadata(&f.path()).await; - } - }); + set_chat_completions_progress(p).await; + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + m = tokio::fs::metadata(&f.path()).await; + } + }); - progress_handle + progress_handle } async fn have_tempdir(tmp: &PathBuf) -> bool { let mut d = tokio::fs::metadata(&tmp).await; - for _ in 0 .. 10 { + for _ in 0..10 { if d.is_ok() { break; }; tokio::time::sleep(std::time::Duration::from_millis(10)).await; d = tokio::fs::metadata(&tmp).await; - }; + } if d.is_err() { - error!("progress observer: can't read tmp directory ({:?}). Giving up", d); + error!( + "progress observer: can't read tmp directory ({:?}). Giving up", + d + ); add_chat_completions_error(d.unwrap_err()).await; return false; }; - + true } // TODO: we use the first file we find in the tmp directory. // we should instead *know* the name of the file. async fn wait_for_tempfile(tmp: &PathBuf) -> Option { - for _ in 0 .. 30 { + for _ in 0..30 { tokio::time::sleep(std::time::Duration::from_millis(100)).await; let es = std::fs::read_dir(&tmp); if es.is_err() { - error!("progress observer: cannot read tmp directory ({:?}). Giving up", es); + error!( + "progress observer: cannot read tmp directory ({:?}). 
Giving up", + es + ); add_chat_completions_error(es.unwrap_err()).await; return None; }; @@ -253,8 +269,8 @@ async fn wait_for_tempfile(tmp: &PathBuf) -> Option { if e.is_ok() { return Some(e.unwrap()); } - }; - }; + } + } None } From 310ca66b9a5b4f1e8dbd3e684888e98d3bb764d1 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Mon, 5 Feb 2024 17:08:36 +0000 Subject: [PATCH 6/9] [feat/issue10] tests added to status --- crates/edgen_server/src/status.rs | 120 +++++++++++++++++++++++++++++- 1 file changed, 119 insertions(+), 1 deletion(-) diff --git a/crates/edgen_server/src/status.rs b/crates/edgen_server/src/status.rs index 7cd7130..7d8c7ad 100644 --- a/crates/edgen_server/src/status.rs +++ b/crates/edgen_server/src/status.rs @@ -75,6 +75,8 @@ impl Default for AIStatus { } } +const MAX_ERRORS: usize = 32; + /// Get a protected chat completions status. /// Call read() or write() on the returned value to get either read or write access. pub fn get_chat_completions_status() -> &'static RwLock { @@ -122,7 +124,7 @@ where { let rwstate = get_chat_completions_status(); let mut state = rwstate.write().await; - if state.last_errors.len() > 32 { + if state.last_errors.len() > MAX_ERRORS { state.last_errors.pop_front(); } state.last_errors.push_back(format!("{:?}", e)); @@ -274,3 +276,119 @@ async fn wait_for_tempfile(tmp: &PathBuf) -> Option { None } + +#[cfg(test)] +mod tests { + use super::*; + use std::io::{Error, ErrorKind}; + + #[tokio::test] + async fn test_get_status_untouched() { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, AIStatus::default()); + } + + #[tokio::test] + async fn test_get_status_changed() { + // default + let mut expected = AIStatus::default(); + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, AIStatus::default()); + } + + // download ongoing + expected.download_ongoing = true; + set_chat_completions_download(true).await; + + { + let status = 
get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + + // download progress + expected.download_progress = 42; + set_chat_completions_progress(42).await; + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + } + + #[tokio::test] + async fn test_get_status_errors() { + // default + let mut expected = AIStatus::default(); + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, AIStatus::default()); + } + + // errors + let e1 = Error::new(ErrorKind::Interrupted, "couldn't finish"); + expected.last_errors.push_back(format!("{:?}", e1)); + add_chat_completions_error(e1).await; + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + + let e2 = Error::new(ErrorKind::NotFound, "I still haven't found"); + expected.last_errors.push_back(format!("{:?}", e2)); + add_chat_completions_error(e2).await; + + assert_eq!(expected.last_errors.len(), 2); + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + + let e3 = Error::new(ErrorKind::PermissionDenied, "verboten"); + expected.last_errors.push_back(format!("{:?}", e3)); + add_chat_completions_error(e3).await; + + assert_eq!(expected.last_errors.len(), 3); + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + + // make sure there are at most MAX_ERRORS + for i in 0 .. 29 { + let message = format!("{} times verboten", i+1); + let e = Error::new(ErrorKind::PermissionDenied, message); + expected.last_errors.push_back(format!("{:?}", e)); + add_chat_completions_error(e).await; + } + + assert_eq!(expected.last_errors.len(), MAX_ERRORS); + + { + let status = get_chat_completions_status().read().await; + assert_eq!(*status, expected); + } + + for i in 0 .. 
10 { + let message = format!("{} times more verboten", i+1); + let e = Error::new(ErrorKind::PermissionDenied, message); + expected.last_errors.pop_front(); + expected.last_errors.push_back(format!("{:?}", e)); + add_chat_completions_error(e).await; + } + + assert_eq!(expected.last_errors.len(), MAX_ERRORS); + + { + let status = get_chat_completions_status().read().await; + let mut v1 = Vec::from(status.last_errors.clone()); + let mut v2 = Vec::from(expected.last_errors.clone()); + assert_eq!(v1.sort(), v2.sort()); + } + } +} From a55c15d51e5b69c05cee58d2c858ea3d577425b4 Mon Sep 17 00:00:00 2001 From: Tobias Schoofs Date: Mon, 5 Feb 2024 17:30:55 +0000 Subject: [PATCH 7/9] [feat/issue10] documentation for chat/completions/status added --- docs/src/app/api-reference/chat/page.mdx | 50 ++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/docs/src/app/api-reference/chat/page.mdx b/docs/src/app/api-reference/chat/page.mdx index df71d7a..1d17db5 100644 --- a/docs/src/app/api-reference/chat/page.mdx +++ b/docs/src/app/api-reference/chat/page.mdx @@ -118,3 +118,53 @@ Generate text from text. {{ className: 'lead' }} + +--- + +## Chat completion Status {{ tag: 'GET', label: 'http://localhost:33322/v1/chat/completions/status' }} + + + + Shows the current status of the chat completions endpoint (e.g. downloads). 
+ + + + + + ```bash {{ title: 'cURL' }} + curl http://localhost:33322/v1/chat/completions/status \ + -H "Content-Type: application/json" \ + -H "Authorization: Bearer no-key-required" + ``` + + ```python + from edgen import Edgen + client = Edgen() + + status = client.chat.completions.status.create() + print(status) + + ``` + + ```ts + import Edgen from "edgen"; + + const client = new Edgen(); + + async function main() { + const status = await client.chat.completions.status.create(); + + console.log(status); + } + + main(); + ``` + + + + ```json {{ title: 'Response' }} + {"active_model":"neural-chat-7b-v3-3.Q4_K_M.gguf","last_activity":"download","last_activity_result":"success","completions_ongoing":false,"download_ongoing":false,"download_progress":100,"last_errors":["Custom { kind: PermissionDenied, error: \"verboten\" }"]} + ``` + + + From f23de76a4dd5643cead620fe265d6dd4359b31fc Mon Sep 17 00:00:00 2001 From: francis2tm Date: Mon, 5 Feb 2024 17:47:16 +0000 Subject: [PATCH 8/9] docs: changed chat/completions/status endpoint title in docs --- docs/src/app/api-reference/chat/page.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/app/api-reference/chat/page.mdx b/docs/src/app/api-reference/chat/page.mdx index 1d17db5..0f93286 100644 --- a/docs/src/app/api-reference/chat/page.mdx +++ b/docs/src/app/api-reference/chat/page.mdx @@ -121,7 +121,7 @@ Generate text from text. 
{{ className: 'lead' }} --- -## Chat completion Status {{ tag: 'GET', label: 'http://localhost:33322/v1/chat/completions/status' }} +## Chat completion status {{ tag: 'GET', label: 'http://localhost:33322/v1/chat/completions/status' }} From 930797668a6409029610398f5342380e2d551cf2 Mon Sep 17 00:00:00 2001 From: francis2tm Date: Mon, 5 Feb 2024 17:49:12 +0000 Subject: [PATCH 9/9] docs: changed chat/completions/status endpoint tag to GET --- docs/src/app/api-reference/chat/page.mdx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/src/app/api-reference/chat/page.mdx b/docs/src/app/api-reference/chat/page.mdx index 0f93286..055bbd3 100644 --- a/docs/src/app/api-reference/chat/page.mdx +++ b/docs/src/app/api-reference/chat/page.mdx @@ -129,7 +129,7 @@ Generate text from text. {{ className: 'lead' }} - + ```bash {{ title: 'cURL' }} curl http://localhost:33322/v1/chat/completions/status \