Skip to content

Commit

Permalink
minor changes
Browse files Browse the repository at this point in the history
  • Loading branch information
ssrlive committed Dec 26, 2024
1 parent 730fc9f commit 2aa891a
Showing 1 changed file with 11 additions and 14 deletions.
25 changes: 11 additions & 14 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,25 +163,24 @@ async fn handle_socks5_cmd_connection(connect: Connect<NeedReply>, target_addr:
Ok(())
}

async fn client_traffic_loop<T: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + AsyncWrite + Unpin>(
mut incoming: T,
mut ws_stream: WebSocketStream<S>,
peer_addr: SocketAddr,
target_addr: Address,
) -> Result<()> {
async fn client_traffic_loop<T, S>(mut incoming: T, mut ws_stream: WebSocketStream<S>, src: SocketAddr, dest: Address) -> Result<()>
where
T: AsyncRead + AsyncWrite + Unpin,
S: AsyncRead + AsyncWrite + Unpin,
{
let mut timer = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
let mut buf = BytesMut::with_capacity(crate::STREAM_BUFFER_SIZE);
tokio::select! {
result = incoming.read_buf(&mut buf) => {
let len = result?;
if len == 0 {
log::trace!("{} -> {} incoming closed", peer_addr, target_addr);
log::trace!("{} -> {} incoming closed", src, dest);
ws_stream.send(Message::Close(None)).await?;
break;
}
ws_stream.send(Message::binary(buf.to_vec())).await?;
log::trace!("{} -> {} length {}", peer_addr, target_addr, buf.len());
log::trace!("{} -> {} length {}", src, dest, buf.len());

if let Err(e) = crate::traffic_status::traffic_status_update(len, 0) {
log::error!("{}", e);
Expand All @@ -199,21 +198,19 @@ async fn client_traffic_loop<T: AsyncRead + AsyncWrite + Unpin, S: AsyncRead + A
match msg {
Message::Binary(data) => {
incoming.write_all(&data).await?;
log::trace!("{} <- {} length {}", peer_addr, target_addr, data.len());
log::trace!("{} <- {} length {}", src, dest, data.len());
}
Message::Close(_) => {
log::trace!("{} <- {} ws closed", peer_addr, target_addr);
log::trace!("{} <- {} ws closed, exiting", src, dest);
break;
}
Message::Pong(_) => {
log::trace!("{} <- {} Websocket pong from remote", peer_addr, target_addr);
},
Message::Pong(_) => log::trace!("{} <- {} Websocket pong from remote", src, dest),
_ => {}
}
}
_ = timer.tick() => {
ws_stream.send(Message::Ping(vec![].into())).await?;
log::trace!("{} -> {} Websocket ping from local", peer_addr, target_addr);
log::trace!("{} -> {} Websocket ping from local", src, dest);
}
}
}
Expand Down

0 comments on commit 2aa891a

Please sign in to comment.