Skip to content

Commit

Permalink
More naming and related cleanup.
Browse files Browse the repository at this point in the history
  • Loading branch information
ximon18 committed Oct 12, 2023
1 parent c4d0d66 commit f231fc9
Showing 1 changed file with 49 additions and 32 deletions.
81 changes: 49 additions & 32 deletions src/serve/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ where
ProcessActionResult::CallResultReceived(
call_result,
) => {
self.write_queued_result(
self.handle_queued_result(
state,
call_result,
)
Expand Down Expand Up @@ -602,51 +602,68 @@ where
// the write queue and exit this connection handler.
result_q_rx.close();
while let Some(call_result) = result_q_rx.recv().await {
self.write_queued_result(state, call_result).await;
self.handle_queued_result(state, call_result).await;
}
}

async fn write_queued_result(
async fn handle_queued_result(
&self,
state: &mut StreamState<Stream, Buf, Svc>,
mut call_result: CallResult<Svc::ResponseOctets>,
) {
if let Some(msg) = call_result.response() {
// TODO: spawn this as a task and serialize access to write with a lock?
if let Err(err) =
state.stream_tx.write_all(msg.as_stream_slice()).await
{
eprintln!("Write error: {err}");
todo!()
}
if state.result_q_tx.capacity()
== state.result_q_tx.max_capacity()
{
state.response_queue_emptied();
}
self.metrics
.num_pending_writes
.fetch_sub(1, Ordering::Relaxed);
self.write_result_to_stream(state, msg).await;
}
if let Some(cmd) = call_result.command() {
match cmd {
ServiceCommand::CloseConnection { .. } => todo!(),
ServiceCommand::Init => todo!(),
ServiceCommand::Reconfigure { idle_timeout } => {
eprintln!(
"Reconfigured connection timeout to {idle_timeout:?}"
);
state.idle_timeout =
chrono::Duration::from_std(idle_timeout).unwrap();
// TODO: Check this unwrap()
}
ServiceCommand::Shutdown => {
state.stream_tx.shutdown().await.unwrap()
}
self.act_on_queued_command(cmd, state).await;
}
}

async fn write_result_to_stream(
&self,
state: &mut StreamState<Stream, Buf, Svc>,
msg: crate::base::StreamTarget<<Svc as Service<<Buf as BufSource>::Output>>::ResponseOctets>
) {
// TODO: spawn this as a task and serialize access to write with a lock?
if let Err(err) =
state.stream_tx.write_all(msg.as_stream_slice()).await
{
eprintln!("Write error: {err}");
todo!()
}
if state.result_q_tx.capacity()
== state.result_q_tx.max_capacity()
{
state.response_queue_emptied();
}
self.metrics
.num_pending_writes
.fetch_sub(1, Ordering::Relaxed);
}

async fn act_on_queued_command(
&self,
cmd: ServiceCommand,
state: &mut StreamState<Stream, Buf, Svc>
) {
match cmd {
ServiceCommand::CloseConnection { .. } => todo!(),
ServiceCommand::Init => todo!(),
ServiceCommand::Reconfigure { idle_timeout } => {
eprintln!(
"Reconfigured connection timeout to {idle_timeout:?}"
);
state.idle_timeout =
chrono::Duration::from_std(idle_timeout).unwrap();
// TODO: Check this unwrap()
}
ServiceCommand::Shutdown => {
state.stream_tx.shutdown().await.unwrap()
}
}
}
}


//------------ StreamServerEvent ---------------------------------------------

Expand Down

0 comments on commit f231fc9

Please sign in to comment.