Skip to content

Commit

Permalink
refactor: optimize health checker
Browse files Browse the repository at this point in the history
Signed-off-by: Xin Liu <[email protected]>
  • Loading branch information
apepkuss committed Dec 13, 2024
1 parent 6823936 commit 6779724
Showing 1 changed file with 95 additions and 61 deletions.
156 changes: 95 additions & 61 deletions src/health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use tokio::sync::RwLock;

#[derive(Debug)]
struct LogMessage {
_timestamp: DateTime<Utc>,
timestamp: DateTime<Utc>,
_level: String,
_service: String,
_file: String,
Expand Down Expand Up @@ -48,7 +48,7 @@ impl FromStr for LogMessage {
};

Ok(LogMessage {
_timestamp: timestamp,
timestamp,
_level: captures["level"].to_string(),
_service: captures["service"].to_string(),
_file: captures["file"].to_string(),
Expand Down Expand Up @@ -121,88 +121,122 @@ pub(crate) async fn check_server_health(
info!("num of new log messages: {}", new_lines.len());

// Iterate over the new lines and analyze server health
for line in new_lines.lines() {
// method: check the most recent pair of request and response
for line in new_lines.lines().rev() {
if let Ok(log_message) = LogMessage::from_str(line) {
if log_message.custom_message.starts_with("endpoint") {
// * capture request
let endpoint = log_message
if log_message.custom_message.starts_with("response_status:") {
// get the status code
let status_code = log_message
.custom_message
.split_whitespace()
.last()
.unwrap();
info!("capture a request to {}", endpoint);
log_queue.push_back(log_message);
.unwrap().to_string();
info!("Capture a response status: {}", &status_code);

// record the timestamp of the most recent response
match TIMESTAMP_LAST_RESPONSE.get() {
None => {TIMESTAMP_LAST_RESPONSE
.set(RwLock::new(log_message.timestamp))
.expect("Unable to set timestamp")},
Some(timestamp) => {
let mut timestamp = timestamp.write().await;

*timestamp = log_message.timestamp;
},
}
info!("Timestamp of the most recent response: {}", &log_message.timestamp);

match log_queue.is_empty() {
true => {
info!("Push the most recent response to the queue");
log_queue.push_back(log_message)},
false => {
match SERVER_HEALTH.get() {
Some(_) => {
let mut server_health = SERVER_HEALTH
.get()
.expect("Unable to get server health")
.write()
.await;

*server_health = false;
}
None => {
SERVER_HEALTH
.set(RwLock::new(false))
.expect("Unable to set server health");
}
};

if log_queue.len() > 1 {
// pop the last response
log_queue.pop_front();
info!("Update the server health to false");

break;
}
}
} else if log_message.custom_message.starts_with("response_status") {
// * capture response
let status = log_message
} else if log_message.custom_message.starts_with("endpoint:") {
let endpoint = log_message
.custom_message
.split_whitespace()
.last()
.unwrap();
info!("capture a response status: {}", status);

// record the timestamp of the latest response
if TIMESTAMP_LAST_RESPONSE.get().is_none() {
TIMESTAMP_LAST_RESPONSE
.set(RwLock::new(Utc::now()))
.expect("Unable to set timestamp");
} else {
let mut timestamp = TIMESTAMP_LAST_RESPONSE
.get()
.expect("Unable to get timestamp")
.write()
.await;
info!("Capture the most recent request to {}", endpoint);

*timestamp = Utc::now();
}

// push the response to the queue
log_queue.push_back(log_message);
match log_queue.is_empty() {
true => {
match SERVER_HEALTH.get() {
Some(_) => {
let mut server_health = SERVER_HEALTH
.get()
.expect("Unable to get server health")
.write()
.await;

// update the server health
if log_queue.len() > 1 {
if let Some(log) = log_queue.pop_front() {
if !log.custom_message.starts_with("endpoint") {
if SERVER_HEALTH.get().is_none() {
*server_health = false;
}
None => {
SERVER_HEALTH
.set(RwLock::new(false))
.expect("Unable to set server health");
} else {
let mut server_health = SERVER_HEALTH
.get()
.expect("Unable to get server health")
.write()
.await;
if *server_health {
*server_health = false;
}
.set(RwLock::new(false))
.expect("Unable to set server health");
}
};

info!("Update the server health to false");

break;
},
false => {
let response_log_message = log_queue.pop_front().unwrap();
let status_code = response_log_message.custom_message.split_whitespace().last().unwrap().to_string();
info!("Status code of the paired response: {}", &status_code);

let health_status = if status_code == "200" {
true
} else {
if SERVER_HEALTH.get().is_none() {
false
};

match SERVER_HEALTH.get() {
Some(healthy) => {
let mut healthy = healthy.write().await;

*healthy = health_status;
}
None => {
SERVER_HEALTH
.set(RwLock::new(true))
.set(RwLock::new(health_status))
.expect("Unable to set server health");
} else {
let mut server_health = SERVER_HEALTH
.get()
.expect("Unable to get server health")
.write()
.await;
if !*server_health {
*server_health = true;
}
}
}
};

info!("Update the server health to {}", health_status);

break;
}
}
}
}
}

} else {
//* If long time no requests coming in, then invoke `ping_server` function to send a request to /v1/chat/completions endpoint */
// compare the current timestamp with the last response timestamp
Expand Down

0 comments on commit 6779724

Please sign in to comment.