Skip to content

Commit

Permalink
runtime: complete switching to anyhow::Result streams over tonic::Result
Browse files Browse the repository at this point in the history
anyhow::Result preserves error structure, and can be downcast to an
underlying tonic::Status if that's the causal error.
  • Loading branch information
jgraettinger committed Nov 3, 2023
1 parent f4ad5e8 commit a79a319
Show file tree
Hide file tree
Showing 16 changed files with 448 additions and 540 deletions.
10 changes: 2 additions & 8 deletions crates/build/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,7 @@ where
.runtime
.clone()
.unary_capture(request, CONNECTOR_TIMEOUT)
.await
.map_err(status_to_anyhow)?)
.await?)
}
}
.boxed()
Expand Down Expand Up @@ -430,18 +429,13 @@ where
.runtime
.clone()
.unary_materialize(request, CONNECTOR_TIMEOUT)
.await
.map_err(status_to_anyhow)?)
.await?)
}
}
.boxed()
}
}

fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
anyhow::anyhow!(status.message().to_string())
}

pub const FETCH_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(30);
pub const CONNECTOR_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(300); // Five minutes.
pub const STDIN_URL: &str = "stdin://root/flow.yaml";
6 changes: 2 additions & 4 deletions crates/flowctl/src/generate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,7 @@ async fn generate_missing_capture_configs(
},
build::CONNECTOR_TIMEOUT,
)
.await
.map_err(crate::status_to_anyhow)?
.await?
.spec
.context("connector didn't send expected Spec response")?;

Expand Down Expand Up @@ -322,8 +321,7 @@ async fn generate_missing_materialization_configs(
},
build::CONNECTOR_TIMEOUT,
)
.await
.map_err(crate::status_to_anyhow)?
.await?
.spec
.context("connector didn't send expected Spec response")?;

Expand Down
4 changes: 0 additions & 4 deletions crates/flowctl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,3 @@ fn format_user(email: Option<String>, full_name: Option<String>, id: Option<uuid
id = id.map(|id| id.to_string()).unwrap_or_default(),
)
}

fn status_to_anyhow(status: tonic::Status) -> anyhow::Error {
anyhow::anyhow!(status.message().to_string())
}
16 changes: 6 additions & 10 deletions crates/flowctl/src/raw/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ pub async fn do_capture(
}: &Capture,
) -> anyhow::Result<()> {
let client = ctx.controlplane_client().await?;
let (sources, validations) = local_specs::load_and_validate_full(client, &source, &network).await?;
let (sources, validations) =
local_specs::load_and_validate_full(client, &source, &network).await?;

// Identify the capture to discover.
let needle = if let Some(needle) = capture {
Expand Down Expand Up @@ -99,8 +100,7 @@ pub async fn do_capture(
let capture::response::Applied { action_description } = runtime
.clone()
.unary_capture(apply, build::CONNECTOR_TIMEOUT)
.await
.map_err(crate::status_to_anyhow)?
.await?
.applied
.context("connector didn't send expected Applied response")?;

Expand All @@ -109,16 +109,12 @@ pub async fn do_capture(
let (mut request_tx, request_rx) = futures::channel::mpsc::channel(runtime::CHANNEL_BUFFER);
request_tx.send(Ok(open)).await.unwrap();

let mut response_rx = runtime
.serve_capture(request_rx)
.await
.map_err(crate::status_to_anyhow)?;
let mut response_rx = runtime.serve_capture(request_rx).await?;

let opened = response_rx
.next()
.await
.context("expected Opened, not EOF")?
.map_err(crate::status_to_anyhow)?
.context("expected Opened, not EOF")??
.opened
.context("expected Opened")?;

Expand All @@ -140,7 +136,7 @@ pub async fn do_capture(
// TODO(johnny): This is currently only partly implemented, but is awaiting
// accompanying changes to the `runtime` crate.
while let Some(response) = response_rx.next().await {
let response = response.map_err(crate::status_to_anyhow)?;
let response = response?;

let _internal = response
.get_internal()
Expand Down
3 changes: 1 addition & 2 deletions crates/flowctl/src/raw/discover.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,7 @@ pub async fn do_discover(
format!("discover/{}", capture.capture),
)
.unary_capture(discover, build::CONNECTOR_TIMEOUT)
.await
.map_err(crate::status_to_anyhow)?
.await?
.discovered
.context("connector didn't send expected Discovered response")?;

Expand Down
36 changes: 18 additions & 18 deletions crates/runtime/src/capture/image.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::extract_endpoint;
use crate::{
image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed},
image_connector::{serve, Container, StartRpcFuture, UnsealFuture, Unsealed},
unseal,
};
use futures::{channel::mpsc, FutureExt, Stream};
Expand Down Expand Up @@ -33,7 +33,8 @@ fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
let models::CaptureEndpoint::Connector(models::ConnectorConfig {
image,
config: sealed_config,
}) = endpoint else {
}) = endpoint
else {
anyhow::bail!("task connector type has changed and is no longer an image")
};

Expand Down Expand Up @@ -79,12 +80,12 @@ pub fn connector<L, R>(
network: &str,
request_rx: R,
task_name: &str,
) -> mpsc::Receiver<tonic::Result<Response>>
) -> mpsc::Receiver<anyhow::Result<Response>>
where
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
R: Stream<Item = anyhow::Result<Request>> + Send + 'static,
{
let (connector, response_rx) = Connector::new(
serve(
attach_container,
log_handler,
network,
Expand All @@ -93,10 +94,7 @@ where
task_name,
ops::TaskType::Capture,
unseal,
);
tokio::spawn(async move { connector.run().await });

response_rx
)
}

#[cfg(test)]
Expand All @@ -114,16 +112,18 @@ mod test {
return;
}

let request_rx = futures::stream::repeat(Ok(serde_json::from_value(json!({
"spec": {
"connectorType": "IMAGE",
"config": {
"image": "ghcr.io/estuary/source-http-ingest:dev",
"config": {},
let request_rx = futures::stream::repeat_with(|| {
Ok(serde_json::from_value(json!({
"spec": {
"connectorType": "IMAGE",
"config": {
"image": "ghcr.io/estuary/source-http-ingest:dev",
"config": {},
}
}
}
}))
.unwrap()));
}))
.unwrap())
});

let response_rx = connector(ops::tracing_log_handler, "", request_rx.take(2), "a-task");

Expand Down
13 changes: 6 additions & 7 deletions crates/runtime/src/capture/local.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use super::extract_endpoint;
use crate::{
local_connector::{Connector, UnsealFuture, Unsealed},
local_connector::{serve, UnsealFuture, Unsealed},
unseal,
};
use futures::{channel::mpsc, FutureExt, Stream};
Expand Down Expand Up @@ -35,7 +35,8 @@ fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
config: sealed_config,
env,
protobuf,
}) = endpoint else {
}) = endpoint
else {
anyhow::bail!("task connector type has changed and is no longer an image")
};

Expand All @@ -60,12 +61,10 @@ fn unseal(mut request: Request) -> Result<UnsealFuture<Request>, Request> {
.boxed())
}

pub fn connector<L, R>(log_handler: L, request_rx: R) -> mpsc::Receiver<tonic::Result<Response>>
pub fn connector<L, R>(log_handler: L, request_rx: R) -> mpsc::Receiver<anyhow::Result<Response>>
where
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
R: Stream<Item = anyhow::Result<Request>> + Send + 'static,
{
let (connector, response_rx) = Connector::new(log_handler, request_rx, unseal);
tokio::spawn(async move { connector.run().await });
response_rx
serve(log_handler, request_rx, unseal)
}
40 changes: 27 additions & 13 deletions crates/runtime/src/capture/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ use std::sync::Arc;
mod image;
mod local;

pub type BoxStream = futures::stream::BoxStream<'static, tonic::Result<Response>>;

#[tonic::async_trait]
impl<L> proto_grpc::capture::connector_server::Connector for Runtime<L>
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
type CaptureStream = BoxStream;
type CaptureStream = futures::stream::BoxStream<'static, tonic::Result<Response>>;

async fn capture(
&self,
Expand All @@ -46,25 +44,36 @@ where
.get::<tonic::transport::server::UdsConnectInfo>();
tracing::debug!(?request, ?conn_info, "started capture request");

let response_rx = self.clone().serve_capture(request.into_inner()).await?;
let request_rx = crate::stream_status_to_error(request.into_inner());

let response_rx = self
.clone()
.serve_capture(request_rx)
.await
.map_err(crate::anyhow_to_status)?;

Ok(tonic::Response::new(response_rx))
Ok(tonic::Response::new(
crate::stream_error_to_status(response_rx).boxed(),
))
}
}

impl<L> Runtime<L>
where
L: Fn(&ops::Log) + Send + Sync + Clone + 'static,
{
pub async fn serve_capture<In>(self, request_rx: In) -> tonic::Result<BoxStream>
pub async fn serve_capture<In>(
self,
request_rx: In,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Response>> + Send>
where
In: Stream<Item = tonic::Result<Request>> + Send + Unpin + 'static,
In: Stream<Item = anyhow::Result<Request>> + Send + Unpin + 'static,
{
let mut request_rx = request_rx.peekable();

let mut peek_request = match Pin::new(&mut request_rx).peek().await {
Some(Ok(peek)) => peek.clone(),
Some(Err(status)) => return Err(status.clone()),
Some(Err(_)) => return Err(request_rx.try_next().await.unwrap_err()),
None => return Ok(futures::stream::empty().boxed()),
};
let (endpoint, _) = extract_endpoint(&mut peek_request).map_err(crate::anyhow_to_status)?;
Expand All @@ -90,9 +99,9 @@ where
)
.boxed(),
models::CaptureEndpoint::Local(_) if !self.allow_local => {
return Err(tonic::Status::failed_precondition(
Err(tonic::Status::failed_precondition(
"Local connectors are not permitted in this context",
))
))?
}
models::CaptureEndpoint::Local(_) => {
local::connector(self.log_handler, request_rx).boxed()
Expand All @@ -106,12 +115,17 @@ where
pub fn adjust_log_level<R>(
request_rx: R,
set_log_level: Option<Arc<dyn Fn(ops::log::Level) + Send + Sync>>,
) -> impl Stream<Item = tonic::Result<Request>>
) -> impl Stream<Item = anyhow::Result<Request>>
where
R: Stream<Item = tonic::Result<Request>> + Send + 'static,
R: Stream<Item = anyhow::Result<Request>> + Send + 'static,
{
request_rx.inspect_ok(move |request| {
let Ok(CaptureRequestExt{labels: Some(ops::ShardLabeling { log_level, .. })}) = request.get_internal() else { return };
let Ok(CaptureRequestExt {
labels: Some(ops::ShardLabeling { log_level, .. }),
}) = request.get_internal()
else {
return;
};

if let (Some(log_level), Some(set_log_level)) =
(ops::log::Level::from_i32(log_level), &set_log_level)
Expand Down
12 changes: 4 additions & 8 deletions crates/runtime/src/derive/image.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::extract_endpoint;
use crate::{
image_connector::{Connector, Container, StartRpcFuture, UnsealFuture, Unsealed},
image_connector::{serve, Container, StartRpcFuture, UnsealFuture, Unsealed},
unseal,
};
use futures::{channel::mpsc, FutureExt, Stream, StreamExt};
use futures::{channel::mpsc, FutureExt, Stream};
use proto_flow::{
derive::{Request, Response},
runtime::DeriveRequestExt,
Expand Down Expand Up @@ -79,8 +79,7 @@ where
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = anyhow::Result<Request>> + Send + 'static,
{
let request_rx = crate::stream_error_to_status(request_rx).boxed();
let (connector, response_rx) = Connector::new(
serve(
attach_container,
log_handler,
network,
Expand All @@ -89,8 +88,5 @@ where
task_name,
ops::TaskType::Derivation,
unseal,
);
tokio::spawn(async move { connector.run().await });

crate::stream_status_to_error(response_rx)
)
}
9 changes: 3 additions & 6 deletions crates/runtime/src/derive/local.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::extract_endpoint;
use crate::{
local_connector::{Connector, UnsealFuture, Unsealed},
local_connector::{serve, UnsealFuture, Unsealed},
unseal,
};
use futures::{FutureExt, Stream, StreamExt};
use futures::{FutureExt, Stream};
use proto_flow::{
derive::{Request, Response},
runtime::DeriveRequestExt,
Expand Down Expand Up @@ -63,8 +63,5 @@ where
L: Fn(&ops::Log) + Clone + Send + Sync + 'static,
R: Stream<Item = anyhow::Result<Request>> + Send + 'static,
{
let request_rx = crate::stream_error_to_status(request_rx).boxed();
let (connector, response_rx) = Connector::new(log_handler, request_rx, unseal);
tokio::spawn(async move { connector.run().await });
crate::stream_status_to_error(response_rx)
serve(log_handler, request_rx, unseal)
}
Loading

0 comments on commit a79a319

Please sign in to comment.