Skip to content

Commit

Permalink
feat(server): add /ws/:hostname to stream compile log
Browse files Browse the repository at this point in the history
  • Loading branch information
eatradish committed Jun 19, 2024
1 parent df6acff commit b013078
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 7 deletions.
38 changes: 36 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ buildit-utils = { path = "../buildit-utils" }
jsonwebtoken = "9.2.0"
size = "0.4.1"
dickens = { git = "https://github.com/AOSC-Dev/dickens.git", version = "0.1.0" }
axum = "0.7.4"
axum = { version = "0.7.4", features = ["ws"] }
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
tracing = "0.1.40"
tower-http = { version = "0.5.2", features = ["trace", "fs", "cors"] }
Expand Down
9 changes: 7 additions & 2 deletions server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use server::bot::{answer, Command};
use server::recycler::recycler_worker;
use server::routes::{
dashboard_status, job_info, job_list, job_restart, ping, pipeline_info, pipeline_list,
pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, AppState,
pipeline_new_pr, worker_info, worker_job_update, worker_list, worker_poll, ws_handler,
AppState, PeerMap,
};
use server::routes::{pipeline_new, worker_heartbeat};
use server::routes::{pipeline_status, worker_status};
use server::{DbPool, ARGS};
use std::collections::HashMap;
use std::os::unix::fs::PermissionsExt;
use std::sync::Arc;
use std::sync::{Arc, RwLock};
use teloxide::prelude::*;
use tokio::net::unix::UCred;
use tokio::net::UnixStream;
Expand Down Expand Up @@ -99,7 +101,9 @@ async fn main() -> anyhow::Result<()> {
let state = AppState {
pool: pool.clone(),
bot,
ws_peer_map: PeerMap::new(RwLock::new(HashMap::new())),
};

let mut app = Router::new()
.route("/api/ping", get(ping))
.route("/api/pipeline/new", post(pipeline_new))
Expand All @@ -117,6 +121,7 @@ async fn main() -> anyhow::Result<()> {
.route("/api/worker/list", get(worker_list))
.route("/api/worker/info", get(worker_info))
.route("/api/dashboard/status", get(dashboard_status))
.route("/api/ws/:hostname", get(ws_handler))
.nest_service("/assets", ServeDir::new("frontend/dist/assets"))
.route_service("/favicon.ico", ServeFile::new("frontend/dist/favicon.ico"))
.fallback_service(ServeFile::new("frontend/dist/index.html"))
Expand Down
13 changes: 12 additions & 1 deletion server/src/routes/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,39 @@ use diesel::dsl::{count, sum};

use diesel::{Connection, ExpressionMethods, QueryDsl, RunQueryDsl};

use futures::channel::mpsc::UnboundedSender;
use serde::Serialize;
use std::collections::BTreeMap;
use std::{
collections::{BTreeMap, HashMap},
net::SocketAddr,
sync::{Arc, RwLock},
};

use teloxide::prelude::*;
use tracing::info;

pub mod job;
pub mod pipeline;
pub mod websocket;
pub mod worker;

pub use job::*;
pub use pipeline::*;
pub use websocket::*;
pub use worker::*;

pub async fn ping() -> &'static str {
"PONG"
}

type Tx = (UnboundedSender<axum::extract::ws::Message>, String);
pub type PeerMap = Arc<RwLock<HashMap<SocketAddr, Tx>>>;

#[derive(Clone)]
pub struct AppState {
pub pool: DbPool,
pub bot: Option<Bot>,
pub ws_peer_map: PeerMap,
}

// learned from https://github.com/tokio-rs/axum/blob/main/examples/anyhow-error-response/src/main.rs
Expand Down
73 changes: 73 additions & 0 deletions server/src/routes/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
use std::net::SocketAddr;

use axum::{
extract::{
ws::{Message, WebSocket},
ConnectInfo, Path, State, WebSocketUpgrade,
},
response::IntoResponse,
};
use futures::{
channel::mpsc::{unbounded, UnboundedSender},
future, pin_mut, StreamExt, TryStreamExt,
};
use tracing::info;

use super::{AppState, PeerMap};

/// The handler for the HTTP request (this gets called when the HTTP GET lands at the start
/// of websocket negotiation). After this completes, the actual switching from HTTP to
/// websocket protocol will occur.
/// This is the last point where we can extract TCP/IP metadata such as IP address of the client
/// as well as things from HTTP headers such as user-agent of the browser etc.
pub async fn ws_handler(
Path(hostname): Path<String>,
ws: WebSocketUpgrade,
ConnectInfo(addr): ConnectInfo<SocketAddr>,
State(state): State<AppState>,
) -> impl IntoResponse {
// finalize the upgrade process by returning upgrade callback.
// we can customize the callback by sending additional info such as address.
ws.on_upgrade(move |socket| handle_socket(socket, addr, hostname, state.ws_peer_map))
}

/// Actual websocket statemachine (one will be spawned per connection)
async fn handle_socket(socket: WebSocket, who: SocketAddr, hostname: String, peer_map: PeerMap) {
let (tx, rx): (UnboundedSender<Message>, _) = unbounded();
peer_map
.write()
.unwrap()
.insert(who, (tx, hostname.clone()));

let (outgoing, incoming) = socket.split();

let broadcast_incoming = incoming.try_for_each(|msg| {
info!("Received a message from {}: {:?}", who, msg);

let peers = peer_map.read().unwrap();

// We want to broadcast the message to everyone except ourselves.
let broadcast_recipients = peers
.iter()
.filter(|(peer_addr, _)| peer_addr != &&who)
.map(|(_, (ws_sink, port))| (ws_sink, port));

for recp in broadcast_recipients {
let recp_path = recp.1;

if *recp_path == hostname {
recp.0.unbounded_send(msg.clone()).unwrap();
}
}

future::ok(())
});

let receive_from_others = rx.map(Ok).forward(outgoing);

pin_mut!(broadcast_incoming, receive_from_others);
future::select(broadcast_incoming, receive_from_others).await;

info!("{} disconnected", &who);
peer_map.write().unwrap().remove(&who);
}
2 changes: 1 addition & 1 deletion server/src/routes/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ pub async fn worker_poll(
}

pub async fn worker_job_update(
State(AppState { pool, bot }): State<AppState>,
State(AppState { pool, bot, .. }): State<AppState>,
Json(payload): Json<WorkerJobUpdateRequest>,
) -> Result<(), AnyhowError> {
if payload.worker_secret != ARGS.worker_secret {
Expand Down

0 comments on commit b013078

Please sign in to comment.