diff --git a/crates/derive-sqlite/src/connector.rs b/crates/derive-sqlite/src/connector.rs index ec173c9300..f0ad51a532 100644 --- a/crates/derive-sqlite/src/connector.rs +++ b/crates/derive-sqlite/src/connector.rs @@ -1,8 +1,7 @@ use super::{dbutil, do_validate, parse_validate, Config, Lambda, Param, Transform}; use anyhow::Context; use futures::channel::mpsc; -use futures::TryStreamExt; -use futures::{SinkExt, Stream}; +use futures::{SinkExt, StreamExt}; use prost::Message; use proto_flow::runtime::{derive_request_ext, derive_response_ext, DeriveRequestExt}; use proto_flow::{ @@ -10,12 +9,9 @@ use proto_flow::{ flow, RuntimeCheckpoint, }; -pub fn connector( - _peek_request: &Request, - request_rx: R, -) -> anyhow::Result>> +pub fn connector(request_rx: R) -> mpsc::Receiver> where - R: futures::stream::Stream> + Send + 'static, + R: futures::stream::Stream + Send + 'static, { let (mut response_tx, response_rx) = mpsc::channel(16); @@ -27,7 +23,7 @@ where }) }); - Ok(response_rx) + response_rx } async fn serve( @@ -35,7 +31,7 @@ async fn serve( response_tx: &mut mpsc::Sender>, ) -> anyhow::Result<()> where - R: futures::stream::Stream>, + R: futures::stream::Stream, { let mut request_rx = std::pin::pin!(request_rx); let tokio_handle = tokio::runtime::Handle::current(); @@ -47,7 +43,7 @@ where let mut maybe_handle: Option = None; loop { - match request_rx.try_next().await? { + match request_rx.next().await { None => return Ok(()), Some(Request { validate: Some(validate), diff --git a/crates/flowctl/src/raw/capture.rs b/crates/flowctl/src/raw/capture.rs index 7786694fa6..2d3864eab7 100644 --- a/crates/flowctl/src/raw/capture.rs +++ b/crates/flowctl/src/raw/capture.rs @@ -127,10 +127,6 @@ pub async fn do_capture( .map(|i| i.clone().into()) .unwrap_or(std::time::Duration::from_secs(1)); - let mut ticker = tokio::time::interval(interval); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); - _ = ticker.tick().await; // First tick is immediate. - let mut output = std::io::stdout(); // TODO(johnny): This is currently only partly implemented, but is awaiting @@ -147,15 +143,18 @@ pub async fn do_capture( // Upon a checkpoint, wait until the next tick interval has elapsed before acknowledging. if let Some(_checkpoint) = response.checkpoint { - _ = ticker.tick().await; - - request_tx - .send(Ok(capture::Request { - acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }), - ..Default::default() - })) - .await - .unwrap(); + let mut request_tx = request_tx.clone(); + tokio::spawn(async move { + () = tokio::time::sleep(interval).await; + + request_tx + .feed(Ok(capture::Request { + acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }), + ..Default::default() + })) + .await + .unwrap(); + }); } } diff --git a/crates/ops/src/lib.rs b/crates/ops/src/lib.rs index 9271cb6a73..2665975106 100644 --- a/crates/ops/src/lib.rs +++ b/crates/ops/src/lib.rs @@ -148,6 +148,14 @@ impl std::fmt::Debug for DebugJson { } } +pub fn merge_docs_and_bytes(from: &stats::DocsAndBytes, to: &mut Option) { + if from.docs_total != 0 { + let entry = to.get_or_insert_with(Default::default); + entry.docs_total += from.docs_total; + entry.bytes_total += from.bytes_total; + } +} + #[cfg(test)] mod test { use super::{Log, LogLevel}; diff --git a/crates/runtime/src/capture/connector.rs b/crates/runtime/src/capture/connector.rs new file mode 100644 index 0000000000..a8d05214a8 --- /dev/null +++ b/crates/runtime/src/capture/connector.rs @@ -0,0 +1,231 @@ +use crate::{unseal, LogHandler, Runtime}; +use anyhow::Context; +use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt}; +use proto_flow::{ + capture::{Request, Response}, + flow::capture_spec::ConnectorType, + runtime::CaptureRequestExt, +}; + +// Start a capture connector as indicated by the `initial` Request. +// Returns a pair of Streams for sending Requests and receiving Responses. +pub async fn start( + runtime: &Runtime, + mut initial: Request, +) -> anyhow::Result<( + mpsc::Sender, + BoxStream<'static, anyhow::Result>, +)> { + let (endpoint, log_level, config_json) = extract_endpoint(&mut initial)?; + let (mut connector_tx, connector_rx) = mpsc::channel(crate::CHANNEL_BUFFER); + + // Adjust the dynamic log level for this connector's lifecycle. + if let (Some(log_level), Some(set_log_level)) = (log_level, &runtime.set_log_level) { + (set_log_level)(log_level); + } + + fn attach_container(response: &mut Response, container: crate::image_connector::Container) { + response.set_internal(|internal| { + internal.container = Some(container); + }); + } + + fn start_rpc( + channel: tonic::transport::Channel, + rx: mpsc::Receiver, + ) -> crate::image_connector::StartRpcFuture { + async move { + proto_grpc::capture::connector_client::ConnectorClient::new(channel) + .capture(rx) + .await + } + .boxed() + } + + let connector_rx = match endpoint { + models::CaptureEndpoint::Connector(models::ConnectorConfig { + image, + config: sealed_config, + }) => { + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + connector_tx.try_send(initial).unwrap(); + + crate::image_connector::serve( + attach_container, + image, + runtime.log_handler.clone(), + log_level, + &runtime.container_network, + connector_rx, + start_rpc, + &runtime.task_name, + ops::TaskType::Capture, + ) + .await? + .boxed() + } + models::CaptureEndpoint::Local(_) if !runtime.allow_local => { + return Err(tonic::Status::failed_precondition( + "Local connectors are not permitted in this context", + ) + .into()); + } + models::CaptureEndpoint::Local(models::LocalConfig { + command, + config: sealed_config, + env, + protobuf, + }) => { + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + connector_tx.try_send(initial).unwrap(); + + crate::local_connector::serve( + command, + env, + runtime.log_handler.clone(), + log_level, + protobuf, + connector_rx, + )? + .boxed() + } + }; + + Ok((connector_tx, connector_rx)) +} + +fn extract_endpoint<'r>( + request: &'r mut Request, +) -> anyhow::Result<( + models::CaptureEndpoint, + Option, + &'r mut String, +)> { + let log_level = match request.get_internal() { + Ok(CaptureRequestExt { + labels: Some(labels), + .. + }) => Some(labels.log_level()), + _ => None, + }; + + let (connector_type, config_json) = match request { + Request { + spec: Some(spec), .. + } => (spec.connector_type, &mut spec.config_json), + Request { + discover: Some(discover), + .. + } => (discover.connector_type, &mut discover.config_json), + Request { + validate: Some(validate), + .. + } => (validate.connector_type, &mut validate.config_json), + Request { + apply: Some(apply), .. + } => { + let inner = apply + .capture + .as_mut() + .context("`apply` missing required `capture`")?; + + (inner.connector_type, &mut inner.config_json) + } + Request { + open: Some(open), .. + } => { + let inner = open + .capture + .as_mut() + .context("`open` missing required `capture`")?; + + (inner.connector_type, &mut inner.config_json) + } + _ => { + return Err(crate::protocol_error( + "request does not contain an endpoint", + request, + )) + } + }; + + if connector_type == ConnectorType::Image as i32 { + Ok(( + models::CaptureEndpoint::Connector( + serde_json::from_str(config_json).context("parsing connector config")?, + ), + log_level, + config_json, + )) + } else if connector_type == ConnectorType::Local as i32 { + Ok(( + models::CaptureEndpoint::Local( + serde_json::from_str(config_json).context("parsing local config")?, + ), + log_level, + config_json, + )) + } else { + anyhow::bail!("invalid connector type: {connector_type}"); + } +} + +#[cfg(test)] +mod test { + use super::*; + use serde_json::json; + + #[tokio::test] + async fn test_http_ingest_spec() { + if let Err(_) = locate_bin::locate("flow-connector-init") { + // Skip if `flow-connector-init` isn't available (yet). We're probably on CI. + // This test is useful as a sanity check for local development + // and we have plenty of other coverage during CI. + return; + } + + let spec = || { + serde_json::from_value(json!({ + "spec": { + "connectorType": "IMAGE", + "config": { + "image": "ghcr.io/estuary/source-http-ingest:dev", + "config": {}, + } + } + })) + .unwrap() + }; + + let runtime = Runtime::new( + true, + "default".to_string(), + ops::tracing_log_handler, + None, + "test".to_string(), + ); + + let (connector_tx, connector_rx) = start(&runtime, spec()).await.unwrap(); + std::mem::drop(connector_tx); + + let mut responses: Vec<_> = connector_rx.collect().await; + let resp = responses.pop().unwrap().unwrap(); + + assert!(resp.spec.is_some()); + + let container = resp + .get_internal() + .expect("internal decodes") + .container + .expect("internal has attached container"); + + assert_eq!( + container.network_ports, + [proto_flow::flow::NetworkPort { + number: 8080, + protocol: String::new(), + public: true + }] + ); + } +} diff --git a/crates/runtime/src/capture/image.rs b/crates/runtime/src/capture/image.rs deleted file mode 100644 index cebad7095d..0000000000 --- a/crates/runtime/src/capture/image.rs +++ /dev/null @@ -1,154 +0,0 @@ -use super::extract_endpoint; -use crate::{ - image_connector::{serve, Container, StartRpcFuture, UnsealFuture, Unsealed}, - unseal, -}; -use futures::{channel::mpsc, FutureExt, Stream}; -use proto_flow::{ - capture::{Request, Response}, - runtime::CaptureRequestExt, -}; - -fn unseal(mut request: Request) -> Result, Request> { - if !matches!( - request, - Request { spec: Some(_), .. } - | Request { - discover: Some(_), - .. - } - | Request { - validate: Some(_), - .. - } - | Request { apply: Some(_), .. } - | Request { open: Some(_), .. } - ) { - return Err(request); // Not an unseal-able request. - }; - - Ok(async move { - let (endpoint, config_json) = extract_endpoint(&mut request)?; - - let models::CaptureEndpoint::Connector(models::ConnectorConfig { - image, - config: sealed_config, - }) = endpoint - else { - anyhow::bail!("task connector type has changed and is no longer an image") - }; - - *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); - - let log_level = match request.get_internal() { - Ok(CaptureRequestExt { - labels: Some(labels), - .. - }) => Some(labels.log_level()), - _ => None, - }; - - Ok(Unsealed { - image, - log_level, - request, - }) - } - .boxed()) -} - -fn start_rpc( - channel: tonic::transport::Channel, - rx: mpsc::Receiver, -) -> StartRpcFuture { - async move { - proto_grpc::capture::connector_client::ConnectorClient::new(channel) - .capture(rx) - .await - } - .boxed() -} - -fn attach_container(response: &mut Response, container: Container) { - response.set_internal(|internal| { - internal.container = Some(container); - }); -} - -pub fn connector( - log_handler: L, - network: &str, - request_rx: R, - task_name: &str, -) -> mpsc::Receiver> -where - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, - R: Stream> + Send + 'static, -{ - serve( - attach_container, - log_handler, - network, - request_rx, - start_rpc, - task_name, - ops::TaskType::Capture, - unseal, - ) -} - -#[cfg(test)] -mod test { - use super::connector; - use futures::StreamExt; - use serde_json::json; - - #[tokio::test] - async fn test_http_ingest_spec() { - if let Err(_) = locate_bin::locate("flow-connector-init") { - // Skip if `flow-connector-init` isn't available (yet). We're probably on CI. - // This test is useful as a sanity check for local development - // and we have plenty of other coverage during CI. - return; - } - - let request_rx = futures::stream::repeat_with(|| { - Ok(serde_json::from_value(json!({ - "spec": { - "connectorType": "IMAGE", - "config": { - "image": "ghcr.io/estuary/source-http-ingest:dev", - "config": {}, - } - } - })) - .unwrap()) - }); - - let response_rx = connector(ops::tracing_log_handler, "", request_rx.take(2), "a-task"); - - let responses: Vec<_> = response_rx.collect().await; - assert_eq!(responses.len(), 2); - - for resp in responses { - let resp = resp.unwrap(); - - assert!(resp.spec.is_some()); - - let container = resp - .get_internal() - .expect("internal decodes") - .container - .expect("internal has attached container"); - - assert_eq!( - container.network_ports, - [proto_flow::flow::NetworkPort { - number: 8080, - protocol: String::new(), - public: true - }] - ); - } - } -} diff --git a/crates/runtime/src/capture/local.rs b/crates/runtime/src/capture/local.rs deleted file mode 100644 index 00b06ca742..0000000000 --- a/crates/runtime/src/capture/local.rs +++ /dev/null @@ -1,70 +0,0 @@ -use super::extract_endpoint; -use crate::{ - local_connector::{serve, UnsealFuture, Unsealed}, - unseal, -}; -use futures::{channel::mpsc, FutureExt, Stream}; -use proto_flow::{ - capture::{Request, Response}, - runtime::CaptureRequestExt, -}; - -fn unseal(mut request: Request) -> Result, Request> { - if !matches!( - request, - Request { spec: Some(_), .. } - | Request { - discover: Some(_), - .. - } - | Request { - validate: Some(_), - .. - } - | Request { apply: Some(_), .. } - | Request { open: Some(_), .. } - ) { - return Err(request); // Not an unseal-able request. - }; - - Ok(async move { - let (endpoint, config_json) = extract_endpoint(&mut request)?; - - let models::CaptureEndpoint::Local(models::LocalConfig { - command, - config: sealed_config, - env, - protobuf, - }) = endpoint - else { - anyhow::bail!("task connector type has changed and is no longer an image") - }; - - *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); - - let log_level = match request.get_internal() { - Ok(CaptureRequestExt { - labels: Some(labels), - .. - }) => Some(labels.log_level()), - _ => None, - }; - - Ok(Unsealed { - command, - env, - log_level, - protobuf, - request, - }) - } - .boxed()) -} - -pub fn connector(log_handler: L, request_rx: R) -> mpsc::Receiver> -where - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, - R: Stream> + Send + 'static, -{ - serve(log_handler, request_rx, unseal) -} diff --git a/crates/runtime/src/capture/mod.rs b/crates/runtime/src/capture/mod.rs index 887748c94a..b9c80a951d 100644 --- a/crates/runtime/src/capture/mod.rs +++ b/crates/runtime/src/capture/mod.rs @@ -1,195 +1,86 @@ -use super::Runtime; -use anyhow::Context; -use futures::{Stream, StreamExt, TryStreamExt}; +use ::ops::stats::DocsAndBytes; +use futures::Stream; use proto_flow::capture::{Request, Response}; -use proto_flow::flow::capture_spec::ConnectorType; -use proto_flow::ops; -use proto_flow::runtime::CaptureRequestExt; -use std::pin::Pin; -use std::sync::Arc; - -// Notes on how we can structure capture middleware: - -// Request loop: -// - Spec / Discover / Validate / Apply: Unseal. Forward request. -// - Open: Rebuild State. Unseal. Retain explicit-ack. Forward request. -// - Acknowledge: Notify response loop. Forward iff explicit-ack. - -// Response loop: -// - Spec / Discovered / Validated / Applied: Forward response. -// - Opened: Acquire State. Re-init combiners. Forward response. -// - Captured: Validate & add to combiner. -// - Checkpoint: Reduce checkpoint. -// If "full": block until Acknowledge notification is ready. -// If Acknowledge notification is ready: -// Drain combiner into forwarded Captured. -// Forward Checkpoint enriched with stats. - -mod image; -mod local; - -#[tonic::async_trait] -impl proto_grpc::capture::connector_server::Connector for Runtime -where - L: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ - type CaptureStream = futures::stream::BoxStream<'static, tonic::Result>; - - async fn capture( - &self, - request: tonic::Request>, - ) -> tonic::Result> { - let conn_info = request - .extensions() - .get::(); - tracing::debug!(?request, ?conn_info, "started capture request"); - - let request_rx = crate::stream_status_to_error(request.into_inner()); - - let response_rx = self - .clone() - .serve_capture(request_rx) - .await - .map_err(crate::anyhow_to_status)?; - - Ok(tonic::Response::new( - crate::stream_error_to_status(response_rx).boxed(), - )) - } +use std::collections::{BTreeMap, BTreeSet}; + +mod connector; +mod protocol; +mod serve; +mod task; + +pub trait RequestStream: Stream> + Send + Unpin + 'static {} +impl> + Send + Unpin + 'static> RequestStream for T {} + +pub trait ResponseStream: Stream> + Send + 'static {} +impl> + Send + 'static> ResponseStream for T {} + +// Task definition for a capture. +pub struct Task { + // Bindings of this task. + bindings: Vec, + // Does the capture connector want explicit acknowledgements? + explicit_acknowledgements: bool, + // Instant at which this Task is eligible for restart. + restart: tokio::time::Instant, + // ShardRef of this task. + shard_ref: ops::ShardRef, } -impl Runtime -where - L: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ - pub async fn serve_capture( - self, - request_rx: In, - ) -> anyhow::Result> + Send> - where - In: Stream> + Send + Unpin + 'static, - { - let mut request_rx = request_rx.peekable(); - - let mut peek_request = match Pin::new(&mut request_rx).peek().await { - Some(Ok(peek)) => peek.clone(), - Some(Err(_)) => return Err(request_rx.try_next().await.unwrap_err()), - None => return Ok(futures::stream::empty().boxed()), - }; - let (endpoint, _) = extract_endpoint(&mut peek_request).map_err(crate::anyhow_to_status)?; - - // NOTE(johnny): To debug requests / responses at any layer of this interceptor stack, try: - // let request_rx = request_rx.inspect_ok(|request| { - // eprintln!("REQUEST: {}", serde_json::to_string(request).unwrap()); - // }); - // - // let response_rx = response_rx.inspect_ok(|response| { - // eprintln!("RESPONSE: {}", serde_json::to_string(response).unwrap()); - // }); - - // Request interceptor which adjusts the dynamic log level based on internal shard labels. - let request_rx = adjust_log_level(request_rx, self.set_log_level); - - let response_rx = match endpoint { - models::CaptureEndpoint::Connector(_) => image::connector( - self.log_handler, - &self.container_network, - request_rx, - &self.task_name, - ) - .boxed(), - models::CaptureEndpoint::Local(_) if !self.allow_local => { - Err(tonic::Status::failed_precondition( - "Local connectors are not permitted in this context", - ))? - } - models::CaptureEndpoint::Local(_) => { - local::connector(self.log_handler, request_rx).boxed() - } - }; - - Ok(response_rx) - } +// Binding definition for a capture. +struct Binding { + // Target collection. + collection_name: String, + // JSON pointer at which document UUIDs are added. + document_uuid_ptr: doc::Pointer, + // Key components which are extracted from written documents. + key_extractors: Vec, + // Partition values which are extracted from written documents. + partition_extractors: Vec, + // Specification of this binding. + resource_path: Vec, + // Serialization policy for the Target collection. + ser_policy: doc::SerPolicy, + // Write schema of the target collection. + write_schema_json: String, } -pub fn adjust_log_level( - request_rx: R, - set_log_level: Option>, -) -> impl Stream> -where - R: Stream> + Send + 'static, -{ - request_rx.inspect_ok(move |request| { - let Ok(CaptureRequestExt { - labels: Some(ops::ShardLabeling { log_level, .. }), - }) = request.get_internal() - else { - return; - }; - - if let (Some(log_level), Some(set_log_level)) = - (ops::log::Level::from_i32(log_level), &set_log_level) - { - (set_log_level)(log_level); - } - }) +// Transaction of captured documents, checkpoints, and associated stats. +pub struct Transaction { + // Number of captured document bytes rolled up in this transaction. + captured_bytes: usize, + // Number of connector checkpoints rolled up in this transaction. + checkpoints: u32, + // Time of first connector Captured or Checkpoint response. + started_at: std::time::SystemTime, + // Statistics of (read documents, combined documents) for each binding. + stats: BTreeMap, + // Set of bindings which updated their inferred Shape this transaction. + updated_inferences: BTreeSet, } -// Returns the CaptureEndpoint of this Request, and a mutable reference to its inner config_json. -fn extract_endpoint<'r>( - request: &'r mut Request, -) -> anyhow::Result<(models::CaptureEndpoint, &'r mut String)> { - let (connector_type, config_json) = match request { - Request { - spec: Some(spec), .. - } => (spec.connector_type, &mut spec.config_json), - Request { - discover: Some(discover), - .. - } => (discover.connector_type, &mut discover.config_json), - Request { - validate: Some(validate), - .. - } => (validate.connector_type, &mut validate.config_json), - Request { - apply: Some(apply), .. - } => { - let inner = apply - .capture - .as_mut() - .context("`apply` missing required `capture`")?; - - (inner.connector_type, &mut inner.config_json) - } - Request { - open: Some(open), .. - } => { - let inner = open - .capture - .as_mut() - .context("`open` missing required `capture`")?; - - (inner.connector_type, &mut inner.config_json) +// LONG_POLL_TIMEOUT is the amount of time we will long-poll for a ready Chunk +// from the connector. If no Chunk arrives within this timeout then we return +// control to the client, and they must long-poll again for a ready Chunk. +// +// LONG_POLL_TIMEOUT bounds live-ness when gracefully stopping or when restarting +// a capture session due to a new request::Open, as re-opening a session can only +// happen in between long-polls. +const LONG_POLL_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5); + +// COMBINER_BYTE_THRESHOLD is a coarse target on the documents which can be +// optimistically combined within a capture transaction, while awaiting the +// commit of a previous transaction. Upon reaching this threshold, further +// documents and checkpoints will not be folded into the transaction. +const COMBINER_BYTE_THRESHOLD: usize = 1 << 25; // 32MB. + +impl Transaction { + pub fn new() -> Self { + Self { + captured_bytes: 0, + checkpoints: 0, + started_at: std::time::SystemTime::UNIX_EPOCH, + stats: Default::default(), + updated_inferences: Default::default(), } - - _ => anyhow::bail!("request {request:?} does not contain an endpoint"), - }; - - if connector_type == ConnectorType::Image as i32 { - Ok(( - models::CaptureEndpoint::Connector( - serde_json::from_str(config_json).context("parsing connector config")?, - ), - config_json, - )) - } else if connector_type == ConnectorType::Local as i32 { - Ok(( - models::CaptureEndpoint::Local( - serde_json::from_str(config_json).context("parsing local config")?, - ), - config_json, - )) - } else { - anyhow::bail!("invalid connector type: {connector_type}"); } } diff --git a/crates/runtime/src/capture/protocol.rs b/crates/runtime/src/capture/protocol.rs new file mode 100644 index 0000000000..c517387f03 --- /dev/null +++ b/crates/runtime/src/capture/protocol.rs @@ -0,0 +1,320 @@ +use super::{Task, Transaction}; +use crate::task_state::RocksDB; +use anyhow::Context; +use prost::Message; +use proto_flow::capture::{request, response, Request, Response}; +use proto_flow::flow; +use proto_flow::runtime::{capture_request_ext, capture_response_ext, CaptureRequestExt}; +use std::collections::BTreeMap; +use std::sync::Arc; + +pub fn recv_client_first_open(open: &Request) -> anyhow::Result { + let db = RocksDB::open(open.get_internal()?.open.and_then(|o| o.rocksdb_descriptor))?; + + Ok(db) +} + +pub fn recv_client_open(open: &mut Request, db: &RocksDB) -> anyhow::Result<()> { + if let Some(state) = db.load_connector_state()? { + let open = open.open.as_mut().unwrap(); + open.state_json = state; + tracing::debug!(state=%open.state_json, "loaded and attached a persisted connector Open.state_json"); + } else { + tracing::debug!("no previously-persisted connector state was found"); + } + Ok(()) +} + +pub fn recv_connector_opened( + open: &Request, + opened: Option, + shapes_by_key: &mut BTreeMap, +) -> anyhow::Result<( + Arc, + Vec, + doc::combine::Accumulator, + doc::combine::Accumulator, + Response, +)> { + let Some(opened) = opened else { + anyhow::bail!("unexpected connector EOF reading Opened") + }; + + let task = Arc::new(Task::new(&open, &opened)?); + // Inferred document shapes, indexed by binding offset. + let shapes = task.binding_shapes_by_index(std::mem::take(shapes_by_key)); + + // Create a pair of accumulators. While one is draining, the other is accumulating. + let a1 = doc::combine::Accumulator::new(task.combine_spec()?, tempfile::tempfile()?)?; + let a2 = doc::combine::Accumulator::new(task.combine_spec()?, tempfile::tempfile()?)?; + + let opened = Response { + opened: Some(response::Opened { + explicit_acknowledgements: true, + }), + ..Default::default() + }; + Ok((task, shapes, a1, a2, opened)) +} + +pub fn send_client_poll_not_ready() -> Response { + Response { + checkpoint: Some(response::Checkpoint::default()), + ..Default::default() + } +} + +pub fn send_client_poll_ready() -> Response { + Response { + captured: Some(response::Captured::default()), + ..Default::default() + } +} + +pub fn send_connector_acknowledge(last_checkpoints: u32, task: &Task) -> Option { + if last_checkpoints != 0 && task.explicit_acknowledgements { + Some(Request { + acknowledge: Some(request::Acknowledge { + checkpoints: last_checkpoints, + }), + ..Default::default() + }) + } else { + None + } +} + +pub fn send_client_captured_or_checkpoint( + buf: &mut bytes::BytesMut, + batch: &mut rocksdb::WriteBatch, + drained: doc::combine::DrainedDoc, + shapes: &mut [doc::Shape], + task: &Task, + txn: &mut Transaction, +) -> Response { + let doc::combine::DrainedDoc { meta, root } = drained; + + let index = meta.binding(); + + if index == task.bindings.len() { + // This is a Checkpoint. + // This is a checkpoint state update. + let updated_json = + serde_json::to_string(&doc::SerPolicy::default().on_owned(&root)).unwrap(); + + () = batch.merge(RocksDB::CONNECTOR_STATE_KEY, &updated_json); + + let state = flow::ConnectorState { + merge_patch: true, + updated_json, + }; + return Response { + checkpoint: Some(response::Checkpoint { state: Some(state) }), + ..Default::default() + }; + } + + let binding = &task.bindings[index]; + + let key_packed = doc::Extractor::extract_all_owned(&root, &binding.key_extractors, buf); + let partitions_packed = + doc::Extractor::extract_all_owned(&root, &binding.partition_extractors, buf); + let doc_json = serde_json::to_string(&binding.ser_policy.on_owned(&root)) + .expect("document serialization cannot fail"); + + let stats = &mut txn.stats.entry(index as u32).or_default().1; + stats.docs_total += 1; + stats.bytes_total += doc_json.len() as u64; + + if shapes[index].widen_owned(&root) { + doc::shape::limits::enforce_shape_complexity_limit( + &mut shapes[index], + doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, + ); + txn.updated_inferences.insert(index); + } + + Response { + captured: Some(response::Captured { + binding: index as u32, + doc_json, + }), + ..Default::default() + } + .with_internal_buf(buf, |internal| { + internal.captured = Some(capture_response_ext::Captured { + key_packed, + partitions_packed, + }); + }) +} + +pub fn send_client_final_checkpoint( + buf: &mut bytes::BytesMut, + task: &Task, + txn: &Transaction, +) -> Response { + let mut capture = BTreeMap::::new(); + + for (index, binding_stats) in txn.stats.iter() { + let index = *index as usize; + let entry = capture + .entry(task.bindings[index].collection_name.clone()) + .or_default(); + + ops::merge_docs_and_bytes(&binding_stats.0, &mut entry.right); + ops::merge_docs_and_bytes(&binding_stats.1, &mut entry.out); + } + + let stats = ops::Stats { + capture, + derive: None, + interval: None, + materialize: Default::default(), + meta: Some(ops::Meta { + uuid: crate::UUID_PLACEHOLDER.to_string(), + }), + open_seconds_total: txn.started_at.elapsed().unwrap().as_secs_f64(), + shard: Some(task.shard_ref.clone()), + timestamp: Some(proto_flow::as_timestamp(txn.started_at)), + txn_count: 1, + }; + + Response { + checkpoint: Some(response::Checkpoint { state: None }), + ..Default::default() + } + .with_internal_buf(buf, |internal| { + internal.checkpoint = Some(capture_response_ext::Checkpoint { stats: Some(stats) }) + }) +} + +pub fn recv_client_start_commit( + mut batch: rocksdb::WriteBatch, + db: &RocksDB, + request: Option, + shapes: &[doc::Shape], + task: &Task, + txn: &Transaction, +) -> anyhow::Result<()> { + let Some(request) = request else { + anyhow::bail!("unexpected runtime EOF reading StartCommit"); + }; + let CaptureRequestExt { + start_commit: + Some(capture_request_ext::StartCommit { + runtime_checkpoint: Some(runtime_checkpoint), + .. + }), + .. + } = request.get_internal()? + else { + return Err(crate::protocol_error( + "expected StartCommit with runtime_checkpoint", + request, + )); + }; + + // Add the runtime checkpoint to our WriteBatch. + tracing::debug!( + ?runtime_checkpoint, + "persisting StartCommit.runtime_checkpoint", + ); + batch.put(RocksDB::CHECKPOINT_KEY, &runtime_checkpoint.encode_to_vec()); + + // We're about to write out our write batch which, when written to the + // recovery log, irrevocably commits our transaction. Before doing so, + // produce structured logs of all inferred schemas that have changed + // in this transaction. + for binding in txn.updated_inferences.iter() { + let serialized = + serde_json::to_value(&doc::shape::schema::to_schema(shapes[*binding].clone())) + .expect("shape serialization should never fail"); + + tracing::info!( + schema = ?::ops::DebugJson(serialized), + collection_name = %task.bindings[*binding].collection_name, + binding = binding, + "inferred schema updated" + ); + } + + // Atomically write our commit batch. + db.write(batch) + .context("failed to write atomic RocksDB commit")?; + + Ok(()) +} + +pub fn send_client_started_commit() -> Response { + Response { + checkpoint: Some(response::Checkpoint { state: None }), + ..Default::default() + } +} + +pub fn recv_connector_captured( + accumulator: &mut doc::combine::Accumulator, + captured: response::Captured, + task: &Task, + txn: &mut Transaction, +) -> anyhow::Result<()> { + let response::Captured { binding, doc_json } = captured; + + let memtable = accumulator.memtable()?; + let alloc = memtable.alloc(); + + let mut doc = memtable + .parse_json_str(&doc_json) + .context("couldn't parse captured document as JSON")?; + + let uuid_ptr = &task + .bindings + .get(binding as usize) + .with_context(|| "invalid captured binding {binding}")? + .document_uuid_ptr; + + if let Some(node) = uuid_ptr.create_heap_node(&mut doc, alloc) { + *node = doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); + } + memtable.add(binding, doc, false)?; + + let stats = txn.stats.entry(binding).or_default(); + stats.0.docs_total += 1; + stats.0.bytes_total += doc_json.len() as u64; + + txn.captured_bytes += doc_json.len(); + Ok(()) +} + +pub fn recv_connector_checkpoint( + accumulator: &mut doc::combine::Accumulator, + response: Response, + task: &Task, + txn: &mut Transaction, +) -> anyhow::Result<()> { + let Some(response::Checkpoint { state: Some(state) }) = response.checkpoint else { + return Err(crate::protocol_error( + "expected Captured or Checkpoint with state", + response, + )); + }; + let flow::ConnectorState { + updated_json, + merge_patch, + } = state; + + let memtable = accumulator.memtable()?; + let doc = memtable + .parse_json_str(&updated_json) + .context("couldn't parse connector state as JSON")?; + + // Combine over the checkpoint state. + if !merge_patch { + memtable.add(task.bindings.len() as u32, doc::HeapNode::Null, false)?; + } + memtable.add(task.bindings.len() as u32, doc, false)?; + + txn.checkpoints += 1; + Ok(()) +} diff --git a/crates/runtime/src/capture/serve.rs b/crates/runtime/src/capture/serve.rs new file mode 100644 index 0000000000..f3a2c00ca8 --- /dev/null +++ b/crates/runtime/src/capture/serve.rs @@ -0,0 +1,344 @@ +use super::{connector, protocol::*, RequestStream, ResponseStream, Task, Transaction}; +use crate::task_state::RocksDB; +use crate::{LogHandler, Runtime}; +use anyhow::Context; +use futures::channel::{mpsc, oneshot}; +use futures::future::FusedFuture; +use futures::stream::FusedStream; +use futures::{FutureExt, SinkExt, StreamExt, TryStreamExt}; +use proto_flow::capture::{request, Request, Response}; +use std::{collections::BTreeMap, sync::Arc}; + +#[tonic::async_trait] +impl proto_grpc::capture::connector_server::Connector for Runtime { + type CaptureStream = futures::stream::BoxStream<'static, tonic::Result>; + + async fn capture( + &self, + request: tonic::Request>, + ) -> tonic::Result> { + let conn_info = request + .extensions() + .get::(); + tracing::debug!(?request, ?conn_info, "started capture request"); + + let request_rx = crate::stream_status_to_error(request.into_inner()); + let response_rx = crate::stream_error_to_status(self.clone().serve_capture(request_rx)); + + Ok(tonic::Response::new(response_rx.boxed())) + } +} + +impl Runtime { + pub fn serve_capture(self, mut request_rx: impl RequestStream) -> impl ResponseStream { + coroutines::try_coroutine(move |mut co| async move { + let Some(mut open) = serve_unary(&self, &mut request_rx, &mut co).await? else { + return Ok::<(), anyhow::Error>(()); + }; + + let db = recv_client_first_open(&open)?; + let mut shapes = BTreeMap::new(); + let mut send_opened = false; + + loop { + let Some((next_open, next_opened)) = serve_session( + &mut co, + &db, + open, + &mut request_rx, + &self, + send_opened, + &mut shapes, + ) + .await? + else { + return Ok(()); + }; + + (open, send_opened) = (next_open, next_opened); + } + }) + } +} + +async fn serve_unary( + runtime: &Runtime, + request_rx: &mut impl RequestStream, + co: &mut coroutines::Suspend, +) -> anyhow::Result> { + while let Some(request) = request_rx.try_next().await? { + if request.open.is_some() { + return Ok(Some(request)); + } + let (connector_tx, mut connector_rx) = connector::start(runtime, request).await?; + std::mem::drop(connector_tx); // Send EOF. + + while let Some(response) = connector_rx.next().await { + () = co.yield_(response?).await; + } + } + Ok(None) +} + +async fn serve_session( + co: &mut coroutines::Suspend, + db: &RocksDB, + mut open: Request, + request_rx: &mut impl RequestStream, + runtime: &Runtime, + send_opened: bool, + shapes_by_key: &mut BTreeMap, +) -> anyhow::Result> { + recv_client_open(&mut open, &db)?; + + // Start connector stream and read Opened. + let (mut connector_tx, mut connector_rx) = connector::start(runtime, open.clone()).await?; + let opened = TryStreamExt::try_next(&mut connector_rx).await?; + + let (task, mut shapes, accumulator, mut next_accumulator, opened) = + recv_connector_opened(&open, opened, shapes_by_key)?; + + if send_opened { + () = co.yield_(opened).await; + } + + // Spawn a task that reads the first transaction from the connector. + let (mut yield_tx, yield_rx) = oneshot::channel(); + let mut next_txn = tokio::spawn(read_transaction( + accumulator, + connector_rx.fuse(), + task.clone(), + super::LONG_POLL_TIMEOUT, + yield_rx, + )); + let mut last_checkpoints: u32 = 0; // Checkpoints in the last transaction. + + loop { + // Receive initial request of a transaction: Acknowledge, Open, or EOF. + let _: request::Acknowledge = match request_rx.try_next().await? { + // An Acknowledge: + // * acknowledges the completion of all prior commits, and + // * long-polls for a transaction to be drained. + // + // We briefly block for up to POLL_TIMEOUT waiting for a ready transaction, + // and then reply with either: + // * an empty Captured to indicate we will drain a transaction, or + // * an empty Checkpoint to indicate there is no such transaction, + // and the caller should poll again. + // + // The caller must await our reply before sending its next message. + Some(Request { + acknowledge: Some(ack), + .. + }) => ack, + // An Open ends this session, and immediately starts a next one. + Some(open @ Request { open: Some(_), .. }) => { + *shapes_by_key = task.binding_shapes_by_key(shapes); + return Ok(Some((open, true))); // Do send client Opened. + } + // Caller sent EOF which gracefully ends this RPC. + None => return Ok(None), + // Anything else is a protocol error. + Some(request) => { + return Err(crate::protocol_error( + "expected Acknowledge, Open, or EOF", + request, + )) + } + }; + + // Acknowledge committed checkpoints to the connector. + if let Some(ack) = send_connector_acknowledge(last_checkpoints, &task) { + let _: Result<(), mpsc::SendError> = connector_tx.feed(ack).await; // Best-effort. + } + + // Signal that we're ready for the next transaction read to yield, and then await it. + std::mem::drop(yield_tx); + + let (accumulator, mut txn) = match next_txn.await.expect("read loop doesn't panic") { + ReadResult::Transaction { + accumulator, + connector_rx, + task, + txn, + } => { + // Start reading the next transaction. + let (next_yield_tx, yield_rx) = oneshot::channel(); + + next_txn = tokio::spawn(read_transaction( + next_accumulator, + connector_rx, + task, + super::LONG_POLL_TIMEOUT, + yield_rx, + )); + yield_tx = next_yield_tx; + + (accumulator, txn) + } + ReadResult::Restart => { + *shapes_by_key = task.binding_shapes_by_key(shapes); + return Ok(Some((open, false))); // Don't send client Opened. + } + ReadResult::Error(err) => return Err(err), + }; + + if txn.checkpoints == 0 { + () = co.yield_(send_client_poll_not_ready()).await; + next_accumulator = accumulator; + continue; + } + () = co.yield_(send_client_poll_ready()).await; + + // Prepare to drain `accumulator`. + let mut drainer = accumulator + .into_drainer() + .context("preparing to drain combiner")?; + let mut buf = bytes::BytesMut::new(); + + // Rocks WriteBatch into which we'll stage connector and runtime state updates. + // These will be written to the DB only upon a StartCommit message. + let mut batch = rocksdb::WriteBatch::default(); + + while let Some(drained) = drainer.drain_next()? { + let response = send_client_captured_or_checkpoint( + &mut buf, + &mut batch, + drained, + &mut shapes, + &task, + &mut txn, + ); + () = co.yield_(response).await; + } + + let checkpoint = send_client_final_checkpoint(&mut buf, &task, &txn); + () = co.yield_(checkpoint).await; + + let start_commit = request_rx.try_next().await?; + recv_client_start_commit(batch, &db, start_commit, &shapes, &task, &txn)?; + + () = co.yield_(send_client_started_commit()).await; + + last_checkpoints = txn.checkpoints; + next_accumulator = drainer.into_new_accumulator()?; + } +} + +// Result of a read of a next capture transaction. +pub enum ReadResult { + Error(anyhow::Error), + Restart, + Transaction { + accumulator: doc::combine::Accumulator, + connector_rx: R, + task: Arc, + txn: Transaction, + }, +} + +pub async fn read_transaction( + mut accumulator: doc::combine::Accumulator, + mut connector_rx: R, + task: Arc, + timeout: std::time::Duration, // How long we'll wait for a first checkpoint. + yield_rx: oneshot::Receiver<()>, // Signaled when we should return. +) -> ReadResult { + let mut saw_eof = false; // Did we receive None from `connector_rx`? + let mut txn = Transaction::new(); + let mut yield_rx = yield_rx.fuse(); // Enable `is_terminated()`. + + let poll_start = tokio::time::Instant::now(); + let poll_deadline = tokio::time::sleep_until(poll_start + timeout); + tokio::pin!(poll_deadline); + + // Loop over one or more response checkpoints. + loop { + let (woken, initial) = tokio::select! { + initial = connector_rx.next(), if !saw_eof && txn.captured_bytes < super::COMBINER_BYTE_THRESHOLD => (false, initial), + _ = &mut poll_deadline => (true, None), + _ = &mut yield_rx => (true, None), + }; + match (woken, initial) { + // Connector sent first response of a next checkpoint to read. + (false, Some(Ok(initial))) => { + if txn.checkpoints == 0 { + txn.started_at = std::time::SystemTime::now(); + } + if let Err(err) = read_checkpoint( + &mut accumulator, + &mut connector_rx, + initial, + &task, + &mut txn, + ) + .await + { + return ReadResult::Error(err); + } + } + // Connector sent an error. + (false, Some(Err(err))) => return ReadResult::Error(err), + // Connector sent EOF. + (false, None) => { + // Yield a non-empty transaction (EOF is handled on the next iteration). + if txn.checkpoints != 0 { + return ReadResult::Transaction { + accumulator, + connector_rx, + task, + txn, + }; + } + // Connector is done but can be restarted immediately. + if txn.checkpoints == 0 && poll_start > task.restart { + return ReadResult::Restart; + } + saw_eof = true; // Don't attempt further reads. + } + // We were woken. + (true, _) => { + // We have a non-empty transaction and were asked to yield. + if txn.checkpoints != 0 && yield_rx.is_terminated() { + return ReadResult::Transaction { + accumulator, + connector_rx, + task, + txn, + }; + } + // We have an empty transaction and deferred a requested yield, but reached `poll_deadline`, + if txn.checkpoints == 0 && yield_rx.is_terminated() && poll_deadline.is_elapsed() { + return ReadResult::Transaction { + accumulator, + connector_rx, + task, + txn, + }; + } + } + }; + } +} + +async fn read_checkpoint( + accumulator: &mut doc::combine::Accumulator, + connector_rx: &mut (impl ResponseStream + Unpin), + mut response: Response, + task: &Task, + txn: &mut Transaction, +) -> anyhow::Result<()> { + // Read all Captured responses of the checkpoint. + while let Some(captured) = response.captured { + recv_connector_captured(accumulator, captured, task, txn)?; + + // Read next response. + response = match connector_rx.next().await { + Some(Ok(response)) => response, + Some(Err(err)) => return Err(err), + None => anyhow::bail!("unexpected capture connector EOF while within a checkpoint"), + }; + } + + recv_connector_checkpoint(accumulator, response, task, txn) +} diff --git a/crates/runtime/src/capture/task.rs b/crates/runtime/src/capture/task.rs new file mode 100644 index 0000000000..03711f8ef1 --- /dev/null +++ b/crates/runtime/src/capture/task.rs @@ -0,0 +1,164 @@ +use super::{Binding, Task}; +use anyhow::Context; +use proto_flow::capture::{request, response, Request, Response}; +use proto_flow::flow; +use std::collections::BTreeMap; + +impl Task { + pub fn new(open: &Request, opened: &Response) -> anyhow::Result { + let request::Open { + capture: spec, + range, + state_json: _, + version: _, + } = open.clone().open.context("expected Open")?; + + let response::Opened { + explicit_acknowledgements, + } = opened.clone().opened.context("expected Opened")?; + + let flow::CaptureSpec { + bindings, + config_json: _, + connector_type: _, + interval_seconds, + name, + network_ports: _, + recovery_log_template: _, + shard_template: _, + } = spec.as_ref().context("missing capture")?; + let range = range.context("missing range")?; + + let ser_policy = doc::SerPolicy::default(); + + let bindings = bindings + .into_iter() + .enumerate() + .map(|(index, spec)| Binding::new(spec, ser_policy.clone()).context(index)) + .collect::, _>>()?; + + let restart = std::time::Duration::from_secs(*interval_seconds as u64); + let restart = tokio::time::Instant::now().checked_add(restart).unwrap(); + + let shard_ref = ops::ShardRef { + kind: ops::TaskType::Capture as i32, + name: name.clone(), + key_begin: format!("{:08x}", range.key_begin), + r_clock_begin: format!("{:08x}", range.r_clock_begin), + }; + + Ok(Self { + bindings, + explicit_acknowledgements, + restart, + shard_ref, + }) + } + + pub fn binding_shapes_by_index( + &self, + mut by_key: BTreeMap, + ) -> Vec { + let mut by_index = Vec::new(); + by_index.resize_with(self.bindings.len(), || doc::shape::Shape::nothing()); + + for (index, binding) in self.bindings.iter().enumerate() { + let key = binding.resource_path.join("\t"); // TODO(johnny): bindings PRD. + + if let Some(shape) = by_key.remove(&key) { + by_index[index] = shape; + } + } + + by_index + } + + pub fn binding_shapes_by_key(&self, by_index: Vec) -> BTreeMap { + let mut by_key = BTreeMap::new(); + + for (index, shape) in by_index.into_iter().enumerate() { + let key = self.bindings[index].resource_path.join("\t"); // TODO(johnny): bindings PRD. + by_key.insert(key, shape); + } + by_key + } + + pub fn combine_spec(&self) -> anyhow::Result { + let combiner_spec = self + .bindings + .iter() + .enumerate() + .map(|(index, binding)| binding.combiner_spec().context(index)) + .collect::, _>>()?; + + let state_schema = doc::reduce::merge_patch_schema().to_string(); + let state_schema = doc::validation::build_bundle(&state_schema).unwrap(); + let state_validator = doc::Validator::from_schema(state_schema).unwrap(); + + // Build combiner Spec with all bindings, plus one extra for state reductions. + let combiner_spec = doc::combine::Spec::with_bindings( + combiner_spec + .into_iter() + .map(|(is_full, key, validator)| (is_full, key, None, validator)) + .chain(std::iter::once((false, Vec::new(), None, state_validator))), + ); + + Ok(combiner_spec) + } +} + +impl Binding { + pub fn new( + spec: &flow::capture_spec::Binding, + ser_policy: doc::SerPolicy, + ) -> anyhow::Result { + let flow::capture_spec::Binding { + collection, + resource_config_json: _, + resource_path, + } = spec; + + let flow::CollectionSpec { + ack_template_json: _, + derivation: _, + key, + name, + partition_fields, + partition_template: _, + projections, + read_schema_json: _, + uuid_ptr, + write_schema_json, + } = collection.as_ref().context("missing collection")?; + + if uuid_ptr.is_empty() { + anyhow::bail!("uuid_ptr cannot be empty"); + } else if key.is_empty() { + anyhow::bail!("collection key cannot be empty"); + } + + let document_uuid_ptr = doc::Pointer::from(uuid_ptr); + let key_extractors = extractors::for_key(&key, &projections, &ser_policy)?; + let partition_extractors = + extractors::for_fields(&partition_fields, &projections, &ser_policy)?; + + Ok(Self { + collection_name: name.clone(), + document_uuid_ptr, + key_extractors, + partition_extractors, + resource_path: resource_path.clone(), + ser_policy, + write_schema_json: write_schema_json.clone(), + }) + } + + pub fn combiner_spec(&self) -> anyhow::Result<(bool, Vec, doc::Validator)> { + let built_schema = doc::validation::build_bundle(&self.write_schema_json) + .context("collection write_schema_json is not a JSON schema")?; + let validator = doc::Validator::from_schema(built_schema) + .context("could not build a schema validator")?; + + Ok((false, self.key_extractors.clone(), validator)) + } +} diff --git a/crates/runtime/src/container.rs b/crates/runtime/src/container.rs index f9cfa50844..1287ebee25 100644 --- a/crates/runtime/src/container.rs +++ b/crates/runtime/src/container.rs @@ -14,17 +14,14 @@ const CONNECTOR_INIT_PORT: u16 = 49092; /// Start an image connector container, returning its description and a dialed tonic Channel. /// The container is attached to the given `network`, and its logs are dispatched to `log_handler`. /// `task_name` and `task_type` are used only to label the container. -pub async fn start( +pub async fn start( image: &str, - log_handler: L, + log_handler: impl crate::LogHandler, log_level: Option, network: &str, task_name: &str, task_type: ops::TaskType, -) -> anyhow::Result<(runtime::Container, tonic::transport::Channel, Guard)> -where - L: Fn(&ops::Log) + Send + Sync + 'static, -{ +) -> anyhow::Result<(runtime::Container, tonic::transport::Channel, Guard)> { // Many operational contexts only allow for docker volume mounts // from certain locations: // * Docker for Mac restricts file shares to /User, /tmp, and a couple others. @@ -520,7 +517,8 @@ mod test { "a-task-name", proto_flow::ops::TaskType::Capture, ) - .await else { + .await + else { panic!("didn't crash") }; diff --git a/crates/runtime/src/derive/combine.rs b/crates/runtime/src/derive/combine.rs deleted file mode 100644 index e0caccfdea..0000000000 --- a/crates/runtime/src/derive/combine.rs +++ /dev/null @@ -1,400 +0,0 @@ -use anyhow::Context; -use futures::{channel::mpsc, SinkExt, Stream, StreamExt, TryStreamExt}; -use proto_flow::derive::{request, response, Request, Response}; -use proto_flow::flow::{self, collection_spec, CollectionSpec}; -use proto_flow::ops; -use proto_flow::runtime::derive_response_ext; -use std::time::SystemTime; - -pub fn adapt_requests( - _peek_request: &Request, - request_rx: R, -) -> anyhow::Result<(impl Stream>, ResponseArgs)> -where - R: Stream>, -{ - // Maximum UUID Clock value observed in request::Read documents. - let mut max_clock = 0; - // Statistics for read documents, passed to the response stream on flush. - let mut read_stats: Vec = Vec::new(); - // Time at which the current transaction was started. - let mut started_at: Option = None; - // Channel for passing request::Open to the response stream. - let (mut open_tx, open_rx) = mpsc::channel(1); - // Channel for passing statistics to the response stream on request::Flush. - let (mut flush_tx, flush_rx) = mpsc::channel(1); - - let request_rx = coroutines::try_coroutine(move |mut co| async move { - let mut request_rx = std::pin::pin!(request_rx); - - while let Some(request) = request_rx.try_next().await? { - if let Some(open) = &request.open { - // Tell the response loop about the request::Open. - // It will inspect it upon a future response::Opened message. - open_tx - .feed(open.clone()) - .await - .context("failed to send request::Open to response stream")?; - } else if let Some(_flush) = &request.flush { - // Tell the response loop about our flush statistics. - // It will inspect it upon a future response::Flushed message. - let flush = ( - max_clock, - std::mem::take(&mut read_stats), - started_at.take().unwrap_or_else(|| SystemTime::now()), - ); - flush_tx - .feed(flush) - .await - .context("failed to send request::Flush to response stream")?; - } else if let Some(read) = &request.read { - // Track start time of the transaction as time of first Read. - if started_at.is_none() { - started_at = Some(SystemTime::now()); - } - // Track the largest document clock that we've observed. - match &read.uuid { - Some(flow::UuidParts { clock, .. }) if *clock > max_clock => max_clock = *clock, - _ => (), - } - // Accumulate metrics over reads for our transforms. - if read.transform as usize >= read_stats.len() { - read_stats.resize(read.transform as usize + 1, Default::default()); - } - let read_stats = &mut read_stats[read.transform as usize]; - read_stats.docs_total += 1; - read_stats.bytes_total += read.doc_json.len() as u64; - } - - co.yield_(request).await; // Forward all requests. - } - Ok(()) - }); - - Ok((request_rx, ResponseArgs { open_rx, flush_rx })) -} - -pub struct ResponseArgs { - open_rx: mpsc::Receiver, - flush_rx: mpsc::Receiver<(u64, Vec, SystemTime)>, -} - -pub fn adapt_responses( - args: ResponseArgs, - response_rx: R, -) -> impl Stream> -where - R: Stream>, -{ - let ResponseArgs { - mut flush_rx, - mut open_rx, - } = args; - - // Statistics for documents published by us when draining. - let mut drain_stats: ops::stats::DocsAndBytes = Default::default(); - // Inferred shape of published documents. - let mut inferred_shape: doc::Shape = doc::Shape::nothing(); - // Did `inferred_shape` change during the current transaction? - let mut inferred_shape_changed: bool = false; - // State of an opened derivation. - let mut maybe_opened: Option = None; - // Statistics for documents published by the wrapped delegate. - let mut publish_stats: ops::stats::DocsAndBytes = Default::default(); - - coroutines::try_coroutine(move |mut co| async move { - let mut response_rx = std::pin::pin!(response_rx); - - while let Some(response) = response_rx.try_next().await? { - if let Some(opened) = &response.opened { - let open = open_rx - .next() - .await - .context("failed to receive request::Open from request loop")?; - - maybe_opened = Some(Opened::build(open, opened)?); - co.yield_(response).await; // Forward. - } else if let Some(published) = &response.published { - let opened = maybe_opened - .as_mut() - .context("connector sent Published before Opened")?; - - opened.combine_right(&published)?; - publish_stats.docs_total += 1; - publish_stats.bytes_total += published.doc_json.len() as u64; - // Not forwarded. - } else if let Some(_flushed) = &response.flushed { - let mut opened = maybe_opened - .take() - .context("connector sent Flushed before Opened")?; - - let (max_clock, read_stats, started_at) = flush_rx - .next() - .await - .context("failed to receive on request::Flush from request loop")?; - - // Drain Combiner into Published responses. - let doc::Combiner::Accumulator(accumulator) = opened.combiner else { - unreachable!() - }; - - let mut drainer = accumulator - .into_drainer() - .context("preparing to drain combiner")?; - let mut buf = bytes::BytesMut::new(); - - while let Some(drained) = drainer.next() { - let doc::combine::DrainedDoc { meta: _, root } = drained?; - - if inferred_shape.widen_owned(&root) { - doc::shape::limits::enforce_shape_complexity_limit( - &mut inferred_shape, - doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, - ); - inferred_shape_changed = true; - } - - let key_packed = - doc::Extractor::extract_all_owned(&root, &opened.key_extractors, &mut buf); - let partitions_packed = doc::Extractor::extract_all_owned( - &root, - &opened.partition_extractors, - &mut buf, - ); - - let doc_json = serde_json::to_string(&opened.ser_policy.on_owned(&root)) - .expect("document serialization cannot fail"); - drain_stats.docs_total += 1; - drain_stats.bytes_total += doc_json.len() as u64; - - let published = Response { - published: Some(response::Published { doc_json }), - ..Default::default() - } - .with_internal_buf(&mut buf, |internal| { - internal.published = Some(derive_response_ext::Published { - max_clock, - key_packed, - partitions_packed, - }); - }); - co.yield_(published).await; - } - // Combiner is now drained and is ready to accumulate again. - opened.combiner = doc::Combiner::Accumulator(drainer.into_new_accumulator()?); - - // Next we build up statistics to yield with our own response::Flushed. - let duration = started_at.elapsed().unwrap_or_default(); - - let transforms = opened - .transforms - .iter() - .zip(read_stats.into_iter()) - .filter_map(|((name, source), read_stats)| { - if read_stats.docs_total == 0 && read_stats.bytes_total == 0 { - None - } else { - Some(( - name.clone(), - ops::stats::derive::Transform { - input: Some(read_stats), - source: source.clone(), - }, - )) - } - }) - .collect(); - - let stats = ops::Stats { - capture: Default::default(), - derive: Some(ops::stats::Derive { - transforms, - published: maybe_counts(&mut publish_stats), - out: maybe_counts(&mut drain_stats), - }), - interval: None, - materialize: Default::default(), - meta: Some(ops::Meta { - uuid: crate::UUID_PLACEHOLDER.to_string(), - }), - open_seconds_total: duration.as_secs_f64(), - shard: Some(opened.shard.clone()), - timestamp: Some(proto_flow::as_timestamp(started_at)), - txn_count: 1, - }; - - // Now send the delegate's Flushed response extended with accumulated stats. - co.yield_(response.with_internal(|internal| { - internal.flushed = Some(derive_response_ext::Flushed { stats: Some(stats) }); - })) - .await; - - // If the inferred doc::Shape was updated, log it out for continuous schema inference. - if inferred_shape_changed { - inferred_shape_changed = false; - - let serialized = serde_json::to_value(&doc::shape::schema::to_schema( - inferred_shape.clone(), - )) - .expect("shape serialization should never fail"); - - tracing::info!( - schema = ?::ops::DebugJson(serialized), - collection_name = %opened.shard.name, - "inferred schema updated" - ); - } - - maybe_opened = Some(opened); - } else { - // All other request types are forwarded. - co.yield_(response).await; - } - } - Ok(()) - }) -} - -pub struct Opened { - // Combiner of published documents. - combiner: doc::Combiner, - // JSON pointer to the derived document UUID. - document_uuid_ptr: Option, - // Key components of derived documents. - key_extractors: Vec, - // Partitions to extract when draining the Combiner. - partition_extractors: Vec, - // Document serialization policy. - ser_policy: doc::SerPolicy, - // Shard of this derivation. - shard: ops::ShardRef, - // Ordered transform (transform-name, source-collection). - transforms: Vec<(String, String)>, -} - -impl Opened { - pub fn build(open: request::Open, _opened: &response::Opened) -> anyhow::Result { - let request::Open { - collection, - range, - state_json: _, - version: _, - } = open; - - let CollectionSpec { - ack_template_json: _, - derivation, - key, - name, - partition_fields, - partition_template: _, - projections, - read_schema_json: _, - uuid_ptr: document_uuid_ptr, - write_schema_json, - } = collection.as_ref().context("missing collection")?; - - let collection_spec::Derivation { - connector_type: _, - config_json: _, - transforms, - .. - } = derivation.as_ref().context("missing derivation")?; - - // TODO(johnny): Expose to connector protocol and extract from Open/Opened. - let ser_policy = doc::SerPolicy::default(); - - let range = range.as_ref().context("missing range")?; - - if key.is_empty() { - return Err(anyhow::anyhow!("derived collection key cannot be empty").into()); - } - let key_extractors = extractors::for_key(&key, &projections, &ser_policy)?; - - let document_uuid_ptr = if document_uuid_ptr.is_empty() { - None - } else { - Some(doc::Pointer::from(&document_uuid_ptr)) - }; - - let write_schema_json = doc::validation::build_bundle(&write_schema_json) - .context("collection write_schema_json is not a JSON schema")?; - let validator = doc::Validator::from_schema(write_schema_json) - .context("could not build a schema validator")?; - - let combiner = doc::Combiner::new( - doc::combine::Spec::with_one_binding( - false, // Derivations use partial reductions. - key_extractors.clone(), - None, - validator, - ), - tempfile::tempfile().context("opening temporary spill file")?, - )?; - - // Identify ordered, partitioned projections to extract on combiner drain. - let partition_extractors = - extractors::for_fields(partition_fields, projections, &ser_policy)?; - - let transforms = transforms - .iter() - .map(|transform| { - ( - transform.name.clone(), - transform.collection.as_ref().unwrap().name.clone(), - ) - }) - .collect(); - - let shard = ops::ShardRef { - kind: ops::TaskType::Derivation as i32, - name: name.clone(), - key_begin: format!("{:08x}", range.key_begin), - r_clock_begin: format!("{:08x}", range.r_clock_begin), - }; - - Ok(Self { - combiner, - document_uuid_ptr, - key_extractors, - partition_extractors, - ser_policy, - shard, - transforms, - }) - } - - pub fn combine_right(&mut self, published: &response::Published) -> anyhow::Result<()> { - let memtable = match &mut self.combiner { - doc::Combiner::Accumulator(accumulator) => accumulator.memtable()?, - _ => panic!("implementation error: combiner is draining, not accumulating"), - }; - let alloc = memtable.alloc(); - - let mut deser = serde_json::Deserializer::from_str(&published.doc_json); - let mut doc = doc::HeapNode::from_serde(&mut deser, alloc).with_context(|| { - format!( - "couldn't parse published document as JSON: {}", - &published.doc_json - ) - })?; - - if let Some(ptr) = &self.document_uuid_ptr { - if let Some(node) = ptr.create_heap_node(&mut doc, alloc) { - *node = - doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); - } - } - memtable.add(0, doc, false)?; - - Ok(()) - } -} - -fn maybe_counts(s: &mut ops::stats::DocsAndBytes) -> Option { - if s.bytes_total != 0 { - Some(std::mem::take(s)) - } else { - None - } -} diff --git a/crates/runtime/src/derive/connector.rs b/crates/runtime/src/derive/connector.rs new file mode 100644 index 0000000000..f7eef73c7d --- /dev/null +++ b/crates/runtime/src/derive/connector.rs @@ -0,0 +1,179 @@ +use crate::{unseal, LogHandler, Runtime}; +use anyhow::Context; +use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt}; +use proto_flow::{ + derive::{Request, Response}, + flow::collection_spec::derivation::ConnectorType, + runtime::DeriveRequestExt, +}; + +// Start a capture connector as indicated by the `initial` Request. +// Returns a pair of Streams for sending Requests and receiving Responses. +pub async fn start( + runtime: &Runtime, + mut initial: Request, +) -> anyhow::Result<( + mpsc::Sender, + BoxStream<'static, anyhow::Result>, +)> { + let (endpoint, log_level, config_json) = extract_endpoint(&mut initial)?; + let (mut connector_tx, connector_rx) = mpsc::channel(crate::CHANNEL_BUFFER); + + // Adjust the dynamic log level for this connector's lifecycle. + if let (Some(log_level), Some(set_log_level)) = (log_level, &runtime.set_log_level) { + (set_log_level)(log_level); + } + + fn attach_container(response: &mut Response, container: crate::image_connector::Container) { + response.set_internal(|internal| { + internal.container = Some(container); + }); + } + + fn start_rpc( + channel: tonic::transport::Channel, + rx: mpsc::Receiver, + ) -> crate::image_connector::StartRpcFuture { + async move { + proto_grpc::derive::connector_client::ConnectorClient::new(channel) + .derive(rx) + .await + } + .boxed() + } + + let connector_rx = match endpoint { + models::DeriveUsing::Connector(models::ConnectorConfig { + image, + config: sealed_config, + }) => { + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + connector_tx.try_send(initial).unwrap(); + + crate::image_connector::serve( + attach_container, + image, + runtime.log_handler.clone(), + log_level, + &runtime.container_network, + connector_rx, + start_rpc, + &runtime.task_name, + ops::TaskType::Derivation, + ) + .await? + .boxed() + } + models::DeriveUsing::Local(_) if !runtime.allow_local => { + return Err(tonic::Status::failed_precondition( + "Local connectors are not permitted in this context", + ) + .into()); + } + models::DeriveUsing::Local(models::LocalConfig { + command, + config: sealed_config, + env, + protobuf, + }) => { + *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); + connector_tx.try_send(initial).unwrap(); + + crate::local_connector::serve( + command, + env, + runtime.log_handler.clone(), + log_level, + protobuf, + connector_rx, + )? + .boxed() + } + models::DeriveUsing::Sqlite(_) => { + connector_tx.try_send(initial).unwrap(); + ::derive_sqlite::connector(connector_rx).boxed() + } + models::DeriveUsing::Typescript(_) => unreachable!(), + }; + + Ok((connector_tx, connector_rx)) +} + +fn extract_endpoint<'r>( + request: &'r mut Request, +) -> anyhow::Result<(models::DeriveUsing, Option, &'r mut String)> { + let log_level = match request.get_internal() { + Ok(DeriveRequestExt { + labels: Some(labels), + .. + }) => Some(labels.log_level()), + _ => None, + }; + + let (connector_type, config_json) = match request { + Request { + spec: Some(spec), .. + } => (spec.connector_type, &mut spec.config_json), + Request { + validate: Some(validate), + .. + } => (validate.connector_type, &mut validate.config_json), + Request { + open: Some(open), .. + } => { + let inner = open + .collection + .as_mut() + .context("`open` missing required `collection`")? + .derivation + .as_mut() + .context("`collection` missing required `derivation`")?; + + (inner.connector_type, &mut inner.config_json) + } + _ => { + return Err(crate::protocol_error( + "request does not contain an endpoint", + request, + )) + } + }; + + if connector_type == ConnectorType::Image as i32 { + Ok(( + models::DeriveUsing::Connector( + serde_json::from_str(config_json).context("parsing connector config")?, + ), + log_level, + config_json, + )) + } else if connector_type == ConnectorType::Local as i32 { + Ok(( + models::DeriveUsing::Local( + serde_json::from_str(config_json).context("parsing local config")?, + ), + log_level, + config_json, + )) + } else if connector_type == ConnectorType::Sqlite as i32 { + Ok(( + models::DeriveUsing::Sqlite( + serde_json::from_str(config_json).context("parsing connector config")?, + ), + log_level, + config_json, + )) + } else if connector_type == ConnectorType::Typescript as i32 { + Ok(( + models::DeriveUsing::Connector(models::ConnectorConfig { + image: "ghcr.io/estuary/derive-typescript:dev".to_string(), + config: models::RawValue::from_str(config_json) + .context("parsing connector config")?, + }), + log_level, + config_json, + )) + } else { + anyhow::bail!("invalid connector type: {connector_type}"); + } +} diff --git a/crates/runtime/src/derive/image.rs b/crates/runtime/src/derive/image.rs deleted file mode 100644 index db66881e37..0000000000 --- a/crates/runtime/src/derive/image.rs +++ /dev/null @@ -1,92 +0,0 @@ -use super::extract_endpoint; -use crate::{ - image_connector::{serve, Container, StartRpcFuture, UnsealFuture, Unsealed}, - unseal, -}; -use futures::{channel::mpsc, FutureExt, Stream}; -use proto_flow::{ - derive::{Request, Response}, - runtime::DeriveRequestExt, -}; - -fn unseal(mut request: Request) -> Result, Request> { - if !matches!( - request, - Request { spec: Some(_), .. } - | Request { - validate: Some(_), - .. - } - | Request { open: Some(_), .. } - ) { - return Err(request); // Not an unseal-able request. - }; - - Ok(async move { - let (endpoint, config_json) = extract_endpoint(&mut request)?; - - let models::DeriveUsing::Connector(models::ConnectorConfig { - image, - config: sealed_config, - }) = endpoint - else { - anyhow::bail!("task connector type has changed and is no longer an image") - }; - *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); - - let log_level = match request.get_internal() { - Ok(DeriveRequestExt { - labels: Some(labels), - .. - }) => Some(labels.log_level()), - _ => None, - }; - - Ok(Unsealed { - image, - log_level, - request, - }) - } - .boxed()) -} - -fn start_rpc( - channel: tonic::transport::Channel, - rx: mpsc::Receiver, -) -> StartRpcFuture { - async move { - proto_grpc::derive::connector_client::ConnectorClient::new(channel) - .derive(rx) - .await - } - .boxed() -} - -fn attach_container(response: &mut Response, container: Container) { - response.set_internal(|internal| { - internal.container = Some(container); - }); -} - -pub fn connector( - log_handler: L, - network: &str, - request_rx: R, - task_name: &str, -) -> impl Stream> -where - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, - R: Stream> + Send + 'static, -{ - serve( - attach_container, - log_handler, - network, - request_rx, - start_rpc, - task_name, - ops::TaskType::Derivation, - unseal, - ) -} diff --git a/crates/runtime/src/derive/local.rs b/crates/runtime/src/derive/local.rs deleted file mode 100644 index e5554ffdd0..0000000000 --- a/crates/runtime/src/derive/local.rs +++ /dev/null @@ -1,67 +0,0 @@ -use super::extract_endpoint; -use crate::{ - local_connector::{serve, UnsealFuture, Unsealed}, - unseal, -}; -use futures::{FutureExt, Stream}; -use proto_flow::{ - derive::{Request, Response}, - runtime::DeriveRequestExt, -}; - -fn unseal(mut request: Request) -> Result, Request> { - if !matches!( - request, - Request { spec: Some(_), .. } - | Request { - validate: Some(_), - .. - } - | Request { open: Some(_), .. } - ) { - return Err(request); // Not an unseal-able request. - }; - - Ok(async move { - let (endpoint, config_json) = extract_endpoint(&mut request)?; - - let models::DeriveUsing::Local(models::LocalConfig { - command, - config: sealed_config, - env, - protobuf, - }) = endpoint - else { - anyhow::bail!("task connector type has changed and is no longer an image") - }; - *config_json = unseal::decrypt_sops(&sealed_config).await?.to_string(); - - let log_level = match request.get_internal() { - Ok(DeriveRequestExt { - labels: Some(labels), - .. - }) => Some(labels.log_level()), - _ => None, - }; - - Ok(Unsealed { - command, - env, - log_level, - protobuf, - request, - }) - } - .boxed()) -} - -pub fn connector( - log_handler: L, - request_rx: R, -) -> impl Stream> -where - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, - R: Stream> + Send + 'static, -{ - serve(log_handler, request_rx, unseal) -} diff --git a/crates/runtime/src/derive/mod.rs b/crates/runtime/src/derive/mod.rs index 43e93dfc5b..80a1193d45 100644 --- a/crates/runtime/src/derive/mod.rs +++ b/crates/runtime/src/derive/mod.rs @@ -1,238 +1,65 @@ -use crate::Runtime; -use anyhow::Context; -use futures::{Stream, StreamExt, TryStreamExt}; -use proto_flow::derive::{request, Request, Response}; -use proto_flow::flow::collection_spec::derivation::ConnectorType; -use proto_flow::ops; -use proto_flow::runtime::DeriveRequestExt; -use std::pin::Pin; -use std::sync::Arc; - -mod combine; -mod image; -mod local; -mod rocksdb; - -#[tonic::async_trait] -impl proto_grpc::derive::connector_server::Connector for Runtime -where - H: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ - type DeriveStream = futures::stream::BoxStream<'static, tonic::Result>; - - async fn derive( - &self, - request: tonic::Request>, - ) -> tonic::Result> { - let conn_info = request - .extensions() - .get::(); - tracing::debug!(?request, ?conn_info, "started derive request"); - - let request_rx = crate::stream_status_to_error(request.into_inner()); - - let response_rx = self - .clone() - .serve_derive(request_rx) - .await - .map_err(crate::anyhow_to_status)?; - - Ok(tonic::Response::new( - crate::stream_error_to_status(response_rx).boxed(), - )) - } +use ::ops::stats::DocsAndBytes; +use futures::Stream; +use proto_flow::derive::{Request, Response}; +use proto_gazette::consumer; +use std::collections::BTreeMap; + +mod connector; +mod protocol; +mod serve; +mod task; + +pub trait RequestStream: Stream> + Send + Unpin + 'static {} +impl> + Send + Unpin + 'static> RequestStream for T {} + +pub trait ResponseStream: Stream> + Send + 'static {} +impl> + Send + 'static> ResponseStream for T {} + +// Task definition for a capture. +pub struct Task { + // Target collection. + collection_name: String, + // JSON pointer at which document UUIDs are added. + document_uuid_ptr: doc::Pointer, + // Key components which are extracted from written documents. + key_extractors: Vec, + // Partition values which are extracted from written documents. + partition_extractors: Vec, + // Serialization policy for the Target collection. + ser_policy: doc::SerPolicy, + // ShardRef of this task. + shard_ref: ops::ShardRef, + // Transforms of this task. + transforms: Vec, } -impl Runtime -where - H: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ - pub async fn serve_derive( - self, - request_rx: In, - ) -> anyhow::Result> + Send> - where - In: Stream> + Send + Unpin + 'static, - { - let mut request_rx = request_rx.peekable(); - - let mut peek_request = match Pin::new(&mut request_rx).peek().await { - Some(Ok(peek)) => peek.clone(), - Some(Err(_status)) => return Err(request_rx.try_next().await.unwrap_err()), - None => return Ok(futures::stream::empty().boxed()), - }; - let (endpoint, _) = extract_endpoint(&mut peek_request).map_err(crate::anyhow_to_status)?; - - // NOTE(johnny): To debug requests / responses at any layer of this interceptor stack, try: - // let request_rx = request_rx.inspect_ok(|request| { - // eprintln!("REQUEST: {}", serde_json::to_string(request).unwrap()); - // }); - // - // let response_rx = response_rx.inspect_ok(|response| { - // eprintln!("RESPONSE: {}", serde_json::to_string(response).unwrap()); - // }); - - // Request interceptor which adjusts the dynamic log level with each Open. - let request_rx = adjust_log_level(request_rx, self.set_log_level); - - // Request interceptor which filters Request.Read of Ack documents. - let request_rx = request_rx.try_filter(|request| { - let keep = if let Some(request::Read { - uuid: Some(uuid), .. - }) = &request.read - { - proto_gazette::message_flags::ACK_TXN & uuid.node == 0 // Not an ACK. - } else { - true - }; - futures::future::ready(keep) - }); - - // Request interceptor for combining over documents. - let (request_rx, combine_back) = - combine::adapt_requests(&peek_request, request_rx).map_err(crate::anyhow_to_status)?; - - let response_rx = match endpoint { - models::DeriveUsing::Connector(_) => { - // Request interceptor for stateful RocksDB storage. - let (request_rx, rocks_back) = rocksdb::adapt_requests(&peek_request, request_rx) - .map_err(crate::anyhow_to_status)?; - - // Invoke the underlying image connector. - let response_rx = image::connector( - self.log_handler, - &self.container_network, - request_rx, - &self.task_name, - ); - - // Response interceptor for stateful RocksDB storage. - let response_rx = rocksdb::adapt_responses(rocks_back, response_rx); - // Response interceptor for combining over documents. - let response_rx = combine::adapt_responses(combine_back, response_rx); - - response_rx.boxed() - } - models::DeriveUsing::Local(_) if !self.allow_local => { - Err(tonic::Status::failed_precondition( - "Local connectors are not permitted in this context", - ))? - } - models::DeriveUsing::Local(_) => { - // Request interceptor for stateful RocksDB storage. - let (request_rx, rocks_back) = rocksdb::adapt_requests(&peek_request, request_rx) - .map_err(crate::anyhow_to_status)?; - - // Invoke the underlying local connector. - let response_rx = local::connector(self.log_handler, request_rx); - - // Response interceptor for stateful RocksDB storage. - let response_rx = rocksdb::adapt_responses(rocks_back, response_rx); - // Response interceptor for combining over documents. - let response_rx = combine::adapt_responses(combine_back, response_rx); - - response_rx.boxed() - } - models::DeriveUsing::Sqlite(_) => { - // Invoke the underlying SQLite connector. - let response_rx = ::derive_sqlite::connector(&peek_request, request_rx)?; - - // Response interceptor for combining over documents. - let response_rx = combine::adapt_responses(combine_back, response_rx); - - response_rx.boxed() - } - models::DeriveUsing::Typescript(_) => unreachable!(), - }; - - Ok(response_rx) - } +// Transform definition for a derivation. +struct Transform { + collection_name: String, // Source collection. + name: String, // Name of this Transform. } -fn adjust_log_level( - request_rx: R, - set_log_level: Option>, -) -> impl Stream> -where - R: Stream> + Send + 'static, -{ - request_rx.inspect_ok(move |request| { - let Ok(DeriveRequestExt { - labels: Some(ops::ShardLabeling { log_level, .. }), - .. - }) = request.get_internal() - else { - return; - }; - - if let (Some(log_level), Some(set_log_level)) = - (ops::log::Level::from_i32(log_level), &set_log_level) - { - (set_log_level)(log_level); - } - }) +// Transaction of captured documents, checkpoints, and associated stats. +pub struct Transaction { + checkpoint: consumer::Checkpoint, // Recorded checkpoint. + combined_stats: DocsAndBytes, // Combined output stats. + max_clock: u64, // Maximum clock of read documents. + publish_stats: DocsAndBytes, // Published (right) stats. + read_stats: BTreeMap, // Per-transform read document stats. + started_at: std::time::SystemTime, // Time of first Read request. + updated_inference: bool, // Did we update our inferred Shape this transaction? } -// Returns the DeriveUsing of this Request, and a mutable reference to its inner config_json. -fn extract_endpoint<'r>( - request: &'r mut Request, -) -> anyhow::Result<(models::DeriveUsing, &'r mut String)> { - let (connector_type, config_json) = match request { - Request { - spec: Some(spec), .. - } => (spec.connector_type, &mut spec.config_json), - Request { - validate: Some(validate), - .. - } => (validate.connector_type, &mut validate.config_json), - Request { - open: Some(open), .. - } => { - let inner = open - .collection - .as_mut() - .context("`open` missing required `collection`")? - .derivation - .as_mut() - .context("`collection` missing required `derivation`")?; - - (inner.connector_type, &mut inner.config_json) +impl Transaction { + pub fn new() -> Self { + Self { + checkpoint: Default::default(), + combined_stats: Default::default(), + max_clock: 0, + publish_stats: Default::default(), + read_stats: BTreeMap::new(), + started_at: std::time::SystemTime::UNIX_EPOCH, + updated_inference: false, } - - _ => anyhow::bail!("request {request:?} does not contain an endpoint"), - }; - - if connector_type == ConnectorType::Image as i32 { - Ok(( - models::DeriveUsing::Connector( - serde_json::from_str(config_json).context("parsing connector config")?, - ), - config_json, - )) - } else if connector_type == ConnectorType::Local as i32 { - Ok(( - models::DeriveUsing::Local( - serde_json::from_str(config_json).context("parsing local config")?, - ), - config_json, - )) - } else if connector_type == ConnectorType::Sqlite as i32 { - Ok(( - models::DeriveUsing::Sqlite( - serde_json::from_str(config_json).context("parsing connector config")?, - ), - config_json, - )) - } else if connector_type == ConnectorType::Typescript as i32 { - Ok(( - models::DeriveUsing::Connector(models::ConnectorConfig { - image: "ghcr.io/estuary/derive-typescript:dev".to_string(), - config: models::RawValue::from_str(config_json) - .context("parsing connector config")?, - }), - config_json, - )) - } else { - anyhow::bail!("invalid connector type: {connector_type}"); } } diff --git a/crates/runtime/src/derive/protocol.rs b/crates/runtime/src/derive/protocol.rs new file mode 100644 index 0000000000..caa5855006 --- /dev/null +++ b/crates/runtime/src/derive/protocol.rs @@ -0,0 +1,352 @@ +use super::{Task, Transaction}; +use crate::task_state::RocksDB; +use anyhow::Context; +use prost::Message; +use proto_flow::derive::{request, response, Request, Response}; +use proto_flow::flow; +use proto_flow::runtime::derive_response_ext; +use proto_gazette::consumer; +use std::collections::BTreeMap; + +pub fn recv_client_first_open(open: &Request) -> anyhow::Result { + let db = RocksDB::open(open.get_internal()?.open.and_then(|o| o.rocksdb_descriptor))?; + + Ok(db) +} + +pub fn recv_client_open(open: &mut Request, db: &RocksDB) -> anyhow::Result<()> { + if let Some(state) = db.load_connector_state()? { + let open = open.open.as_mut().unwrap(); + open.state_json = state; + tracing::debug!(state=%open.state_json, "loaded and attached a persisted connector Open.state_json"); + } else { + tracing::debug!("no previously-persisted connector state was found"); + } + Ok(()) +} + +pub fn recv_connector_opened( + db: &RocksDB, + open: &Request, + opened: Option, +) -> anyhow::Result<( + Task, + doc::combine::Accumulator, + consumer::Checkpoint, + Response, +)> { + let Some(mut opened) = opened else { + anyhow::bail!("unexpected connector EOF reading Opened") + }; + + let task = Task::new(&open, &opened)?; + let accumulator = doc::combine::Accumulator::new(task.combine_spec()?, tempfile::tempfile()?)?; + + let mut checkpoint = db + .load_checkpoint() + .context("failed to load runtime checkpoint from RocksDB")?; + + // TODO(johnny): Expose Opened.runtime_checkpoint in the public protocol. + opened.set_internal(|internal| { + if let Some(derive_response_ext::Opened { + runtime_checkpoint: Some(connector_checkpoint), + }) = &internal.opened + { + checkpoint = connector_checkpoint.clone(); + tracing::debug!( + ?checkpoint, + "using connector-provided OpenedExt.runtime_checkpoint", + ); + } else { + internal.opened = Some(derive_response_ext::Opened { + runtime_checkpoint: Some(checkpoint.clone()), + }); + tracing::debug!( + ?checkpoint, + "loaded and attached a persisted OpenedExt.runtime_checkpoint", + ); + } + }); + + Ok((task, accumulator, checkpoint, opened)) +} + +pub fn recv_client_read_or_flush( + request: Option, + txn: &mut Transaction, +) -> anyhow::Result> { + match request { + None => anyhow::bail!("unexpected client EOF while reading Read or Flush"), + Some(Request { + read: Some(read), .. + }) => { + if let Some(flow::UuidParts { clock, node }) = &read.uuid { + // Filter out message acknowledgements. + if proto_gazette::message_flags::ACK_TXN & node != 0 { + return Ok(None); + } + // Track the largest document clock that we've observed. + if *clock > txn.max_clock { + txn.max_clock = *clock; + } + } + + // Accumulate metrics over reads for our transforms. + let read_stats = &mut txn.read_stats.entry(read.transform).or_default(); + read_stats.docs_total += 1; + read_stats.bytes_total += read.doc_json.len() as u64; + + Ok(Some(( + Request { + read: Some(read), + ..Default::default() + }, + false, + ))) + } + Some(Request { + flush: Some(_flush), + .. + }) => Ok(Some(( + Request { + flush: Some(request::Flush {}), + ..Default::default() + }, + true, + ))), + Some(request) => Err(crate::protocol_error( + "expected client Read or Flush", + request, + )), + } +} + +pub fn recv_connector_published_or_flushed( + accumulator: &mut doc::combine::Accumulator, + response: Option, + saw_flush: bool, + task: &Task, + txn: &mut Transaction, +) -> anyhow::Result { + let response::Published { doc_json } = match response { + None => anyhow::bail!("unexpected connector EOF while reading Published or Flushed"), + Some(Response { + flushed: Some(response::Flushed {}), + .. + }) if saw_flush => return Ok(true), + Some(Response { + flushed: Some(_), .. + }) => { + anyhow::bail!("connector sent Flushed before receiving Flush") + } + Some(Response { + published: Some(published), + .. + }) => published, + response => { + return Err(crate::protocol_error( + "expected Published or Flushed", + response, + )) + } + }; + + let memtable = accumulator.memtable()?; + let alloc = memtable.alloc(); + + let mut doc = memtable + .parse_json_str(&doc_json) + .context("couldn't parse captured document as JSON")?; + + let uuid_ptr = &task.document_uuid_ptr; + + if let Some(node) = uuid_ptr.create_heap_node(&mut doc, alloc) { + *node = doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); + } + memtable.add(0, doc, false)?; + + txn.publish_stats.docs_total += 1; + txn.publish_stats.bytes_total += doc_json.len() as u64; + + Ok(false) +} + +pub fn send_client_published( + buf: &mut bytes::BytesMut, + drained: doc::combine::DrainedDoc, + shape: &mut doc::Shape, + task: &Task, + txn: &mut Transaction, +) -> Response { + let doc::combine::DrainedDoc { meta: _, root } = drained; + + let key_packed = doc::Extractor::extract_all_owned(&root, &task.key_extractors, buf); + let partitions_packed = + doc::Extractor::extract_all_owned(&root, &task.partition_extractors, buf); + let doc_json = serde_json::to_string(&task.ser_policy.on_owned(&root)) + .expect("document serialization cannot fail"); + + txn.publish_stats.docs_total += 1; + txn.publish_stats.bytes_total += doc_json.len() as u64; + + if shape.widen_owned(&root) { + doc::shape::limits::enforce_shape_complexity_limit( + shape, + doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, + ); + } + + Response { + published: Some(response::Published { doc_json }), + ..Default::default() + } + .with_internal_buf(buf, |internal| { + internal.published = Some(derive_response_ext::Published { + key_packed, + max_clock: txn.max_clock, + partitions_packed, + }); + }) +} + +pub fn send_client_flushed(buf: &mut bytes::BytesMut, task: &Task, txn: &Transaction) -> Response { + let transforms: BTreeMap<_, _> = txn + .read_stats + .iter() + .map(|(index, read_stats)| { + ( + task.transforms[*index as usize].name.clone(), + ops::stats::derive::Transform { + input: Some(read_stats.clone()), + source: task.transforms[*index as usize].collection_name.clone(), + }, + ) + }) + .collect(); + + let stats = ops::Stats { + capture: Default::default(), + derive: Some(ops::stats::Derive { + transforms, + published: Some(txn.publish_stats.clone()), + out: Some(txn.combined_stats.clone()), + }), + interval: None, + materialize: Default::default(), + meta: Some(ops::Meta { + uuid: crate::UUID_PLACEHOLDER.to_string(), + }), + open_seconds_total: txn.started_at.elapsed().unwrap().as_secs_f64(), + shard: Some(task.shard_ref.clone()), + timestamp: Some(proto_flow::as_timestamp(txn.started_at)), + txn_count: 1, + }; + + Response { + flushed: Some(response::Flushed {}), + ..Default::default() + } + .with_internal_buf(buf, |internal| { + internal.flushed = Some(derive_response_ext::Flushed { stats: Some(stats) }); + }) +} + +pub fn recv_client_start_commit( + last_checkpoint: consumer::Checkpoint, + request: Option, + txn: &mut Transaction, +) -> anyhow::Result<(Request, rocksdb::WriteBatch)> { + let Some(request) = request else { + anyhow::bail!("unexpected EOF while reading StartCommit"); + }; + + let Request { + start_commit: + Some(request::StartCommit { + runtime_checkpoint: Some(runtime_checkpoint), + }), + .. + } = &request + else { + return Err(crate::protocol_error( + "expected StartCommit with runtime_checkpoint", + request, + )); + }; + + // TODO(johnny): Diff the previous and current checkpoint to build a + // merge-able, incremental update that's written to the WriteBatch. + let _last_checkpoint = last_checkpoint; + + let mut wb = rocksdb::WriteBatch::default(); + + tracing::debug!( + ?runtime_checkpoint, + "persisting StartCommit.runtime_checkpoint", + ); + wb.put(RocksDB::CHECKPOINT_KEY, runtime_checkpoint.encode_to_vec()); + + txn.checkpoint = runtime_checkpoint.clone(); + + Ok((request, wb)) +} + +pub fn recv_connector_started_commit( + db: &RocksDB, + response: Option, + shape: &doc::Shape, + task: &Task, + txn: &Transaction, + mut wb: rocksdb::WriteBatch, +) -> anyhow::Result { + let Some(response) = response else { + anyhow::bail!("unexpected connector EOF while reading StartedCommit"); + }; + + let Response { + started_commit: Some(response::StartedCommit { state }), + .. + } = &response + else { + return Err(crate::protocol_error( + "expected StartedCommit with runtime_checkpoint", + response, + )); + }; + + if let Some(flow::ConnectorState { + merge_patch, + updated_json, + }) = state + { + let updated: models::RawValue = serde_json::from_str(updated_json) + .context("failed to decode connector state as JSON")?; + + if !*merge_patch { + wb.merge(RocksDB::CONNECTOR_STATE_KEY, "null"); + } + wb.merge(RocksDB::CONNECTOR_STATE_KEY, updated.get()); + + tracing::debug!(updated=?ops::DebugJson(updated), %merge_patch, "persisted an updated StartedCommit.state"); + } + + // We're about to write out our write batch which, when written to the + // recovery log, irrevocably commits our transaction. Before doing so, + // produce a structured log if our inferred schema changed in this + // transaction. + if txn.updated_inference { + let serialized = serde_json::to_value(&doc::shape::schema::to_schema(shape.clone())) + .expect("shape serialization should never fail"); + + tracing::info!( + schema = ?::ops::DebugJson(serialized), + collection_name = %task.collection_name, + "inferred schema updated" + ); + } + + db.write(wb) + .context("failed to write atomic RocksDB commit")?; + + Ok(response) +} diff --git a/crates/runtime/src/derive/rocksdb.rs b/crates/runtime/src/derive/rocksdb.rs deleted file mode 100644 index a76fba3e84..0000000000 --- a/crates/runtime/src/derive/rocksdb.rs +++ /dev/null @@ -1,261 +0,0 @@ -use anyhow::Context; -use futures::SinkExt; -use futures::{channel::mpsc, Stream, StreamExt, TryStreamExt}; -use prost::Message; -use proto_flow::derive::{request, Request, Response}; -use proto_flow::flow; -use proto_flow::runtime::{derive_response_ext, RocksDbDescriptor}; -use proto_gazette::consumer::Checkpoint; -use std::sync::Arc; - -pub fn adapt_requests( - peek_request: &Request, - request_rx: R, -) -> anyhow::Result<(impl Stream>, ResponseArgs)> -where - R: Stream>, -{ - // Open RocksDB based on the request::Open internal descriptor. - let db = Arc::new(RocksDB::open( - peek_request - .get_internal()? - .open - .and_then(|o| o.rocksdb_descriptor), - )?); - let response_db = db.clone(); - - // Channel for passing a StartCommit checkpoint to the response stream. - let (mut start_commit_tx, start_commit_rx) = mpsc::channel(1); - - let request_rx = coroutines::try_coroutine(move |mut co| async move { - let mut request_rx = std::pin::pin!(request_rx); - - while let Some(mut request) = request_rx.try_next().await? { - if let Some(open) = &mut request.open { - // If found, decode and attach to `open`. - if let Some(state) = db.load_connector_state()? { - open.state_json = state.to_string(); - tracing::debug!(state=%open.state_json, "loaded and attached a persisted connector Open.state_json"); - } else { - tracing::debug!("no previously-persisted connector state was found"); - } - } else if let Some(start_commit) = &request.start_commit { - // Notify response loop of a pending StartCommit checkpoint. - start_commit_tx - .feed(start_commit.clone()) - .await - .context("failed to send request::StartCommit to response stream")?; - } - - co.yield_(request).await; - } - Ok(()) - }); - - Ok(( - request_rx, - ResponseArgs { - start_commit_rx, - db: response_db, - }, - )) -} - -pub struct ResponseArgs { - start_commit_rx: mpsc::Receiver, - db: Arc, -} - -pub fn adapt_responses( - args: ResponseArgs, - response_rx: R, -) -> impl Stream> -where - R: Stream>, -{ - let ResponseArgs { - mut start_commit_rx, - db, - } = args; - - coroutines::try_coroutine(move |mut co| async move { - let mut response_rx = std::pin::pin!(response_rx); - - while let Some(mut response) = response_rx.try_next().await? { - if let Some(_opened) = &response.opened { - // Load and attach the last consumer checkpoint. - let runtime_checkpoint = db - .load_checkpoint() - .context("failed to load runtime checkpoint from RocksDB")?; - - tracing::debug!( - ?runtime_checkpoint, - "loaded and attached a persisted OpenedExt.runtime_checkpoint", - ); - - response.set_internal(|internal| { - internal.opened = Some(derive_response_ext::Opened { - runtime_checkpoint: Some(runtime_checkpoint), - }); - }); - } else if let Some(started_commit) = &response.started_commit { - let mut batch = rocksdb::WriteBatch::default(); - - let start_commit = start_commit_rx - .next() - .await - .context("failed to receive request::StartCommit from request loop")?; - - let runtime_checkpoint = start_commit - .runtime_checkpoint - .context("StartCommit without runtime checkpoint")?; - - tracing::debug!( - ?runtime_checkpoint, - "persisting StartCommit.runtime_checkpoint", - ); - batch.put(RocksDB::CHECKPOINT_KEY, &runtime_checkpoint.encode_to_vec()); - - // And add the connector checkpoint. - if let Some(flow::ConnectorState { - merge_patch, - updated_json, - }) = &started_commit.state - { - let mut updated: serde_json::Value = serde_json::from_str(updated_json) - .context("failed to decode connector state as JSON")?; - - if *merge_patch { - if let Some(mut previous) = db.load_connector_state()? { - json_patch::merge(&mut previous, &updated); - updated = previous; - } - } - - tracing::debug!(%updated, %merge_patch, "persisting an updated StartedCommit.state"); - batch.put(RocksDB::CONNECTOR_STATE_KEY, &updated.to_string()); - } - - db.write(batch) - .context("failed to write atomic RocksDB commit")?; - } - co.yield_(response).await; - } - Ok(()) - }) -} - -struct RocksDB { - db: rocksdb::DB, - _path: std::path::PathBuf, - _tmp: Option, -} - -impl std::ops::Deref for RocksDB { - type Target = rocksdb::DB; - - fn deref(&self) -> &Self::Target { - &self.db - } -} - -impl RocksDB { - pub fn open(desc: Option) -> anyhow::Result { - let (mut opts, path, _tmp) = match desc { - Some(RocksDbDescriptor { - rocksdb_path, - rocksdb_env_memptr, - }) => { - tracing::debug!( - ?rocksdb_path, - ?rocksdb_env_memptr, - "opening hooked RocksDB database" - ); - - // Re-hydrate the provided memory address into rocksdb::Env wrapping - // an owned *mut librocksdb_sys::rocksdb_env_t. - let env = unsafe { - rocksdb::Env::from_raw(rocksdb_env_memptr as *mut librocksdb_sys::rocksdb_env_t) - }; - - let mut opts = rocksdb::Options::default(); - opts.set_env(&env); - - (opts, std::path::PathBuf::from(rocksdb_path), None) - } - _ => { - let dir = tempfile::TempDir::new().unwrap(); - let opts = rocksdb::Options::default(); - - tracing::debug!( - rocksdb_path = ?dir.path(), - "opening temporary RocksDB database" - ); - - (opts, dir.path().to_owned(), Some(dir)) - } - }; - - opts.create_if_missing(true); - opts.create_missing_column_families(true); - - let column_families = match rocksdb::DB::list_cf(&opts, &path) { - Ok(cf) => cf, - // Listing column families will fail if the DB doesn't exist. - // Assume as such, as we'll otherwise fail when we attempt to open. - Err(_) => vec![rocksdb::DEFAULT_COLUMN_FAMILY_NAME.to_string()], - }; - let mut db = rocksdb::DB::open_cf(&opts, &path, column_families.iter()) - .context("failed to open RocksDB")?; - - for column_family in column_families { - // We used to use a `registers` column family for derivations, but we no longer do - // and they were never actually used in production. Rocks requires that all existing - // column families are opened, so we just open and drop any of these legacy "registers" - // column families. - if column_family == "registers" { - tracing::warn!(%column_family, "dropping legacy rocksdb column family"); - db.drop_cf(&column_family) - .context("dropping legacy column family")?; - } - } - - Ok(Self { - db, - _path: path, - _tmp, - }) - } - - pub fn load_checkpoint(&self) -> anyhow::Result { - match self.db.get_pinned(Self::CHECKPOINT_KEY)? { - Some(v) => { - Ok(Checkpoint::decode(v.as_ref()) - .context("failed to decode consumer checkpoint")?) - } - None => Ok(Checkpoint::default()), - } - } - - pub fn load_connector_state(&self) -> anyhow::Result> { - let state = self - .db - .get_pinned(Self::CONNECTOR_STATE_KEY) - .context("failed to load connector state")?; - - // If found, decode and attach to `open`. - if let Some(state) = state { - let state: serde_json::Value = - serde_json::from_slice(&state).context("failed to decode connector state")?; - - Ok(Some(state)) - } else { - Ok(None) - } - } - - // Key encoding under which a marshalled checkpoint is stored. - pub const CHECKPOINT_KEY: &[u8] = b"checkpoint"; - // Key encoding under which a connector state is stored. - pub const CONNECTOR_STATE_KEY: &[u8] = b"connector-state"; -} diff --git a/crates/runtime/src/derive/serve.rs b/crates/runtime/src/derive/serve.rs new file mode 100644 index 0000000000..cc90fb747a --- /dev/null +++ b/crates/runtime/src/derive/serve.rs @@ -0,0 +1,222 @@ +use super::{connector, protocol::*, RequestStream, ResponseStream, Transaction}; +use crate::task_state::RocksDB; +use crate::{LogHandler, Runtime}; +use anyhow::Context; +use futures::channel::mpsc; +use futures::stream::BoxStream; +use futures::{SinkExt, StreamExt, TryStreamExt}; +use proto_flow::derive::{Request, Response}; + +#[tonic::async_trait] +impl proto_grpc::derive::connector_server::Connector for Runtime { + type DeriveStream = futures::stream::BoxStream<'static, tonic::Result>; + + async fn derive( + &self, + request: tonic::Request>, + ) -> tonic::Result> { + let conn_info = request + .extensions() + .get::(); + tracing::debug!(?request, ?conn_info, "started capture request"); + + let request_rx = crate::stream_status_to_error(request.into_inner()); + let response_rx = crate::stream_error_to_status(self.clone().serve_derive(request_rx)); + + Ok(tonic::Response::new(response_rx.boxed())) + } +} + +impl Runtime { + pub fn serve_derive(self, mut request_rx: impl RequestStream) -> impl ResponseStream { + coroutines::try_coroutine(move |mut co| async move { + let Some(mut open) = serve_unary(&self, &mut request_rx, &mut co).await? else { + return Ok::<(), anyhow::Error>(()); + }; + + let db = recv_client_first_open(&open)?; + let mut shape = doc::Shape::nothing(); + + loop { + let Some(next) = + serve_session(&mut co, &db, open, &mut request_rx, &self, &mut shape).await? + else { + return Ok(()); + }; + + open = next; + } + }) + } +} + +async fn serve_unary( + runtime: &Runtime, + request_rx: &mut impl RequestStream, + co: &mut coroutines::Suspend, +) -> anyhow::Result> { + while let Some(request) = request_rx.try_next().await? { + if request.open.is_some() { + return Ok(Some(request)); + } + let (connector_tx, mut connector_rx) = connector::start(runtime, request).await?; + std::mem::drop(connector_tx); // Send EOF. + + while let Some(response) = connector_rx.next().await { + () = co.yield_(response?).await; + } + } + Ok(None) +} + +async fn serve_session( + co: &mut coroutines::Suspend, + db: &RocksDB, + mut open: Request, + request_rx: &mut impl RequestStream, + runtime: &Runtime, + shape: &mut doc::Shape, +) -> anyhow::Result> { + recv_client_open(&mut open, &db)?; + + // Start connector stream and read Opened. + let (mut connector_tx, mut connector_rx) = connector::start(runtime, open.clone()).await?; + let opened = TryStreamExt::try_next(&mut connector_rx).await?; + + let (task, mut accumulator, mut last_checkpoint, opened) = + recv_connector_opened(&db, &open, opened)?; + + () = co.yield_(opened).await; + + // Loop over transactions. + loop { + // Loop over EOF, Reset, and Open until an initial Read or Flush. + let initial: Request = loop { + match request_rx.try_next().await? { + None => { + drain_connector(connector_tx, connector_rx).await?; + return Ok(None); + } + Some(reset @ Request { reset: Some(_), .. }) => { + let _: Result<(), mpsc::SendError> = connector_tx.feed(reset).await; + } + Some(open @ Request { open: Some(_), .. }) => { + drain_connector(connector_tx, connector_rx).await?; + return Ok(Some(open)); + } + Some(read @ Request { read: Some(_), .. }) => break read, + Some(flush @ Request { flush: Some(_), .. }) => break flush, + request => { + return Err(crate::protocol_error( + "expected client EOF, Reset, Open, or Read", + request, + )) + } + } + }; + + let mut txn = Transaction::new(); + txn.started_at = std::time::SystemTime::now(); + + enum Step { + ClientRx(Option), + ConnectorRx(Option), + ConnectorTx(Result<(), mpsc::SendError>), + } + let mut saw_flush = false; + let mut saw_flushed = false; + let mut saw_reset = false; + let mut send_fut = None; + let mut step = Step::ClientRx(Some(initial)); + + // Loop over client requests and connector responses until the transaction has flushed. + while !saw_flush || !saw_flushed { + match step { + Step::ClientRx(request) => { + if let Some((send, is_flush)) = recv_client_read_or_flush(request, &mut txn)? { + saw_flush = is_flush; + send_fut = Some(connector_tx.feed(send)); + } + } + Step::ConnectorRx(response) => { + saw_flushed = recv_connector_published_or_flushed( + &mut accumulator, + response, + saw_flush, + &task, + &mut txn, + )?; + } + Step::ConnectorTx(result) => { + if let Err(_send_err) = result { + saw_reset = true; // `connector_rx` will likely have an error. + } + send_fut = None; + } + } + + step = if let Some(forward) = &mut send_fut { + tokio::select! { + result = forward => Step::ConnectorTx(result), + response = connector_rx.try_next() => Step::ConnectorRx(response?), + } + } else { + tokio::select! { + request = request_rx.try_next(), if !saw_flush => Step::ClientRx(request?), + response = connector_rx.try_next() => Step::ConnectorRx(response?), + } + }; + } + + if saw_reset { + anyhow::bail!( + "connector reset its connection unexpectedly but sent Flushed without an error" + ); + } + + // Prepare to drain `accumulator`. + let mut drainer = accumulator + .into_drainer() + .context("preparing to drain combiner")?; + let mut buf = bytes::BytesMut::new(); + + while let Some(drained) = drainer.drain_next()? { + let published = send_client_published(&mut buf, drained, shape, &task, &mut txn); + () = co.yield_(published).await; + } + () = co.yield_(send_client_flushed(&mut buf, &task, &txn)).await; + + // Read StartCommit and forward to the connector. + let start_commit = request_rx.try_next().await?; + let (start_commit, wb) = recv_client_start_commit(last_checkpoint, start_commit, &mut txn)?; + connector_tx + .try_send(start_commit) + .expect("sender is empty"); + + // Read StartedCommit and forward to the client. + let started_commit = connector_rx.try_next().await?; + let started_commit = + recv_connector_started_commit(&db, started_commit, shape, &task, &txn, wb)?; + () = co.yield_(started_commit).await; + + last_checkpoint = txn.checkpoint; + accumulator = drainer.into_new_accumulator()?; + } +} + +async fn drain_connector( + tx: mpsc::Sender, + mut rx: BoxStream<'static, anyhow::Result>, +) -> anyhow::Result<()> { + std::mem::drop(tx); + + match rx.try_next().await? { + Some(response) => { + return Err(crate::protocol_error( + "unexpected connector response during graceful closure", + response, + )) + } + None => return Ok(()), + } +} diff --git a/crates/runtime/src/derive/task.rs b/crates/runtime/src/derive/task.rs new file mode 100644 index 0000000000..6fd4d48064 --- /dev/null +++ b/crates/runtime/src/derive/task.rs @@ -0,0 +1,118 @@ +use super::{Task, Transform}; +use anyhow::Context; +use proto_flow::derive::{request, response, Request, Response}; +use proto_flow::flow; + +impl Task { + pub fn new(open: &Request, opened: &Response) -> anyhow::Result { + let request::Open { + collection, + range, + state_json: _, + version: _, + } = open.clone().open.context("expected Open")?; + + let response::Opened {} = opened.opened.as_ref().context("expected Opened")?; + + let flow::CollectionSpec { + ack_template_json: _, + derivation, + key, + name: collection_name, + partition_fields, + partition_template: _, + projections, + read_schema_json: _, + uuid_ptr, + write_schema_json: _, + } = collection.context("missing collection")?; + + let flow::collection_spec::Derivation { + config_json: _, + connector_type: _, + network_ports: _, + recovery_log_template: _, + shard_template: _, + shuffle_key_types: _, + transforms, + } = derivation.as_ref().context("missing derivation")?; + + if uuid_ptr.is_empty() { + anyhow::bail!("uuid_ptr cannot be empty"); + } else if key.is_empty() { + anyhow::bail!("collection key cannot be empty"); + } + + let range = range.context("missing range")?; + let ser_policy = doc::SerPolicy::default(); + + let document_uuid_ptr = doc::Pointer::from(uuid_ptr); + let key_extractors = extractors::for_key(&key, &projections, &ser_policy)?; + let partition_extractors = + extractors::for_fields(&partition_fields, &projections, &ser_policy)?; + + let shard_ref = ops::ShardRef { + kind: ops::TaskType::Derivation as i32, + name: collection_name.clone(), + key_begin: format!("{:08x}", range.key_begin), + r_clock_begin: format!("{:08x}", range.r_clock_begin), + }; + + let transforms = transforms + .into_iter() + .enumerate() + .map(|(index, spec)| Transform::new(spec).context(index)) + .collect::, _>>()?; + + Ok(Self { + collection_name, + document_uuid_ptr, + key_extractors, + partition_extractors, + ser_policy, + shard_ref, + transforms, + }) + } + + pub fn combine_spec(&self) -> anyhow::Result { + todo!() + } +} + +impl Transform { + pub fn new(spec: &flow::collection_spec::derivation::Transform) -> anyhow::Result { + let flow::collection_spec::derivation::Transform { + collection, + journal_read_suffix: _, + lambda_config_json: _, + name, + not_after: _, + not_before: _, + partition_selector: _, + priority: _, + read_delay_seconds: _, + read_only: _, + shuffle_key: _, + shuffle_lambda_config_json: _, + } = spec; + + let flow::CollectionSpec { + ack_template_json: _, + derivation: _, + key: _, + name: collection_name, + partition_fields: _, + partition_template: _, + projections: _, + read_schema_json: _, + uuid_ptr: _, + write_schema_json: _, + } = collection.as_ref().context("missing collection")?; + + Ok(Self { + collection_name: collection_name.clone(), + name: name.clone(), + }) + } +} diff --git a/crates/runtime/src/image_connector.rs b/crates/runtime/src/image_connector.rs index 2ad1db95be..d0409bdb0e 100644 --- a/crates/runtime/src/image_connector.rs +++ b/crates/runtime/src/image_connector.rs @@ -1,236 +1,61 @@ use super::container; -use futures::{channel::mpsc, future::BoxFuture, SinkExt, Stream, TryStreamExt}; -use tokio::task::JoinHandle; +use futures::{channel::mpsc, future::BoxFuture, Stream, TryStreamExt}; /// Container is a description of a running Container instance. pub use proto_flow::runtime::Container; -/// Unsealed is a container context that's ready to spawn. -pub struct Unsealed { - /// Image to run. - pub image: String, - /// Log-level of the container, if known. - pub log_level: Option, - /// First request of the connector stream. - pub request: Request, -} - -/// UnsealFuture is the response type of a function that unseals Requests. -pub type UnsealFuture = BoxFuture<'static, anyhow::Result>>; - /// StartRpcFuture is the response type of a function that starts a connector RPC. pub type StartRpcFuture = BoxFuture<'static, tonic::Result>>>; -/// serve a long-lived Request stream by delegating to image connector instances, -/// which each is started and stopped as required by the context of the input RPC stream. -/// -/// * Request: The RPC Request type. -/// * Response: The RPC Response type. -/// * Requests: A Stream of Request. -/// * Unseal: Attempt to Unseal a Request, returning Ok with a future that -/// resolves the Unsealed Result or, if the Request does not unseal, -/// then an Error with the unmodified Request. -/// * StartRpc: Start an RPC stream with the container channel. -/// * Attach: Attach a Container description to the first Response -/// of each delegate container lifecycle. -pub fn serve( +pub async fn serve( attach_container: Attach, // Attaches a Container description to a response. - log_handler: L, // Log handler. + image: String, // Container image to run. + log_handler: impl crate::LogHandler, // Handler for connector logs. + log_level: Option, // Log-level of the connector, if known. network: &str, // Container network to use. - request_rx: Requests, // Caller's input request stream. + request_rx: mpsc::Receiver, // Caller's input request stream. start_rpc: StartRpc, // Begins RPC over a started container channel. task_name: &str, // Name of this task, used to label container. task_type: ops::TaskType, // Type of this task, for labeling container. - unseal: Unseal, // Unseals a Request, or returns the Request if it doesn't unseal. -) -> mpsc::Receiver> +) -> anyhow::Result> + Send> where - Request: serde::Serialize + Send, + Request: serde::Serialize + Send + 'static, Response: Send + Sync + 'static, - Requests: Stream> + Send + 'static, - Unseal: Fn(Request) -> Result, Request> + Send + Sync + 'static, StartRpc: Fn(tonic::transport::Channel, mpsc::Receiver) -> StartRpcFuture + Send - + Sync + 'static, - Attach: Fn(&mut Response, Container) + Clone + Send + Sync + 'static, - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, + Attach: Fn(&mut Response, Container) + Send + 'static, { - let (response_tx, response_rx) = mpsc::channel(crate::CHANNEL_BUFFER); - - let mut response_tx_clone = response_tx.clone(); - let network = network.to_string(); - let task_name = task_name.to_string(); - - // Future which consumes requests and manages connector lifecycle. - let fut = async move { - let mut request_rx = std::pin::pin!(request_rx); - let mut state = State::Idle; - - loop { - match state { - State::Idle => { - let Some(request) = request_rx.try_next().await? else { - return Ok(()); // All done. - }; - match (unseal)(request) { - Ok(unseal) => { - state = State::Starting { unseal }; - } - Err(request) => { - anyhow::bail!( - "invalid initial Request: {}", - serde_json::to_string(&request).unwrap() - ); - } - } - } - State::Starting { unseal } => { - let Unsealed { - image, - log_level, - request, - } = unseal.await.map_err(crate::anyhow_to_status)?; - - let (mut container_tx, container_rx) = mpsc::channel(crate::CHANNEL_BUFFER); - () = container_tx - .try_send(request) - .expect("can always send first request into buffered channel"); - - let (container, channel, guard) = container::start( - &image, - log_handler.clone(), - log_level, - &network, - &task_name, - task_type, - ) - .await - .map_err(crate::anyhow_to_status)?; - - // Start RPC over the container's gRPC `channel`. - let mut container_rx = (start_rpc)(channel, container_rx).await?.into_inner(); - - // Spawn task which reads and forwards connector responses. - let mut attach = Some((container, attach_container.clone())); - let mut response_tx = response_tx.clone(); - - let container_status = tokio::spawn(async move { - let _guard = guard; // Hold guard while still reading responses. - - while let Some(mut response) = container_rx.try_next().await? { - if let Some((container, attach)) = attach.take() { - (attach)(&mut response, container); - } - if let Err(_) = response_tx.send(Ok(response)).await { - anyhow::bail!( - "failed to forward response because receiver is gone" - ); - } - } - Ok(()) - }); - - state = State::Running { - container_status, - container_tx, - }; - } - State::Running { - mut container_status, - mut container_tx, - } => { - // Wait for a next request or for the container to exit. - let request = tokio::select! { - status = &mut container_status => { - () = status.unwrap()?; - anyhow::bail!( - "connector unexpectedly closed its output stream while its input stream is still open" - ); - }, - request = request_rx.try_next() => request?, - }; - - match request.map(|request| (unseal)(request)) { - // A non-sealed request that we simply forward. - Some(Err(request)) => { - container_tx.feed(request).await.map_err(|_send_err| { - anyhow::anyhow!("connector unexpectedly closed its input stream") - })?; - - state = State::Running { - container_status, - container_tx, - }; - } - // Sealed request which requires that we restart the container. - Some(Ok(unseal)) => { - let _drop_to_send_eof = container_tx; - state = State::Restarting { - container_status, - unseal, - }; - } - // End of input. - None => { - let _drop_to_send_eof = container_tx; - state = State::Draining { container_status }; - } - } - } - State::Restarting { - container_status, - unseal, - } => { - () = container_status.await.unwrap()?; - state = State::Starting { unseal }; - } - State::Draining { container_status } => { - () = container_status.await.unwrap()?; - return Ok(()); // All done. - } - } - } - }; - - // Spawn a task that awaits the future and forwards a terminal error. - tokio::spawn(async move { - let Err(err) = fut.await else { return }; - - if let Err(send_error) = response_tx_clone.send(Err(err)).await { - tracing::warn!(%send_error, "encountered terminal error but receiver is gone"); - return; + let (container, channel, guard) = container::start( + &image, + log_handler.clone(), + log_level, + &network, + &task_name, + task_type, + ) + .await?; + + // Start RPC over the container's gRPC `channel`. + let mut container_rx = (start_rpc)(channel, request_rx).await?.into_inner(); + + let container_rx = coroutines::try_coroutine(move |mut co| async move { + let _guard = guard; // Move into future. + + // Attach `container` to the first request. + let Some(mut first) = container_rx.try_next().await? else { + return Ok(()); + }; + (attach_container)(&mut first, container); + () = co.yield_(first).await; + + // Stream successive requests. + while let Some(response) = container_rx.try_next().await? { + () = co.yield_(response).await; } + Ok(()) }); - response_rx -} - -// TODO(johnny): This State can be extended with a Resumed variant when we -// finally tackle incremental container snapshots & recovery. The Resumed -// variant would introspect and hold the effective Request.Open of the resumed -// container, which would then be matched with a current Open to either resume -// or drain a recovered connector instance. -enum State { - Idle, - // We're ready to start a container. - Starting { - unseal: UnsealFuture, - }, - // Container has an active bidirectional stream. - Running { - container_status: JoinHandle>, - container_tx: mpsc::Sender, - }, - // We must restart a new container. We've sent the current one EOF, - // and are waiting to see its EOF before we begin a new instance. - Restarting { - container_status: JoinHandle>, - unseal: UnsealFuture, - }, - // Requests reached EOF. We've sent EOF into the container and are - // draining its final responses. - Draining { - container_status: JoinHandle>, - }, + Ok(container_rx) } diff --git a/crates/runtime/src/lib.rs b/crates/runtime/src/lib.rs index 493b6d0ab6..77f5e33c15 100644 --- a/crates/runtime/src/lib.rs +++ b/crates/runtime/src/lib.rs @@ -6,8 +6,10 @@ mod container; mod derive; mod image_connector; mod local_connector; -mod materialize; +//mod materialize; +// mod task_combiner; mod task_service; +mod task_state; mod tokio_context; mod unary; mod unseal; @@ -45,12 +47,19 @@ fn stream_status_to_error>>( s.map_err(anyhow::Error::new) } +#[must_use] +fn protocol_error(context: &str, msg: impl serde::Serialize) -> anyhow::Error { + let mut msg = serde_json::to_string(&msg).unwrap(); + msg.truncate(4096); + anyhow::format_err!("protocol error ({context}): {msg}") +} + +pub trait LogHandler: Fn(&ops::Log) + Send + Sync + Clone + 'static {} +impl LogHandler for T {} + /// Runtime implements the various services that constitute the Flow Runtime. #[derive(Clone)] -pub struct Runtime -where - L: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ +pub struct Runtime { allow_local: bool, container_network: String, log_handler: L, @@ -58,10 +67,7 @@ where task_name: String, } -impl Runtime -where - L: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ +impl Runtime { /// Build a new Runtime. /// * `allow_local`: Whether local connectors are permitted by this Runtime. /// * `container_network`: the Docker container network used for connector containers. @@ -86,21 +92,22 @@ where /// Build a tonic Server which includes all of the Runtime's services. pub fn build_tonic_server(self) -> tonic::transport::server::Router { - tonic::transport::Server::builder() - .add_service( - proto_grpc::capture::connector_server::ConnectorServer::new(self.clone()) - .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. - .max_encoding_message_size(usize::MAX), // The default, made explicit. - ) - .add_service( - proto_grpc::derive::connector_server::ConnectorServer::new(self.clone()) - .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. - .max_encoding_message_size(usize::MAX), // The default, made explicit. - ) - .add_service( - proto_grpc::materialize::connector_server::ConnectorServer::new(self) - .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. - .max_encoding_message_size(usize::MAX), // The default, made explicit. - ) + tonic::transport::Server::builder().add_service( + proto_grpc::capture::connector_server::ConnectorServer::new(self.clone()) + .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. + .max_encoding_message_size(usize::MAX), // The default, made explicit. + ) + /* + .add_service( + proto_grpc::derive::connector_server::ConnectorServer::new(self.clone()) + .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. + .max_encoding_message_size(usize::MAX), // The default, made explicit. + ) + .add_service( + proto_grpc::materialize::connector_server::ConnectorServer::new(self) + .max_decoding_message_size(usize::MAX) // Up from 4MB. Accept whatever the Go runtime sends. + .max_encoding_message_size(usize::MAX), // The default, made explicit. + ) + */ } } diff --git a/crates/runtime/src/local_connector.rs b/crates/runtime/src/local_connector.rs index 6274262aef..6750e15a07 100644 --- a/crates/runtime/src/local_connector.rs +++ b/crates/runtime/src/local_connector.rs @@ -1,221 +1,39 @@ -use futures::{channel::mpsc, future::BoxFuture, SinkExt, Stream, StreamExt, TryStreamExt}; +use futures::{channel::mpsc, Stream, StreamExt}; use std::collections::BTreeMap; -use tokio::task::JoinHandle; -/// Container is a description of a running Container instance. -pub use proto_flow::runtime::Container; - -/// Unsealed is a container context that's ready to spawn. -pub struct Unsealed { - /// Image to run. - pub command: Vec, - /// Environment variables. - pub env: BTreeMap, - /// Log-level of the container, if known. - pub log_level: Option, - /// Whether to use protobuf. - pub protobuf: bool, - /// First request of the connector stream. - pub request: Request, -} - -/// UnsealFuture is the response type of a function that unseals Requests. -pub type UnsealFuture = BoxFuture<'static, anyhow::Result>>; - -/// serve a long-lived Request stream which delegates to local connector instances, -/// which are started and stopped as required by the context of the input RPC stream. -/// -/// * Request: The RPC Request type. -/// * Response: The RPC Response type. -/// * Requests: A Stream of Request. -/// * Unseal: Attempt to Unseal a Request, returning Ok with a future that -/// resolves the Unsealed Result or, if the Request does not unseal, -/// then an Error with the unmodified Request. -pub fn serve( - log_handler: L, // Log handler. - request_rx: Requests, // Caller's input request stream. - unseal: Unseal, // Unseals a Request, or returns the Request if it doesn't unseal. -) -> mpsc::Receiver> +pub fn serve( + command: Vec, // Connector to run. + env: BTreeMap, // Environment variables. + log_handler: impl crate::LogHandler, // Handler for connector logs. + log_level: Option, // Log-level of the container, if known. + protobuf: bool, // Whether to use protobuf codec. + request_rx: mpsc::Receiver, // Caller's input request stream. +) -> anyhow::Result> + Send> where Request: serde::Serialize + prost::Message + Send + Sync + 'static, - Response: Default + prost::Message + for<'de> serde::Deserialize<'de> + 'static, - Requests: Stream> + Send + 'static, - Unseal: Fn(Request) -> Result, Request> + Send + Sync + 'static, - L: Fn(&ops::Log) + Clone + Send + Sync + 'static, + Response: prost::Message + for<'de> serde::Deserialize<'de> + Default + 'static, { - let (response_tx, response_rx) = mpsc::channel(crate::CHANNEL_BUFFER); - - let mut response_tx_clone = response_tx.clone(); - - // Future which consumes requests and manages connector lifecycle. - let fut = async move { - let mut request_rx = std::pin::pin!(request_rx); - let mut state = State::Idle; - - loop { - match state { - State::Idle => { - let Some(request) = request_rx.try_next().await? else { - return Ok(()); // All done. - }; - match (unseal)(request) { - Ok(unseal) => { - state = State::Starting { unseal }; - } - Err(request) => { - anyhow::bail!( - "invalid initial Request: {}", - serde_json::to_string(&request).unwrap() - ); - } - } - } - State::Starting { unseal } => { - let Unsealed { - command, - env, - log_level, - protobuf, - request, - } = unseal.await.map_err(crate::anyhow_to_status)?; - - let (mut container_tx, container_rx) = mpsc::channel(crate::CHANNEL_BUFFER); - () = container_tx - .try_send(request) - .expect("can always send first request into buffered channel"); - - let codec = if protobuf { - connector_init::Codec::Proto - } else { - connector_init::Codec::Json - }; - - // Invoke the underlying local connector. - let mut connector = connector_init::rpc::new_command(&command); - connector.envs(&env); - - if let Some(log_level) = log_level { - connector.env("LOG_LEVEL", log_level.as_str_name()); - } - - let container_rx = connector_init::rpc::bidi::( - connector, - codec, - container_rx.map(Result::Ok), - log_handler.clone(), - )?; - - // Spawn task which reads and forwards connector responses. - let mut response_tx = response_tx.clone(); - - let container_status = tokio::spawn(async move { - let mut container_rx = std::pin::pin!(container_rx); - - while let Some(response) = container_rx.try_next().await? { - if let Err(_) = response_tx.send(Ok(response)).await { - anyhow::bail!( - "failed to forward response because receiver is gone" - ); - } - } - Ok(()) - }); - - state = State::Running { - container_status, - container_tx, - }; - } - State::Running { - mut container_status, - mut container_tx, - } => { - // Wait for a next request or for the container to exit. - let request = tokio::select! { - status = &mut container_status => { - () = status.unwrap()?; - anyhow::bail!( - "connector unexpectedly closed its output stream while its input stream is still open" - ); - }, - request = request_rx.try_next() => request?, - }; - - match request.map(|request| (unseal)(request)) { - // A non-sealed request that we simply forward. - Some(Err(request)) => { - container_tx.feed(request).await.map_err(|_send_err| { - anyhow::anyhow!("connector unexpectedly closed its input stream") - })?; - - state = State::Running { - container_status, - container_tx, - }; - } - // Sealed request which requires that we restart the container. - Some(Ok(unseal)) => { - let _drop_to_send_eof = container_tx; - state = State::Restarting { - container_status, - unseal, - }; - } - // End of input. - None => { - let _drop_to_send_eof = container_tx; - state = State::Draining { container_status }; - } - } - } - State::Restarting { - container_status, - unseal, - } => { - () = container_status.await.unwrap()?; - state = State::Starting { unseal }; - } - State::Draining { container_status } => { - () = container_status.await.unwrap()?; - return Ok(()); // All done. - } - } - } + let codec = if protobuf { + connector_init::Codec::Proto + } else { + connector_init::Codec::Json }; - // Spawn a task that awaits the future and forwards a terminal error. - tokio::spawn(async move { - let Err(err) = fut.await else { return }; + // Invoke the underlying local connector. + let mut connector = connector_init::rpc::new_command(&command); + connector.envs(&env); - if let Err(send_error) = response_tx_clone.send(Err(err)).await { - tracing::warn!(%send_error, "encountered terminal error but receiver is gone"); - return; - } - }); + if let Some(log_level) = log_level { + connector.env("LOG_LEVEL", log_level.as_str_name()); + } - response_rx -} + let container_rx = connector_init::rpc::bidi::( + connector, + codec, + request_rx.map(Result::Ok), + log_handler.clone(), + )?; + let container_rx = crate::stream_status_to_error(container_rx); -enum State { - Idle, - // We're ready to start a container. - Starting { - unseal: UnsealFuture, - }, - // Container has an active bidirectional stream. - Running { - container_status: JoinHandle>, - container_tx: mpsc::Sender, - }, - // We must restart a new container. We've sent the current one EOF, - // and are waiting to see its EOF before we begin a new instance. - Restarting { - container_status: JoinHandle>, - unseal: UnsealFuture, - }, - // Requests reached EOF. We've sent EOF into the container and are - // draining its final responses. - Draining { - container_status: JoinHandle>, - }, + Ok(container_rx) } diff --git a/crates/runtime/src/task_combiner.rs b/crates/runtime/src/task_combiner.rs new file mode 100644 index 0000000000..6a3ddfbaee --- /dev/null +++ b/crates/runtime/src/task_combiner.rs @@ -0,0 +1,492 @@ +use anyhow::Context; +use ops::stats::DocsAndBytes; +use proto_flow::flow::{self, CollectionSpec, FieldSelection}; +use std::collections::BTreeMap; +use std::sync::Arc; + +pub struct Task { + // Does this task use a custom state JSON-schema? + pub custom_state_schema: Option, + // ShardRef of this task. + pub shard_ref: ops::ShardRef, +} + +// Source collection which is read from by a Task. +pub struct Source { + pub spec: CollectionSpec, + // Partition or field values which are extracted from binding documents. + pub field_extractors: Vec, + // Serialization policy for documents. + pub ser_policy: doc::SerPolicy, +} + +pub struct Target { + pub spec: CollectionSpec, + // JSON pointer at which document UUIDs are added. + pub document_uuid_ptr: doc::Pointer, + // Key components which are extracted from written documents. + pub key_extractors: Vec, + // Partition values which are extracted from written documents. + pub partition_extractors: Vec, +} + +pub struct BindingSpec { + // Name of the collection bound to this binding. + pub collection: models::Collection, + // Validation context for the bound collection's documents. + pub schema: Arc, + // JSON pointer at which document UUIDs are located. + pub document_uuid_ptr: Option, + // Key components which are extracted from binding documents. + pub key_extractors: Vec, + // Serialization policy for documents. + pub ser_policy: doc::SerPolicy, + // Partition or field values which are extracted from binding documents. + pub value_extractors: Vec, +} + +pub struct BindingStats { + // Statistics for left-hand (front) binding documents fed into the combiner. + pub stats_left: DocsAndBytes, + // Statistics for documents of the binding drained out from the combiner. + pub stats_out: DocsAndBytes, + // Statistics for right-hand (back) binding documents fed into the combiner. + pub stats_right: DocsAndBytes, +} + +pub struct BindingInference { + // Inferred shape of binding documents. + pub inferred_shape: Option, + // Did `inferred_shape` change during the current transaction? + pub inferred_shape_changed: bool, +} + +impl TaskSpec { + pub fn for_capture( + open: &proto_flow::capture::request::Open, + _opened: &proto_flow::capture::response::Opened, + ) -> anyhow::Result { + let proto_flow::capture::request::Open { + capture, + range, + state_json: _, + version: _, + } = open; + + let capture = capture.as_ref().context("missing capture")?; + let range = range.as_ref().context("missing range")?; + let ser_policy = doc::SerPolicy::default(); + + let bindings = capture + .bindings + .iter() + .enumerate() + .map(|(index, spec)| { + BindingSpec::for_capture(ser_policy.clone(), spec) + .with_context(|| format!("binding {index} is invalid")) + }) + .collect::, _>>()?; + + let shard_ref = ops::ShardRef { + kind: ops::TaskType::Capture as i32, + name: capture.name.clone(), + key_begin: format!("{:08x}", range.key_begin), + r_clock_begin: format!("{:08x}", range.r_clock_begin), + }; + + // TODO(johnny): Pass in from built CaptureSpec. + let state_schema = ""; + + Self::build(bindings, shard_ref, state_schema) + } + + pub fn open_derivation( + open: &proto_flow::derive::request::Open, + _opened: &proto_flow::derive::response::Opened, + ) -> anyhow::Result<(Self, Vec<(String, models::Collection)>)> { + let proto_flow::derive::request::Open { + collection, + range, + state_json: _, + version: _, + } = open; + + let collection = collection.as_ref().context("missing collection")?; + let range = range.as_ref().context("missing range")?; + let ser_policy = doc::SerPolicy::default(); + + let bindings = vec![BindingSpec::for_written_collection(ser_policy, collection)?]; + + let shard_ref = ops::ShardRef { + kind: ops::TaskType::Derivation as i32, + name: collection.name.clone(), + key_begin: format!("{:08x}", range.key_begin), + r_clock_begin: format!("{:08x}", range.r_clock_begin), + }; + + // TODO(johnny): Pass in from built collection_spec::Derivation. + let state_schema = ""; + + // In addition to derived and combined documents, derivations must also track + // and record statistics for read collection documents. + let transforms = collection + .derivation + .as_ref() + .context("missing derivation")? + .transforms + .iter() + .map(|transform| { + ( + transform.name.clone(), + models::Collection::new(&transform.collection.as_ref().unwrap().name), + ) + }) + .collect(); + + Ok((Self::build(bindings, shard_ref, state_schema)?, transforms)) + } + + pub fn open_materialization( + open: &proto_flow::materialize::request::Open, + _opened: &proto_flow::materialize::response::Opened, + ) -> anyhow::Result { + let proto_flow::materialize::request::Open { + materialization, + range, + state_json: _, + version: _, + } = open; + + let materialization = materialization + .as_ref() + .context("missing materialization")?; + let range = range.as_ref().context("missing range")?; + + // TODO(johnny): Hack to address string truncation for these common materialization connectors + // that don't handle large strings very well. This should be negotiated via connector protocol. + // See go/runtime/materialize.go:135 + let ser_policy = if [ + "ghcr.io/estuary/materialize-snowflake", + "ghcr.io/estuary/materialize-redshift", + "ghcr.io/estuary/materialize-sqlite", + ] + .iter() + .any(|image| materialization.config_json.contains(image)) + { + doc::SerPolicy { + str_truncate_after: 1 << 16, // Truncate at 64KB. + } + } else { + doc::SerPolicy::default() + }; + + let bindings = materialization + .bindings + .iter() + .enumerate() + .map(|(index, spec)| { + BindingSpec::for_materialization(ser_policy.clone(), spec) + .with_context(|| format!("binding {index} is invalid")) + }) + .collect::, _>>()?; + + let shard_ref = ops::ShardRef { + kind: ops::TaskType::Materialization as i32, + name: materialization.name.clone(), + key_begin: format!("{:08x}", range.key_begin), + r_clock_begin: format!("{:08x}", range.r_clock_begin), + }; + + // TODO(johnny): Pass in from built MaterializationSpec. + let state_schema = ""; + + Self::build(bindings, shard_ref, state_schema) + } + + pub fn combine( + &mut self, + binding_index: u32, + doc_json: &str, + front: bool, + ) -> anyhow::Result<()> { + let binding = self + .bindings + .get_mut(binding_index as usize) + .with_context(|| "invalid combine-right binding {binding}")?; + + let memtable = match &mut self.inner { + doc::Combiner::Accumulator(accumulator) => accumulator.memtable()?, + _ => panic!("implementation error: combiner is draining, not accumulating"), + }; + let alloc = memtable.alloc(); + + let mut de = serde_json::Deserializer::from_str(doc_json); + let mut doc = doc::HeapNode::from_serde(&mut de, alloc) + .with_context(|| format!("couldn't parse published document as JSON: {doc_json}"))?; + + if let Some(ptr) = &binding.document_uuid_ptr { + if let Some(node) = ptr.create_heap_node(&mut doc, alloc) { + *node = + doc::HeapNode::String(doc::BumpStr::from_str(crate::UUID_PLACEHOLDER, alloc)); + } + } + memtable.add(binding_index, doc, front)?; + + if front { + binding.stats_left.docs_total += 1; + binding.stats_left.bytes_total += doc_json.len() as u64; + } else { + binding.stats_right.docs_total += 1; + binding.stats_right.bytes_total += doc_json.len() as u64; + } + + Ok(()) + } + + pub fn build_binding_stats(&mut self) -> BTreeMap { + let mut stats = BTreeMap::::new(); + + let merge = |from: &mut DocsAndBytes, to: &mut Option| { + if from.docs_total != 0 { + let entry = to.get_or_insert_with(Default::default); + entry.docs_total += from.docs_total; + entry.bytes_total += from.bytes_total; + *from = DocsAndBytes::default(); // Clear. + } + }; + + for binding in self.bindings.iter_mut() { + if binding.stats_out.docs_total == 0 { + continue; // Skip creating a stats map entry. + } + let entry = stats.entry(binding.collection.to_string()).or_default(); + + merge(&mut binding.stats_right, &mut entry.right); + merge(&mut binding.stats_out, &mut entry.out); + merge(&mut binding.stats_left, &mut entry.left); + } + + stats + } + + pub fn log_updated_schemas(&mut self) { + for binding in self.bindings.iter_mut() { + if binding.inferred_shape_changed { + binding.inferred_shape_changed = false; + + let serialized = serde_json::to_value(&doc::shape::schema::to_schema( + binding.inferred_shape.clone().unwrap(), + )) + .expect("shape serialization should never fail"); + + tracing::info!( + schema = ?::ops::DebugJson(serialized), + collection_name = %binding.collection, + "inferred schema updated" + ); + } + } + } + + fn build( + bindings: Vec<(BindingSpec, (bool, Vec, doc::Validator))>, + shard_ref: ops::ShardRef, + state_schema: &str, + ) -> anyhow::Result { + let (bindings, combiner_spec): (Vec, Vec<_>) = bindings.into_iter().unzip(); + + let (has_state_schema, state_validator) = if state_schema.is_empty() { + let state_schema = doc::reduce::merge_patch_schema().to_string(); + ( + false, + doc::Validator::from_schema(doc::validation::build_bundle(&state_schema).unwrap()) + .unwrap(), + ) + } else { + ( + true, + doc::Validator::from_schema(doc::validation::build_bundle(state_schema)?)?, + ) + }; + + // Initialize combiner with all bindings, plus one extra for state reductions. + let combiner_spec = doc::combine::Spec::with_bindings( + combiner_spec + .into_iter() + .map(|(is_full, key, validator)| (is_full, key, None, validator)) + .chain(std::iter::once((false, Vec::new(), None, state_validator))), + ); + let combiner = doc::Combiner::new( + combiner_spec, + tempfile::tempfile().context("opening temporary spill file")?, + )?; + + Ok(Self { + bindings, + has_state_schema, + inner: combiner, + shard_ref, + }) + } +} + +impl BindingSpec { + // Map a drained document into accumulated statistics and a + // (packed-key, packed-values, encoded-json) tuple. + pub fn drained( + &mut self, + root: &doc::OwnedNode, + buf: &mut bytes::BytesMut, + ) -> (bytes::Bytes, bytes::Bytes, String) { + if let Some(inferred_shape) = &mut self.inferred_shape { + if inferred_shape.widen_owned(root) { + doc::shape::limits::enforce_shape_complexity_limit( + inferred_shape, + doc::shape::limits::DEFAULT_SCHEMA_COMPLEXITY_LIMIT, + ); + self.inferred_shape_changed = true; + } + } + + let key_packed = doc::Extractor::extract_all_owned(root, &self.key_extractors, buf); + let val_packed = doc::Extractor::extract_all_owned(root, &self.value_extractors, buf); + let doc_json = serde_json::to_string(&self.ser_policy.on_owned(root)) + .expect("document serialization cannot fail"); + + self.stats_out.docs_total += 1; + self.stats_out.bytes_total += doc_json.len() as u64; + + (key_packed, val_packed, doc_json) + } + + fn for_capture( + ser_policy: doc::SerPolicy, + spec: &flow::capture_spec::Binding, + ) -> anyhow::Result<(Self, (bool, Vec, doc::Validator))> { + Self::for_written_collection( + ser_policy, + spec.collection + .as_ref() + .context("missing required collection")?, + ) + } + + fn for_materialization( + ser_policy: doc::SerPolicy, + spec: &flow::materialization_spec::Binding, + ) -> anyhow::Result<(Self, (bool, Vec, doc::Validator))> { + let flow::materialization_spec::Binding { + collection, + field_selection, + .. + } = spec; + + let CollectionSpec { + ack_template_json: _, + derivation: _, + key, + name, + partition_fields: _, + partition_template: _, + projections, + read_schema_json, + uuid_ptr: _, + write_schema_json, + } = collection.as_ref().context("missing required collection")?; + + // We always combine over the collection key. + if key.is_empty() { + anyhow::bail!("collection key cannot be empty"); + } + let combiner_extractors = extractors::for_key(&key, &projections, &ser_policy)?; + + // Materializations are allowed to choose a subset of key and value fields. + // Usually this matches the collection key, but that isn't required. + let FieldSelection { + keys: selected_keys, + values: selected_values, + .. + } = field_selection + .as_ref() + .context("missing required field selection")?; + + let key_extractors = extractors::for_fields(&selected_keys, &projections, &ser_policy)?; + let value_extractors = extractors::for_fields(&selected_values, projections, &ser_policy)?; + + let read_schema_json = if !read_schema_json.is_empty() { + read_schema_json + } else { + write_schema_json + }; + + let built_schema = doc::validation::build_bundle(&read_schema_json) + .context("collection read schema is not a JSON schema")?; + let validator = doc::Validator::from_schema(built_schema) + .context("could not build a schema validator")?; + + Ok(( + Self { + collection: models::Collection::new(name), + document_uuid_ptr: None, // Not added. + inferred_shape: None, // Not inferred. + inferred_shape_changed: false, + key_extractors, + ser_policy, + stats_right: Default::default(), + stats_out: Default::default(), + stats_left: Default::default(), + value_extractors, + }, + (!spec.delta_updates, combiner_extractors, validator), + )) + } + + fn for_written_collection( + ser_policy: doc::SerPolicy, + spec: &flow::CollectionSpec, + ) -> anyhow::Result<(Self, (bool, Vec, doc::Validator))> { + let CollectionSpec { + ack_template_json: _, + derivation: _, + key, + name, + partition_fields, + partition_template: _, + projections, + read_schema_json: _, + uuid_ptr, + write_schema_json, + } = spec; + + if uuid_ptr.is_empty() { + anyhow::bail!("uuid_ptr cannot be empty"); + } else if key.is_empty() { + anyhow::bail!("collection key cannot be empty"); + } + + let document_uuid_ptr = Some(doc::Pointer::from(uuid_ptr)); + let key_extractors = extractors::for_key(&key, &projections, &ser_policy)?; + let value_extractors = extractors::for_fields(partition_fields, projections, &ser_policy)?; + + let built_schema = doc::validation::build_bundle(&write_schema_json) + .context("collection write_schema_json is not a JSON schema")?; + let validator = doc::Validator::from_schema(built_schema) + .context("could not build a schema validator")?; + + Ok(( + Self { + collection: models::Collection::new(name), + document_uuid_ptr, + inferred_shape: Some(doc::Shape::nothing()), + inferred_shape_changed: false, + key_extractors: key_extractors.clone(), + ser_policy, + stats_right: Default::default(), + stats_out: Default::default(), + stats_left: Default::default(), + value_extractors, + }, + (false, key_extractors, validator), + )) + } +} diff --git a/crates/runtime/src/task_state.rs b/crates/runtime/src/task_state.rs index b3425d4405..1921441e5a 100644 --- a/crates/runtime/src/task_state.rs +++ b/crates/runtime/src/task_state.rs @@ -19,10 +19,7 @@ impl std::ops::Deref for RocksDB { } impl RocksDB { - pub fn open( - desc: Option, - state_schema: serde_json::Value, - ) -> anyhow::Result { + pub fn open(desc: Option) -> anyhow::Result { let (mut opts, path, _tmp) = match desc { Some(RocksDbDescriptor { rocksdb_path, @@ -73,6 +70,8 @@ impl RocksDB { let mut cf_opts = rocksdb::Options::default(); if name == rocksdb::DEFAULT_COLUMN_FAMILY_NAME { + let state_schema = doc::reduce::merge_patch_schema(); + set_json_schema_merge_operator( &mut cf_opts, &task_state_default_json_schema(&state_schema).to_string(), @@ -246,7 +245,7 @@ mod test { #[test] fn connector_state_merge() { - let db = RocksDB::open(None, doc::reduce::merge_patch_schema()).unwrap(); + let db = RocksDB::open(None).unwrap(); for doc in [ r#"{"a":"b","n":null}"#, diff --git a/crates/runtime/src/unary.rs b/crates/runtime/src/unary.rs index b4fea3632d..b9a49e46cd 100644 --- a/crates/runtime/src/unary.rs +++ b/crates/runtime/src/unary.rs @@ -1,21 +1,19 @@ -use super::Runtime; +use super::{LogHandler, Runtime}; use futures::{stream::BoxStream, Future, FutureExt, StreamExt, TryStreamExt}; use proto_flow::{capture, derive, materialize}; use std::time::Duration; -impl Runtime -where - L: Fn(&ops::Log) + Send + Sync + Clone + 'static, -{ +impl Runtime { pub async fn unary_capture( self, request: capture::Request, timeout: Duration, ) -> anyhow::Result { let response = self.serve_capture(unary_in(request)).boxed(); - unary_out(response, timeout).await + unary_out(futures::future::ready(Ok(response)), timeout).await } + /* pub async fn unary_derive( self, request: derive::Request, @@ -33,6 +31,7 @@ where let response = self.serve_materialize(unary_in(request)).boxed(); unary_out(response, timeout).await } + */ } fn unary_in(request: R) -> BoxStream<'static, anyhow::Result> {