Skip to content

Commit

Permalink
runtime: Move dekaf image check to flow_runtime_protocol
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Jan 6, 2025
1 parent ab48fff commit 75cddd3
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 17 deletions.
25 changes: 9 additions & 16 deletions crates/agent/src/connector_tags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,6 @@ impl Handler for TagHandler {
/// connector_tags without having to push to a registry.
pub const LOCAL_IMAGE_TAG: &str = ":local";

/// Connectors with an image name starting with this value are Dekaf-type materializations and we should
/// not pull the image, as it won't exist. Instead, we mark them as having `connector_type: ConnectorType::Dekaf`
/// so that `Runtime` will invoke Dekaf's in-tree connector implementation
pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-";

impl TagHandler {
#[tracing::instrument(err, skip_all, fields(id=?row.tag_id))]
async fn process(
Expand All @@ -105,7 +100,8 @@ impl TagHandler {
);
let image_composed = format!("{}{}", row.image_name, row.image_tag);

if row.image_tag != LOCAL_IMAGE_TAG && !row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX)
if row.image_tag != LOCAL_IMAGE_TAG
&& !row.image_name.starts_with(runtime::DEKAF_IMAGE_NAME_PREFIX)
{
// Pull the image.
let pull = jobs::run(
Expand All @@ -123,17 +119,14 @@ impl TagHandler {
}
}

let proto_type = if row.image_name.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
RuntimeProtocol::Materialize
} else {
match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
let proto_type = match runtime::flow_runtime_protocol(&image_composed).await {
Ok(ct) => ct,
Err(err) => {
tracing::warn!(image = %image_composed, error = %err, "failed to determine connector protocol");
return Ok((row.tag_id, JobStatus::SpecFailed));
}
};

let log_handler =
logs::ops_handler(self.logs_tx.clone(), "spec".to_string(), row.logs_token);

Expand Down Expand Up @@ -229,7 +222,7 @@ async fn spec_materialization(
) -> anyhow::Result<ConnectorSpec> {
use proto_flow::materialize;

let connector_type = if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
let connector_type = if image.starts_with(runtime::DEKAF_IMAGE_NAME_PREFIX) {
flow::materialization_spec::ConnectorType::Dekaf as i32
} else {
flow::materialization_spec::ConnectorType::Image as i32
Expand Down
9 changes: 9 additions & 0 deletions crates/runtime/src/container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,20 @@ const PORT_PROTO_LABEL_PREFIX: &str = "dev.estuary.port-proto.";
const CONNECTOR_INIT_IMAGE: &str = "ghcr.io/estuary/flow:v0.5.7-119-g552f6c0ee2";
const CONNECTOR_INIT_IMAGE_PATH: &str = "/usr/local/bin/flow-connector-init";

/// Connectors with an image name starting with this value are Dekaf-type materializations and we should
/// not pull the image, as it won't exist. Instead, we mark them as having `connector_type: ConnectorType::Dekaf`
/// so that `Runtime` will invoke Dekaf's in-tree connector implementation
pub const DEKAF_IMAGE_NAME_PREFIX: &str = "ghcr.io/estuary/dekaf-";

/// Determines the protocol of an image. If the image has a `FLOW_RUNTIME_PROTOCOL` label,
/// then it's value is used. Otherwise, this will apply a simple heuristic based on the image name,
/// for backward compatibility purposes. An error will be returned if it fails to inspect the image
/// or parse the label. The image must already have been pulled before calling this function.
pub async fn flow_runtime_protocol(image: &str) -> anyhow::Result<RuntimeProtocol> {
if image.starts_with(DEKAF_IMAGE_NAME_PREFIX) {
return Ok(RuntimeProtocol::Materialize);
}

let inspect_output = docker_cmd(&["inspect", image])
.await
.context("inspecting image")?;
Expand Down
2 changes: 1 addition & 1 deletion crates/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ mod tokio_context;
mod unary;
pub mod uuid;

pub use container::flow_runtime_protocol;
pub use container::{flow_runtime_protocol, DEKAF_IMAGE_NAME_PREFIX};
pub use task_service::TaskService;
pub use tokio_context::TokioContext;

Expand Down

0 comments on commit 75cddd3

Please sign in to comment.