diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index ec626d0b69..8ff974ea88 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -9,6 +9,7 @@ use std::{ mod capture; mod discover; mod materialize_fixture; +mod spec; mod suggest_schema; #[derive(Debug, clap::Args)] @@ -52,6 +53,8 @@ pub enum Command { Discover(discover::Discover), /// Run a capture connector and combine its documents Capture(capture::Capture), + /// Get the spec output of a connector + Spec(spec::Spec), /// Suggest a schema that would alleviate document schema violations of a specific collection SuggestSchema(suggest_schema::SuggestSchema), /// Emit the Flow specification JSON-Schema. @@ -145,6 +148,7 @@ impl Advanced { } Command::Discover(args) => discover::do_discover(ctx, args).await, Command::Capture(args) => capture::do_capture(ctx, args).await, + Command::Spec(args) => spec::do_spec(ctx, args).await, Command::SuggestSchema(args) => suggest_schema::do_suggest_schema(ctx, args).await, Command::JsonSchema => { let schema = models::Catalog::root_json_schema(); diff --git a/crates/flowctl/src/raw/spec.rs b/crates/flowctl/src/raw/spec.rs new file mode 100644 index 0000000000..6093a0a07b --- /dev/null +++ b/crates/flowctl/src/raw/spec.rs @@ -0,0 +1,94 @@ +use crate::local_specs; +use anyhow::Context; +use proto_flow::{capture, flow}; + +#[derive(Debug, clap::Args)] +#[clap(rename_all = "kebab-case")] +pub struct Spec { + /// Path or URL to a Flow specification file. + #[clap(long)] + source: String, + /// Name of the capture to discover within the Flow specification file. + /// Capture is required if there are multiple captures in --source specifications. + #[clap(long)] + capture: Option, + /// Docker network to run the connector, if one exists + #[clap(long, default_value = "bridge")] + network: String, +} + +pub async fn do_spec( + _ctx: &mut crate::CliContext, + Spec { + source, + capture, + network, + }: &Spec, +) -> anyhow::Result<()> { + let source = build::arg_source_to_url(source, false)?; + let mut sources = local_specs::surface_errors(local_specs::load(&source).await.into_result())?; + + // Identify the capture to inspect. + let needle = if let Some(needle) = capture { + needle.as_str() + } else if sources.captures.len() == 1 { + sources.captures.first().unwrap().capture.as_str() + } else if sources.captures.is_empty() { + anyhow::bail!("sourced specification files do not contain any captures"); + } else { + anyhow::bail!("sourced specification files contain multiple captures. Use --capture to identify a specific one"); + }; + + let capture = match sources + .captures + .binary_search_by_key(&needle, |c| c.capture.as_str()) + { + Ok(index) => &mut sources.captures[index], + Err(_) => anyhow::bail!("could not find the capture {needle}"), + }; + + let spec_req = match &capture.spec.endpoint { + models::CaptureEndpoint::Connector(config) => capture::request::Spec { + connector_type: flow::capture_spec::ConnectorType::Image as i32, + config_json: serde_json::to_string(&config).unwrap(), + }, + models::CaptureEndpoint::Local(config) => capture::request::Spec { + connector_type: flow::capture_spec::ConnectorType::Local as i32, + config_json: serde_json::to_string(config).unwrap(), + }, + }; + let mut spec_req = capture::Request { + spec: Some(spec_req), + ..Default::default() + }; + + if let Some(log_level) = capture + .spec + .shards + .log_level + .as_ref() + .and_then(|s| ops::LogLevel::from_str_name(s)) + { + spec_req.set_internal_log_level(log_level); + } + + let spec_response = runtime::Runtime::new( + true, // All local. + network.clone(), + ops::tracing_log_handler, + None, + format!("spec/{}", capture.capture), + ) + .unary_capture(spec_req, build::CONNECTOR_TIMEOUT) + .await + .map_err(crate::status_to_anyhow)? + .spec + .context("connector didn't send expected Spec response")?; + + let serialized = + serde_json::to_string(&spec_response).context("Failed to serialize spec response")?; + + println!("{}", serialized); + + Ok(()) +}