Skip to content

Commit

Permalink
dekaf: Add connector_tags support
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Jan 3, 2025
1 parent f95fcee commit f9391c0
Show file tree
Hide file tree
Showing 2 changed files with 107 additions and 26 deletions.
99 changes: 79 additions & 20 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ impl Handler for TagHandler {
/// connector_tags without having to push to a registry.
pub const LOCAL_IMAGE_TAG: &str = ":local";

/// Image-name prefix that identifies Dekaf-type materializations.
///
/// Connectors whose image name starts with this value have no image to pull
/// (it won't exist), so we skip the pull and invoke Dekaf's own connector
/// implementation instead.
pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-";

impl TagHandler {
#[tracing::instrument(err, skip_all, fields(id=?row.tag_id))]
async fn process(
Expand All @@ -100,7 +104,8 @@ impl TagHandler {
);
let image_composed = format!("{}{}", row.image_name, row.image_tag);

if row.image_tag != LOCAL_IMAGE_TAG {
if row.image_tag != LOCAL_IMAGE_TAG && !row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX)
{
// Pull the image.
let pull = jobs::run(
"pull",
Expand All @@ -117,29 +122,40 @@ impl TagHandler {
}
}

let proto_type = match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
RuntimeProtocol::Materialize
} else {
match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
}
};
let log_handler =
logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token);
let runtime = Runtime::new(
self.allow_local,
self.connector_network.clone(),
log_handler,
None, // no need to change log level
"ops/connector-tags-job".to_string(),
);

let spec_result = match proto_type {
RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await,
RuntimeProtocol::Materialize => spec_materialization(&image_composed, runtime).await,
RuntimeProtocol::Derive => {
tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image");
return Ok((row.tag_id, JobStatus::SpecFailed));
let spec_result = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
spec_dekaf(&image_composed).await
} else {
let runtime = Runtime::new(
self.allow_local,
self.connector_network.clone(),
log_handler,
None, // no need to change log level
"ops/connector-tags-job".to_string(),
);

match proto_type {
RuntimeProtocol::Capture => spec_capture(&image_composed, runtime).await,
RuntimeProtocol::Materialize => {
spec_materialization(&image_composed, runtime).await
}
RuntimeProtocol::Derive => {
tracing::warn!(image = %image_composed, "unhandled Spec RPC for derivation connector image");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
}
};

Expand All @@ -166,7 +182,9 @@ impl TagHandler {
// Validate that there is an x-collection-name annotation in the resource config schema
// of materialization connectors
if proto_type == RuntimeProtocol::Materialize {
if let Err(err) = crate::resource_configs::pointer_for_schema(resource_config_schema.get()) {
if let Err(err) =
crate::resource_configs::pointer_for_schema(resource_config_schema.get())
{
tracing::warn!(image = %image_composed, error = %err, "resource schema does not have x-collection-name annotation");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
Expand Down Expand Up @@ -258,6 +276,47 @@ async fn spec_materialization(
})
}

/// Fetches the connector spec for a Dekaf-type materialization.
///
/// No connector runtime is started here: a `Spec` request is handed directly to
/// `dekaf::connector::unary_materialize`, and the reply is mapped into a
/// `ConnectorSpec`.
///
/// `image` is the composed image reference. It is embedded under the `"image"`
/// key of the request's `config_json`, next to an empty `"config"` object.
///
/// # Errors
///
/// Returns an error if the Dekaf call fails, if the response carries no `spec`,
/// if the OAuth2 config cannot be serialized, or if either returned schema
/// string is not valid JSON.
async fn spec_dekaf(image: &str) -> anyhow::Result<ConnectorSpec> {
    use proto_flow::materialize;

    // `Value`'s `Display` impl emits compact JSON, so there is no fallible
    // serialization step to unwrap.
    let config_json = serde_json::json!({"image": image, "config": {}}).to_string();

    let req = materialize::Request {
        spec: Some(materialize::request::Spec {
            connector_type: flow::materialization_spec::ConnectorType::Image as i32,
            config_json,
        }),
        ..Default::default()
    };

    // Destructure exhaustively so that a newly added response field is a
    // compile error here rather than being silently dropped.
    let materialize::response::Spec {
        protocol: _,
        config_schema_json,
        resource_config_schema_json,
        documentation_url,
        oauth2,
    } = dekaf::connector::unary_materialize(req)
        .await?
        .spec
        .ok_or_else(|| anyhow::anyhow!("connector didn't send expected Spec response"))?;

    // Surface a serialization failure as an error instead of panicking.
    let oauth2 = oauth2
        .map(|oa| serde_json::value::to_raw_value(&oa))
        .transpose()
        .context("serializing oauth2 config")?;

    Ok(ConnectorSpec {
        documentation_url,
        endpoint_config_schema: RawValue::from_string(config_schema_json)
            .context("parsing endpoint config schema")?,
        resource_config_schema: RawValue::from_string(resource_config_schema_json)
            .context("parsing resource config schema")?,
        resource_path_pointers: Vec::new(),
        oauth2,
    })
}

async fn spec_capture(
image: &str,
runtime: Runtime<impl LogHandler>,
Expand Down
34 changes: 28 additions & 6 deletions crates/dekaf/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,6 @@ impl Default for DeletionMode {
/// Configures the behavior of a whole dekaf task
#[derive(Serialize, Deserialize, Clone, Debug, PartialEq, JsonSchema)]
pub struct DekafConfig {
/// Whether or not to expose topic names in a strictly Kafka-compliant format
/// for systems that require it. Off by default.
pub strict_topic_names: bool,
/// The password that will authenticate Kafka consumers to this task.
// TODO(jshearer): Uncomment when schemars 1.0 is out and we upgrade
// #[schemars(extend("secret" = true))]
Expand All @@ -39,7 +36,13 @@ pub struct DekafConfig {
/// with empty string and `_is_deleted` header set to `1`. Setting this value
/// will also cause all other non-deletions to have an `_is_deleted` header of `0`.
#[serde(default)]
#[schemars(title = "Deletion Mode")]
pub deletions: DeletionMode,
/// Whether or not to expose topic names in a strictly Kafka-compliant format
/// for systems that require it. Off by default.
#[serde(default)]
#[schemars(title = "Strict Topic Names")]
pub strict_topic_names: bool,
}

/// Configures a particular binding in a Dekaf-type materialization
Expand All @@ -54,23 +57,42 @@ pub struct DekafResourceConfig {
/// Builds the JSON schema for a string field annotated with
/// `x-collection-name: true`.
///
/// The generator argument is unused; the schema is a fixed literal.
/// NOTE(review): the signature looks like a schemars `schema_with` hook —
/// confirm against the field attribute that references it.
fn collection_name(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
    let raw_schema = serde_json::json!({
        "x-collection-name": true,
        "type": "string"
    });

    // The literal above is static, so a conversion failure would be a
    // programming error.
    serde_json::from_value(raw_schema).unwrap()
}

/// Builds the JSON schema for the auth token field: a string titled
/// "Auth Token", flagged `secret: true`, with `order: 0`.
///
/// The generator argument is unused; the schema is a fixed literal.
fn token_secret(_gen: &mut schemars::gen::SchemaGenerator) -> schemars::schema::Schema {
    // Only one `title` entry is kept: with duplicate keys in `json!`, the
    // later one overwrites the earlier, so the older title had no effect.
    serde_json::from_value(serde_json::json!({
        "title": "Auth Token",
        "secret": true,
        "type": "string",
        "order": 0
    }))
    .unwrap()
}

pub async fn unary_materialize(
request: materialize::Request,
) -> anyhow::Result<materialize::Response> {
use proto_flow::materialize::response::validated;
if let Some(mut validate) = request.validate {
if let Some(_) = request.spec {
let config_schema = schemars::schema_for!(DekafConfig);
let resource_schema = schemars::schema_for!(DekafResourceConfig);

return Ok(materialize::Response {
spec: Some(materialize::response::Spec {
protocol: 3032023,
config_schema_json: serde_json::to_string(&config_schema)?,
resource_config_schema_json: serde_json::to_string(&resource_schema)?,
documentation_url:
"https://docs.estuary.dev/guides/dekaf_reading_collections_from_kafka"
.to_string(),
oauth2: None,
}),
..Default::default()
});
} else if let Some(mut validate) = request.validate {
use proto_flow::materialize::response::validated;
match materialization_spec::ConnectorType::try_from(validate.connector_type)? {
materialization_spec::ConnectorType::Dekaf => {}
other => bail!("invalid connector type: {}", other.as_str_name()),
Expand Down

0 comments on commit f9391c0

Please sign in to comment.