From e470818b53e20a486b113e169e23e1b5aa21ba8e Mon Sep 17 00:00:00 2001 From: Joseph Shearer Date: Fri, 3 Jan 2025 18:05:01 -0500 Subject: [PATCH] runtime: Move Dekaf invocations into `Runtime::unary_materialize()` In order for field selection to work in the UI, it needs to be able to validate a draft with a Dekaf connector in it. This detects when the request is for a Dekaf materialization and re-routes it to Dekaf's `unary_materialize`. It also means that the `connector_tags` job can work for Dekaf with a small change to detect image names starting in `ghcr.io/estuary/dekaf-*` and mark them as `ConnectorType::Dekaf`. --- Cargo.lock | 1 + crates/agent/src/connector_tags.rs | 83 ++++++++---------------------- crates/build/src/lib.rs | 9 +--- crates/runtime/Cargo.toml | 1 + crates/runtime/src/unary.rs | 22 ++++++-- 5 files changed, 43 insertions(+), 73 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f17efc967e..25de0124d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5409,6 +5409,7 @@ dependencies = [ "clap 4.5.17", "connector-init", "coroutines", + "dekaf", "derive-sqlite", "doc", "extractors", diff --git a/crates/agent/src/connector_tags.rs b/crates/agent/src/connector_tags.rs index dc63448975..ba8f827c4a 100644 --- a/crates/agent/src/connector_tags.rs +++ b/crates/agent/src/connector_tags.rs @@ -137,26 +137,20 @@ impl TagHandler { let log_handler = logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token); - let spec_result = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) { - spec_dekaf(&image_composed).await - } else { - let runtime = Runtime::new( - self.allow_local, - self.connector_network.clone(), - log_handler, - None, // no need to change log level - "ops/connector-tags-job".to_string(), - ); - - match proto_type { - RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, - RuntimeProtocol::Materialize => { - spec_materialization(&image_composed, runtime).await - } - RuntimeProtocol::Derive => { - tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); - return Ok((row.tag_id, JobStatus::SpecFailed)); - } + let runtime = Runtime::new( + self.allow_local, + self.connector_network.clone(), + log_handler, + None, // no need to change log level + "ops/connector-tags-job".to_string(), + ); + + let spec_result = match proto_type { + RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await, + RuntimeProtocol::Materialize => spec_materialization(&image_composed, runtime).await, + RuntimeProtocol::Derive => { + tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image"); + return Ok((row.tag_id, JobStatus::SpecFailed)); } }; @@ -235,9 +229,15 @@ async fn spec_materialization( ) -> anyhow::Result { use proto_flow::materialize; + let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) { + flow::materialization_spec::ConnectorType::Dekaf as i32 + } else { + flow::materialization_spec::ConnectorType::Image as i32 + }; + let req = materialize::Request { spec: Some(materialize::request::Spec { - connector_type: flow::materialization_spec::ConnectorType::Image as i32, + connector_type, config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) .unwrap(), }), @@ -277,47 +277,6 @@ async fn spec_materialization( }) } -async fn spec_dekaf(image: &str) -> anyhow::Result { - use proto_flow::materialize; - - let req = materialize::Request { - spec: Some(materialize::request::Spec { - connector_type: flow::materialization_spec::ConnectorType::Image as i32, - config_json: serde_json::to_string(&serde_json::json!({"image": image, "config":{}})) - .unwrap(), - }), - ..Default::default() - }; - - let spec = dekaf::connector::unary_materialize(req) - .await? - .spec - .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?; - - let materialize::response::Spec { - protocol: _, - config_schema_json, - resource_config_schema_json, - documentation_url, - oauth2, - } = spec; - - let oauth = if let Some(oa) = oauth2 { - Some(serde_json::value::to_raw_value(&oa).expect("serializing oauth2 config")) - } else { - None - }; - Ok(ConnectorSpec { - documentation_url, - endpoint_config_schema: RawValue::from_string(config_schema_json) - .context("parsing endpoint config schema")?, - resource_config_schema: RawValue::from_string(resource_config_schema_json) - .context("parsing resource config schema")?, - resource_path_pointers: Vec::new(), - oauth2: oauth, - }) -} - async fn spec_capture( image: &str, runtime: Runtime, diff --git a/crates/build/src/lib.rs b/crates/build/src/lib.rs index 0ea4cdb116..292e6b9886 100644 --- a/crates/build/src/lib.rs +++ b/crates/build/src/lib.rs @@ -376,14 +376,7 @@ impl validation::Connectors for RuntimeConnectors { request: materialize::Request, _data_plane: &'a tables::DataPlane, ) -> BoxFuture<'a, anyhow::Result> { - match flow::materialization_spec::ConnectorType::try_from( - request.validate.as_ref().unwrap().connector_type, - ) { - Ok(flow::materialization_spec::ConnectorType::Dekaf) => { - dekaf::connector::unary_materialize(request).boxed() - } - _ => self.runtime.clone().unary_materialize(request).boxed(), - } + self.runtime.clone().unary_materialize(request).boxed() } } diff --git a/crates/runtime/Cargo.toml b/crates/runtime/Cargo.toml index 2bbe9ec921..d256d071ed 100644 --- a/crates/runtime/Cargo.toml +++ b/crates/runtime/Cargo.toml @@ -13,6 +13,7 @@ assemble = { path = "../assemble" } async-process = { path = "../async-process" } connector-init = { path = "../connector-init" } coroutines = { path = "../coroutines" } +dekaf = { path = "../dekaf" } derive-sqlite = { path = "../derive-sqlite" } doc = { path = "../doc" } extractors = { path = "../extractors" } diff --git a/crates/runtime/src/unary.rs b/crates/runtime/src/unary.rs index 7ad7e28b82..8dc4204fe8 100644 --- a/crates/runtime/src/unary.rs +++ b/crates/runtime/src/unary.rs @@ -1,6 +1,6 @@ use super::{LogHandler, Runtime}; use futures::{stream::BoxStream, StreamExt, TryStreamExt}; -use proto_flow::{capture, derive, materialize}; +use proto_flow::{capture, derive, flow::materialization_spec, materialize}; impl Runtime { pub async fn unary_capture( @@ -20,8 +20,24 @@ impl Runtime { self, request: materialize::Request, ) -> anyhow::Result { - let response = self.serve_materialize(unary_in(request)).boxed(); - unary_out(response).await + let is_dekaf = request.spec.as_ref().is_some_and(|spec| { + matches!( + spec.connector_type(), + materialization_spec::ConnectorType::Dekaf + ) + }) || request.validate.as_ref().is_some_and(|validate| { + matches!( + validate.connector_type(), + materialization_spec::ConnectorType::Dekaf + ) + }); + + if is_dekaf { + dekaf::connector::unary_materialize(request).await + } else { + let unary_resp = self.serve_materialize(unary_in(request)).boxed(); + unary_out(unary_resp).await + } } }