Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jgraettinger committed Nov 8, 2023
1 parent 6b01669 commit 3fec844
Show file tree
Hide file tree
Showing 26 changed files with 2,691 additions and 1,946 deletions.
16 changes: 6 additions & 10 deletions crates/derive-sqlite/src/connector.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
use super::{dbutil, do_validate, parse_validate, Config, Lambda, Param, Transform};
use anyhow::Context;
use futures::channel::mpsc;
use futures::TryStreamExt;
use futures::{SinkExt, Stream};
use futures::{SinkExt, StreamExt};
use prost::Message;
use proto_flow::runtime::{derive_request_ext, derive_response_ext, DeriveRequestExt};
use proto_flow::{
derive::{request, response, Request, Response},
flow, RuntimeCheckpoint,
};

pub fn connector<R>(
_peek_request: &Request,
request_rx: R,
) -> anyhow::Result<impl Stream<Item = anyhow::Result<Response>>>
pub fn connector<R>(request_rx: R) -> mpsc::Receiver<anyhow::Result<Response>>
where
R: futures::stream::Stream<Item = anyhow::Result<Request>> + Send + 'static,
R: futures::stream::Stream<Item = Request> + Send + 'static,
{
let (mut response_tx, response_rx) = mpsc::channel(16);

Expand All @@ -27,15 +23,15 @@ where
})
});

Ok(response_rx)
response_rx
}

async fn serve<R>(
mut request_rx: R,
response_tx: &mut mpsc::Sender<anyhow::Result<Response>>,
) -> anyhow::Result<()>
where
R: futures::stream::Stream<Item = anyhow::Result<Request>>,
R: futures::stream::Stream<Item = Request>,
{
let mut request_rx = std::pin::pin!(request_rx);
let tokio_handle = tokio::runtime::Handle::current();
Expand All @@ -47,7 +43,7 @@ where
let mut maybe_handle: Option<Handle> = None;

loop {
match request_rx.try_next().await? {
match request_rx.next().await {
None => return Ok(()),
Some(Request {
validate: Some(validate),
Expand Down
25 changes: 12 additions & 13 deletions crates/flowctl/src/raw/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,6 @@ pub async fn do_capture(
.map(|i| i.clone().into())
.unwrap_or(std::time::Duration::from_secs(1));

let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
_ = ticker.tick().await; // First tick is immediate.

let mut output = std::io::stdout();

// TODO(johnny): This is currently only partly implemented, but is awaiting
Expand All @@ -147,15 +143,18 @@ pub async fn do_capture(

// Upon a checkpoint, wait until the next tick interval has elapsed before acknowledging.
if let Some(_checkpoint) = response.checkpoint {
_ = ticker.tick().await;

request_tx
.send(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
let mut request_tx = request_tx.clone();
tokio::spawn(async move {
() = tokio::time::sleep(interval).await;

request_tx
.feed(Ok(capture::Request {
acknowledge: Some(capture::request::Acknowledge { checkpoints: 1 }),
..Default::default()
}))
.await
.unwrap();
});
}
}

Expand Down
8 changes: 8 additions & 0 deletions crates/ops/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,14 @@ impl<S: Serialize> std::fmt::Debug for DebugJson<S> {
}
}

pub fn merge_docs_and_bytes(from: &stats::DocsAndBytes, to: &mut Option<stats::DocsAndBytes>) {
if from.docs_total != 0 {
let entry = to.get_or_insert_with(Default::default);
entry.docs_total += from.docs_total;
entry.bytes_total += from.bytes_total;
}
}

#[cfg(test)]
mod test {
use super::{Log, LogLevel};
Expand Down
231 changes: 231 additions & 0 deletions crates/runtime/src/capture/connector.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,231 @@
use crate::{unseal, LogHandler, Runtime};
use anyhow::Context;
use futures::{channel::mpsc, stream::BoxStream, FutureExt, StreamExt};
use proto_flow::{
capture::{Request, Response},
flow::capture_spec::ConnectorType,
runtime::CaptureRequestExt,
};

// Start a capture connector as indicated by the `initial` Request.
// Returns a pair of Streams for sending Requests and receiving Responses.
pub async fn start<L: LogHandler>(
runtime: &Runtime<L>,
mut initial: Request,
) -> anyhow::Result<(
mpsc::Sender<Request>,
BoxStream<'static, anyhow::Result<Response>>,
)> {
let (endpoint, log_level, config_json) = extract_endpoint(&mut initial)?;
let (mut connector_tx, connector_rx) = mpsc::channel(crate::CHANNEL_BUFFER);

// Adjust the dynamic log level for this connector's lifecycle.
if let (Some(log_level), Some(set_log_level)) = (log_level, &runtime.set_log_level) {
(set_log_level)(log_level);
}

fn attach_container(response: &mut Response, container: crate::image_connector::Container) {
response.set_internal(|internal| {
internal.container = Some(container);
});
}

fn start_rpc(
channel: tonic::transport::Channel,
rx: mpsc::Receiver<Request>,
) -> crate::image_connector::StartRpcFuture<Response> {
async move {
proto_grpc::capture::connector_client::ConnectorClient::new(channel)
.capture(rx)
.await
}
.boxed()
}

let connector_rx = match endpoint {
models::CaptureEndpoint::Connector(models::ConnectorConfig {
image,
config: sealed_config,
}) => {
*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();
connector_tx.try_send(initial).unwrap();

crate::image_connector::serve(
attach_container,
image,
runtime.log_handler.clone(),
log_level,
&runtime.container_network,
connector_rx,
start_rpc,
&runtime.task_name,
ops::TaskType::Capture,
)
.await?
.boxed()
}
models::CaptureEndpoint::Local(_) if !runtime.allow_local => {
return Err(tonic::Status::failed_precondition(
"Local connectors are not permitted in this context",
)
.into());
}
models::CaptureEndpoint::Local(models::LocalConfig {
command,
config: sealed_config,
env,
protobuf,
}) => {
*config_json = unseal::decrypt_sops(&sealed_config).await?.to_string();
connector_tx.try_send(initial).unwrap();

crate::local_connector::serve(
command,
env,
runtime.log_handler.clone(),
log_level,
protobuf,
connector_rx,
)?
.boxed()
}
};

Ok((connector_tx, connector_rx))
}

fn extract_endpoint<'r>(
request: &'r mut Request,
) -> anyhow::Result<(
models::CaptureEndpoint,
Option<ops::LogLevel>,
&'r mut String,
)> {
let log_level = match request.get_internal() {
Ok(CaptureRequestExt {
labels: Some(labels),
..
}) => Some(labels.log_level()),
_ => None,
};

let (connector_type, config_json) = match request {
Request {
spec: Some(spec), ..
} => (spec.connector_type, &mut spec.config_json),
Request {
discover: Some(discover),
..
} => (discover.connector_type, &mut discover.config_json),
Request {
validate: Some(validate),
..
} => (validate.connector_type, &mut validate.config_json),
Request {
apply: Some(apply), ..
} => {
let inner = apply
.capture
.as_mut()
.context("`apply` missing required `capture`")?;

(inner.connector_type, &mut inner.config_json)
}
Request {
open: Some(open), ..
} => {
let inner = open
.capture
.as_mut()
.context("`open` missing required `capture`")?;

(inner.connector_type, &mut inner.config_json)
}
_ => {
return Err(crate::protocol_error(
"request does not contain an endpoint",
request,
))
}
};

if connector_type == ConnectorType::Image as i32 {
Ok((
models::CaptureEndpoint::Connector(
serde_json::from_str(config_json).context("parsing connector config")?,
),
log_level,
config_json,
))
} else if connector_type == ConnectorType::Local as i32 {
Ok((
models::CaptureEndpoint::Local(
serde_json::from_str(config_json).context("parsing local config")?,
),
log_level,
config_json,
))
} else {
anyhow::bail!("invalid connector type: {connector_type}");
}
}

#[cfg(test)]
mod test {
use super::*;
use serde_json::json;

#[tokio::test]
async fn test_http_ingest_spec() {
if let Err(_) = locate_bin::locate("flow-connector-init") {
// Skip if `flow-connector-init` isn't available (yet). We're probably on CI.
// This test is useful as a sanity check for local development
// and we have plenty of other coverage during CI.
return;
}

let spec = || {
serde_json::from_value(json!({
"spec": {
"connectorType": "IMAGE",
"config": {
"image": "ghcr.io/estuary/source-http-ingest:dev",
"config": {},
}
}
}))
.unwrap()
};

let runtime = Runtime::new(
true,
"default".to_string(),
ops::tracing_log_handler,
None,
"test".to_string(),
);

let (connector_tx, connector_rx) = start(&runtime, spec()).await.unwrap();
std::mem::drop(connector_tx);

let mut responses: Vec<_> = connector_rx.collect().await;
let resp = responses.pop().unwrap().unwrap();

assert!(resp.spec.is_some());

let container = resp
.get_internal()
.expect("internal decodes")
.container
.expect("internal has attached container");

assert_eq!(
container.network_ports,
[proto_flow::flow::NetworkPort {
number: 8080,
protocol: String::new(),
public: true
}]
);
}
}
Loading

0 comments on commit 3fec844

Please sign in to comment.