diff --git a/src/sink.rs b/src/sink.rs index 43ff530..160a374 100644 --- a/src/sink.rs +++ b/src/sink.rs @@ -231,14 +231,19 @@ where mut sink_stream: Streaming, grpc_resp_tx: mpsc::Sender>, ) -> Result<(), Error> { + // loop until the global stream has been shutdown. loop { - let done = Self::process_sink_batch( + // for every batch, we need to read from the stream. The end-of-batch is + // encoded in the request. + let stream_ended = Self::process_sink_batch( sink_handle.clone(), &mut sink_stream, grpc_resp_tx.clone(), ) .await?; - if done { + + if stream_ended { + // shutting down, hence exiting the loop break; } } @@ -246,7 +251,8 @@ where } /// processes a batch of messages from the client, sends them to the sink handler and sends the - /// responses back to the client batches are separated by an EOT message + /// responses back to the client batches are separated by an EOT message. + /// Returns true if the global bidi-stream has ended, otherwise false. async fn process_sink_batch( sink_handle: Arc, sink_stream: &mut Streaming, @@ -270,12 +276,18 @@ where } }); + let mut global_stream_ended = false; + + // loop until eot happens on stream is closed. loop { let message = match sink_stream.message().await { Ok(Some(m)) => m, Ok(None) => { info!("global bidi stream ended"); - return Ok(true); // bidi stream ended + // NOTE: this will only happen during shutdown. We can be certain that there + // are no messages left hanging in the UDF. + global_stream_ended = true; + break; // bidi stream ended } Err(e) => { return Err(SinkError(InternalError(format!( @@ -315,7 +327,7 @@ where .await .map_err(|e| SinkError(UserDefinedError(e.to_string())))?; - Ok(false) + Ok(global_stream_ended) } /// handles errors from the sink handler and sends them to the client via the response channel @@ -354,7 +366,7 @@ where } } - // performs handshake with the client + /// performs handshake with the client async fn perform_handshake( &self, sink_stream: &mut Streaming,