From e1fd0a7eff6b0752a5a740c45d826da1b2a1a0fe Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 13 Oct 2023 12:02:41 -0400 Subject: [PATCH] Add `flowctl raw spec` to invoke the Spec RPC. This allows for calling both local and container connectors' spec RPC manually, and inspecting the output. It's also useful for manually invoking the RPC to enable debugging connectors. --- crates/flowctl/src/raw/mod.rs | 4 ++ crates/flowctl/src/raw/spec.rs | 94 ++++++++++++++++++++++++++++++++++ 2 files changed, 98 insertions(+) create mode 100644 crates/flowctl/src/raw/spec.rs diff --git a/crates/flowctl/src/raw/mod.rs b/crates/flowctl/src/raw/mod.rs index ec626d0b69..8ff974ea88 100644 --- a/crates/flowctl/src/raw/mod.rs +++ b/crates/flowctl/src/raw/mod.rs @@ -9,6 +9,7 @@ use std::{ mod capture; mod discover; mod materialize_fixture; +mod spec; mod suggest_schema; #[derive(Debug, clap::Args)] @@ -52,6 +53,8 @@ pub enum Command { Discover(discover::Discover), /// Run a capture connector and combine its documents Capture(capture::Capture), + /// Get the spec output of a connector + Spec(spec::Spec), /// Suggest a schema that would alleviate document schema violations of a specific collection SuggestSchema(suggest_schema::SuggestSchema), /// Emit the Flow specification JSON-Schema. 
@@ -145,6 +148,7 @@ impl Advanced {
             }
             Command::Discover(args) => discover::do_discover(ctx, args).await,
             Command::Capture(args) => capture::do_capture(ctx, args).await,
+            Command::Spec(args) => spec::do_spec(ctx, args).await,
             Command::SuggestSchema(args) => suggest_schema::do_suggest_schema(ctx, args).await,
             Command::JsonSchema => {
                 let schema = models::Catalog::root_json_schema();
diff --git a/crates/flowctl/src/raw/spec.rs b/crates/flowctl/src/raw/spec.rs
new file mode 100644
index 0000000000..6093a0a07b
--- /dev/null
+++ b/crates/flowctl/src/raw/spec.rs
@@ -0,0 +1,114 @@
+use crate::local_specs;
+use anyhow::Context;
+use proto_flow::{capture, flow};
+
+// CLI arguments of `flowctl raw spec`. Kept as a plain comment because clap derives
+// help text from `///` doc comments.
+#[derive(Debug, clap::Args)]
+#[clap(rename_all = "kebab-case")]
+pub struct Spec {
+    /// Path or URL to a Flow specification file.
+    #[clap(long)]
+    source: String,
+    /// Name of the capture whose connector Spec RPC is invoked.
+    /// Capture is required if there are multiple captures in --source specifications.
+    #[clap(long)]
+    capture: Option<String>,
+    /// Docker network to run the connector, if one exists
+    #[clap(long, default_value = "bridge")]
+    network: String,
+}
+
+/// Invokes the Spec RPC of a capture's connector and prints the response to stdout as JSON.
+///
+/// The capture is looked up in the specifications loaded from `source`: the one named by
+/// `capture` when given, otherwise the only capture present.
+///
+/// # Errors
+///
+/// Fails if the specifications cannot be loaded, if no single capture can be identified,
+/// if the endpoint configuration or the response cannot be serialized, or if the connector
+/// call fails or yields no Spec response.
+pub async fn do_spec(
+    _ctx: &mut crate::CliContext,
+    Spec {
+        source,
+        capture,
+        network,
+    }: &Spec,
+) -> anyhow::Result<()> {
+    let source = build::arg_source_to_url(source, false)?;
+    let sources = local_specs::surface_errors(local_specs::load(&source).await.into_result())?;
+
+    // Identify the capture to inspect.
+    let needle = match capture {
+        Some(name) => name.as_str(),
+        None => match sources.captures.first() {
+            Some(only) if sources.captures.len() == 1 => only.capture.as_str(),
+            Some(_) => anyhow::bail!(
+                "sourced specification files contain multiple captures. Use --capture to identify a specific one"
+            ),
+            None => anyhow::bail!("sourced specification files do not contain any captures"),
+        },
+    };
+
+    // NOTE(review): the binary search assumes `sources.captures` is ordered by capture
+    // name -- confirm against the loader.
+    let capture = match sources
+        .captures
+        .binary_search_by_key(&needle, |c| c.capture.as_str())
+    {
+        Ok(index) => &sources.captures[index],
+        Err(_) => anyhow::bail!("could not find the capture {needle}"),
+    };
+
+    // Both endpoint kinds produce the same request shape; only the connector type differs.
+    let (connector_type, config_json) = match &capture.spec.endpoint {
+        models::CaptureEndpoint::Connector(config) => (
+            flow::capture_spec::ConnectorType::Image,
+            serde_json::to_string(config),
+        ),
+        models::CaptureEndpoint::Local(config) => (
+            flow::capture_spec::ConnectorType::Local,
+            serde_json::to_string(config),
+        ),
+    };
+    let mut spec_req = capture::Request {
+        spec: Some(capture::request::Spec {
+            connector_type: connector_type as i32,
+            config_json: config_json.context("failed to serialize the endpoint configuration")?,
+        }),
+        ..Default::default()
+    };
+
+    // Forward the capture's shard log level when `from_str_name` recognizes it.
+    if let Some(log_level) = capture
+        .spec
+        .shards
+        .log_level
+        .as_ref()
+        .and_then(|s| ops::LogLevel::from_str_name(s))
+    {
+        spec_req.set_internal_log_level(log_level);
+    }
+
+    let spec_response = runtime::Runtime::new(
+        true, // All local.
+        network.clone(),
+        ops::tracing_log_handler,
+        None,
+        format!("spec/{}", capture.capture),
+    )
+    .unary_capture(spec_req, build::CONNECTOR_TIMEOUT)
+    .await
+    .map_err(crate::status_to_anyhow)?
+    .spec
+    .context("connector didn't send expected Spec response")?;
+
+    let serialized =
+        serde_json::to_string(&spec_response).context("Failed to serialize spec response")?;
+
+    println!("{}", serialized);
+
+    Ok(())
+}