Skip to content

Commit

Permalink
Add flowctl raw spec to invoke the Spec RPC.
Browse files Browse the repository at this point in the history
This allows for calling both local and container connectors' spec RPC manually, and inspecting the output. It's also useful for manually invoking the RPC to enable debugging connectors.
  • Loading branch information
jshearer committed Oct 13, 2023
1 parent cf5f62a commit e1fd0a7
Show file tree
Hide file tree
Showing 2 changed files with 98 additions and 0 deletions.
4 changes: 4 additions & 0 deletions crates/flowctl/src/raw/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::{
mod capture;
mod discover;
mod materialize_fixture;
mod spec;
mod suggest_schema;

#[derive(Debug, clap::Args)]
Expand Down Expand Up @@ -52,6 +53,8 @@ pub enum Command {
Discover(discover::Discover),
/// Run a capture connector and combine its documents
Capture(capture::Capture),
/// Get the spec output of a connector
Spec(spec::Spec),
/// Suggest a schema that would alleviate document schema violations of a specific collection
SuggestSchema(suggest_schema::SuggestSchema),
/// Emit the Flow specification JSON-Schema.
Expand Down Expand Up @@ -145,6 +148,7 @@ impl Advanced {
}
Command::Discover(args) => discover::do_discover(ctx, args).await,
Command::Capture(args) => capture::do_capture(ctx, args).await,
Command::Spec(args) => spec::do_spec(ctx, args).await,
Command::SuggestSchema(args) => suggest_schema::do_suggest_schema(ctx, args).await,
Command::JsonSchema => {
let schema = models::Catalog::root_json_schema();
Expand Down
94 changes: 94 additions & 0 deletions crates/flowctl/src/raw/spec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
use crate::local_specs;
use anyhow::Context;
use proto_flow::{capture, flow};

#[derive(Debug, clap::Args)]
#[clap(rename_all = "kebab-case")]
pub struct Spec {
/// Path or URL to a Flow specification file.
#[clap(long)]
source: String,
/// Name of the capture to discover within the Flow specification file.
/// Capture is required if there are multiple captures in --source specifications.
#[clap(long)]
capture: Option<String>,
/// Docker network to run the connector, if one exists
#[clap(long, default_value = "bridge")]
network: String,
}

pub async fn do_spec(
_ctx: &mut crate::CliContext,
Spec {
source,
capture,
network,
}: &Spec,
) -> anyhow::Result<()> {
let source = build::arg_source_to_url(source, false)?;
let mut sources = local_specs::surface_errors(local_specs::load(&source).await.into_result())?;

// Identify the capture to inspect.
let needle = if let Some(needle) = capture {
needle.as_str()
} else if sources.captures.len() == 1 {
sources.captures.first().unwrap().capture.as_str()
} else if sources.captures.is_empty() {
anyhow::bail!("sourced specification files do not contain any captures");
} else {
anyhow::bail!("sourced specification files contain multiple captures. Use --capture to identify a specific one");
};

let capture = match sources
.captures
.binary_search_by_key(&needle, |c| c.capture.as_str())
{
Ok(index) => &mut sources.captures[index],
Err(_) => anyhow::bail!("could not find the capture {needle}"),
};

let spec_req = match &capture.spec.endpoint {
models::CaptureEndpoint::Connector(config) => capture::request::Spec {
connector_type: flow::capture_spec::ConnectorType::Image as i32,
config_json: serde_json::to_string(&config).unwrap(),
},
models::CaptureEndpoint::Local(config) => capture::request::Spec {
connector_type: flow::capture_spec::ConnectorType::Local as i32,
config_json: serde_json::to_string(config).unwrap(),
},
};
let mut spec_req = capture::Request {
spec: Some(spec_req),
..Default::default()
};

if let Some(log_level) = capture
.spec
.shards
.log_level
.as_ref()
.and_then(|s| ops::LogLevel::from_str_name(s))
{
spec_req.set_internal_log_level(log_level);
}

let spec_response = runtime::Runtime::new(
true, // All local.
network.clone(),
ops::tracing_log_handler,
None,
format!("spec/{}", capture.capture),
)
.unary_capture(spec_req, build::CONNECTOR_TIMEOUT)
.await
.map_err(crate::status_to_anyhow)?
.spec
.context("connector didn't send expected Spec response")?;

let serialized =
serde_json::to_string(&spec_response).context("Failed to serialize spec response")?;

println!("{}", serialized);

Ok(())
}

0 comments on commit e1fd0a7

Please sign in to comment.