diff --git a/Cargo.lock b/Cargo.lock
index ac0b449287..3388076f3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1949,6 +1949,7 @@ dependencies = [
  "crypto-common",
  "deadpool",
  "doc",
+ "extractors",
  "flow-client",
  "futures",
  "gazette",
@@ -1992,6 +1993,7 @@ dependencies = [
  "tower-http",
  "tracing",
  "tracing-subscriber",
+ "tuple",
  "typestate",
  "unseal",
  "url",
diff --git a/crates/dekaf/Cargo.toml b/crates/dekaf/Cargo.toml
index 2312b6c35c..fb5fd12a32 100644
--- a/crates/dekaf/Cargo.toml
+++ b/crates/dekaf/Cargo.toml
@@ -13,6 +13,8 @@ license.workspace = true
 allocator = { path = "../allocator" }
 avro = { path = "../avro" }
 doc = { path = "../doc" }
+extractors = { path = "../extractors" }
+tuple = { path = "../tuple" }
 flow-client = { path = "../flow-client" }
 gazette = { path = "../gazette" }
 json = { path = "../json" }
diff --git a/crates/dekaf/src/connector.rs b/crates/dekaf/src/connector.rs
index b6ce4d3b42..e117fc3f72 100644
--- a/crates/dekaf/src/connector.rs
+++ b/crates/dekaf/src/connector.rs
@@ -1,4 +1,6 @@
 use anyhow::{bail, Context};
+use futures::future::try_join_all;
+use models::RawValue;
 use proto_flow::{flow::materialization_spec, materialize};
 use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};
@@ -116,34 +118,73 @@ pub async fn unary_materialize(
         parsed_outer_config.variant
     ))?;
 
-    // Largely copied from crates/validation/src/noop.rs
-    let validated_bindings = std::mem::take(&mut validate.bindings)
-        .into_iter()
-        .enumerate()
-        .map(|(i, b)| {
-            let resource_path = vec![format!("binding-{}", i)];
-            let constraints = b
-                .collection
-                .expect("collection must exist")
-                .projections
-                .into_iter()
-                .map(|proj| {
-                    (
-                        proj.field,
-                        validated::Constraint {
-                            r#type: validated::constraint::Type::FieldOptional as i32,
-                            reason: "Dekaf allows everything for now".to_string(),
-                        },
-                    )
-                })
-                .collect::<BTreeMap<_, _>>();
-            validated::Binding {
-                constraints,
-                resource_path,
-                delta_updates: false,
-            }
-        })
-        .collect::<Vec<_>>();
+    let validated_bindings = try_join_all(
+        std::mem::take(&mut validate.bindings)
+            .into_iter()
+            .map(|binding| {
+                let resource_config_str = binding.resource_config_json.clone();
+                let endpoint_config = parsed_outer_config.clone();
+                async move {
+                    let resource_config = serde_json::from_value::<DekafResourceConfig>(
+                        unseal::decrypt_sops(&RawValue::from_str(&resource_config_str)?)
+                            .await
+                            .context(format!(
+                                "decrypting dekaf resource config for variant {}",
+                                endpoint_config.variant
+                            ))?
+                            .to_value(),
+                    )
+                    .context(format!(
+                        "validating dekaf resource config for variant {}",
+                        endpoint_config.variant
+                    ))?;
+
+                    let collection = binding.collection.expect("collection must exist");
+
+                    let mut constraints = BTreeMap::new();
+
+                    // Keys are always required
+                    for key_field in collection.key.iter() {
+                        constraints.insert(
+                            // Constraint keys should not have a leading slash, while collection keys do
+                            key_field[1..].to_string(),
+                            validated::Constraint {
+                                r#type: validated::constraint::Type::FieldRequired as i32,
+                                reason: "Key fields are required for Dekaf materialization"
+                                    .to_string(),
+                            },
+                        );
+                    }
+
+                    // Process other fields from projections
+                    for proj in collection.projections {
+                        // Skip if it projects a key location. Compare pointers, not field
+                        // names: collection keys keep their leading slash ("/id") while
+                        // fields don't ("id"), so a field-name comparison never matches.
+                        if collection.key.contains(&proj.ptr) {
+                            continue;
+                        }
+
+                        // Should this be recommending locations instead of fields?
+                        constraints.insert(
+                            proj.field,
+                            validated::Constraint {
+                                r#type: validated::constraint::Type::FieldOptional as i32,
+                                reason: "All fields are recommended for Dekaf materialization"
+                                    .to_string(),
+                            },
+                        );
+                    }
+
+                    Ok::<_, anyhow::Error>(
+                        validated::Binding {
+                            constraints,
+                            resource_path: vec![resource_config.topic_name],
+                            delta_updates: false,
+                        },
+                    )
+                }
+            }),
+    )
+    .await?;
 
     return Ok(materialize::Response {
         validated: Some(materialize::response::Validated {
diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs
index 7f5f3afc16..92232beb2b 100644
--- a/crates/dekaf/src/read.rs
+++ b/crates/dekaf/src/read.rs
@@ -2,7 +2,8 @@ use super::{Collection, Partition};
 use crate::connector::DeletionMode;
 use anyhow::{bail, Context};
 use bytes::{Buf, BufMut, BytesMut};
-use doc::{AsNode, HeapNode};
+use doc::{AsNode, BumpVec, HeapNode};
+use extractors;
 use futures::StreamExt;
 use gazette::journal::{ReadJsonLine, ReadJsonLines};
 use gazette::{broker, journal, uuid};
@@ -26,6 +27,7 @@ pub struct Read {
     uuid_ptr: doc::Pointer,     // Location of document UUID.
     value_schema: avro::Schema, // Avro schema when encoding values.
     value_schema_id: u32,       // Registry ID of the value's schema.
+    projections: Vec<proto_flow::flow::Projection>, // Projections to apply.
 
     // Keep these details around so we can create a new ReadRequest if we need to skip forward
     journal_name: String,
@@ -97,6 +99,7 @@
             uuid_ptr: collection.uuid_ptr.clone(),
             value_schema: collection.value_schema.clone(),
             value_schema_id,
+            projections: collection.projections.clone(),
             journal_name: partition.spec.name.clone(),
 
             rewrite_offsets_from,
@@ -266,33 +269,65 @@
         };
 
         // Encode the value.
-        let value =
-            if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Kafka)) {
-                None
-            } else {
-                tmp.push(0);
-                tmp.extend(self.value_schema_id.to_be_bytes());
-
-                if matches!(self.deletes, DeletionMode::CDC) {
-                    let mut heap_node = HeapNode::from_node(root.get(), &alloc);
-                    let foo = DELETION_INDICATOR_PTR
-                        .create_heap_node(&mut heap_node, &alloc)
-                        .context("Unable to add deletion meta indicator")?;
-
-                    *foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 });
-
-                    () = avro::encode(&mut tmp, &self.value_schema, &heap_node)?;
-
-                    alloc.reset();
-                } else {
-                    () = avro::encode(&mut tmp, &self.value_schema, root.get())?;
-                }
-
-                record_bytes += tmp.len();
-                buf.extend_from_slice(&tmp);
-                tmp.clear();
-                Some(buf.split().freeze())
-            };
+        let value = if is_control
+            || (is_deletion && matches!(self.deletes, DeletionMode::Kafka))
+        {
+            None
+        } else {
+            tmp.push(0);
+            tmp.extend(self.value_schema_id.to_be_bytes());
+
+            // Create extractors for key fields and projections
+            let mut projections = self.projections.clone();
+            // Add key fields as projections
+            projections.extend(self.key_ptr.iter().map(|ptr| proto_flow::flow::Projection {
+                ptr: ptr.to_string(),
+                field: ptr.to_string(), // Using ptr as field name for key fields
+                inference: Some(proto_flow::flow::Inference::default()),
+                explicit: true,
+                ..Default::default()
+            }));
+
+            let policy = doc::SerPolicy::noop();
+            let extractors = extractors::for_fields(
+                &projections.iter().map(|p| &p.ptr).collect::<Vec<_>>(),
+                &projections,
+                &policy,
+            )
+            .context("creating field extractors")?;
+
+            // The projected document to encode, built up one extracted field at a time.
+            let mut value_doc = HeapNode::Object(BumpVec::new());
+
+            // Extract all fields by querying each extractor
+            for (projection, extractor) in projections.iter().zip(extractors.iter()) {
+                let ptr = doc::Pointer::from_str(&projection.ptr);
+                let extracted = match extractor.query(root.get()) {
+                    Ok(value) => HeapNode::from_node(value, &alloc),
+                    Err(default) => HeapNode::from_node(&default.into_owned(), &alloc),
+                };
+
+                ptr.create_heap_node(&mut value_doc, &alloc)
+                    .map(|node| *node = extracted)
+                    .with_context(|| format!("Failed to project field {}", projection.ptr))?;
+            }
+
+            if matches!(self.deletes, DeletionMode::CDC) {
+                let indicator = DELETION_INDICATOR_PTR
+                    .create_heap_node(&mut value_doc, &alloc)
+                    .context("Unable to add deletion meta indicator")?;
+
+                *indicator = HeapNode::PosInt(if is_deletion { 1 } else { 0 });
+            }
+
+            () = avro::encode(&mut tmp, &self.value_schema, &value_doc)?;
+            // Reset the bump allocator so per-document allocations don't accumulate.
+            alloc.reset();
+
+            record_bytes += tmp.len();
+            buf.extend_from_slice(&tmp);
+            tmp.clear();
+            Some(buf.split().freeze())
+        };
 
         self.offset = next_offset;
diff --git a/crates/dekaf/src/topology.rs b/crates/dekaf/src/topology.rs
index 8308c3b2a7..9a0df4e649 100644
--- a/crates/dekaf/src/topology.rs
+++ b/crates/dekaf/src/topology.rs
@@ -103,6 +103,7 @@ pub struct Collection {
     pub spec: flow::CollectionSpec,
     pub uuid_ptr: doc::Pointer,
     pub value_schema: avro::Schema,
+    pub projections: Vec<flow::Projection>,
 }
 
 /// Partition is a collection journal which is mapped into a stable Kafka partition order.
@@ -183,6 +184,9 @@
             spec.key.iter().map(|p| doc::Pointer::from_str(p)).collect();
         let uuid_ptr = doc::Pointer::from_str(&spec.uuid_ptr);
 
+        // Extract projections from the spec
+        let projections = spec.projections.clone();
+
         let json_schema = if spec.read_schema_json.is_empty() {
             &spec.write_schema_json
         } else {
@@ -193,8 +197,27 @@
         let validator = doc::Validator::new(json_schema)?;
         let mut shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());
 
+        // Create value shape by merging all projected fields in the schema
+        let mut value_shape = doc::Shape::nothing();
+
+        // Add projected fields to value shape
+        for projection in &projections {
+            let ptr = doc::Pointer::from_str(&projection.ptr);
+            let (field_shape, exists) = shape.locate(&ptr);
+            if exists.cannot() {
+                tracing::warn!(
+                    projection = projection.ptr,
+                    "Projection field not found in schema"
+                );
+                continue;
+            }
+
+            let projected_shape = build_shape_at_pointer(&ptr, field_shape);
+            value_shape = doc::Shape::union(value_shape, projected_shape);
+        }
+
         if matches!(auth.deletions(), DeletionMode::CDC) {
-            if let Some(meta) = shape
+            if let Some(meta) = value_shape
                 .object
                 .properties
                 .iter_mut()
@@ -246,6 +269,7 @@
             spec,
             uuid_ptr,
             value_schema,
+            projections,
         }))
     }
 
@@ -583,3 +607,57 @@ pub async fn extract_dekaf_config(
         }
     }
 }
+
+/// Nests the provided shape under a JSON pointer path by creating the necessary object hierarchy.
+/// For example, given pointer "/a/b/c" and a field shape, creates an object structure:
+/// { "a": { "b": { "c": field_shape } } }
+fn build_shape_at_pointer(ptr: &doc::Pointer, shape: &doc::Shape) -> doc::Shape {
+    let mut current_shape = doc::Shape::nothing();
+    let mut current = &mut current_shape;
+
+    // For each component in the pointer path except the last one,
+    // create the object structure (saturating: the root pointer "" has no tokens)
+    for token in ptr.iter().take(ptr.0.len().saturating_sub(1)) {
+        match token {
+            doc::ptr::Token::Property(name) => {
+                let mut obj = doc::Shape::nothing();
+                obj.type_ = json::schema::types::OBJECT;
+
+                current.type_ = json::schema::types::OBJECT;
+                current.object.properties.push(doc::shape::ObjProperty {
+                    name: Box::from(name.as_str()),
+                    is_required: true,
+                    shape: obj,
+                });
+
+                // Move to the newly created object
+                current = &mut current.object.properties.last_mut().unwrap().shape;
+            }
+            doc::ptr::Token::Index(_) => {
+                // Create an array shape with the next level nested inside
+                let mut array = doc::Shape::nothing();
+                array.type_ = json::schema::types::ARRAY;
+                array.array.additional_items = Some(Box::new(doc::Shape::nothing()));
+
+                // Replace the current position with the array shape
+                *current = array;
+
+                // Move to the array items shape for the next iteration
+                current = current.array.additional_items.as_mut().unwrap();
+            }
+            _ => unreachable!("NextIndex/NextProperty shouldn't appear in concrete pointers"),
+        }
+    }
+
+    // Add the actual field shape at the final position (a trailing array
+    // index is not expected for projected fields and is left unhandled)
+    if let Some(doc::ptr::Token::Property(name)) = ptr.iter().last() {
+        current.type_ = json::schema::types::OBJECT;
+        current.object.properties.push(doc::shape::ObjProperty {
+            name: Box::from(name.as_str()),
+            is_required: true,
+            shape: shape.clone(),
+        });
+    }
+
+    current_shape
+}
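Reviewer note: the sketch below is not part of the patch; it pins down the behavior build_shape_at_pointer is expected to have. It assumes doc::Shape::locate and Exists::cannot() work as used in Collection::new above, that Exists also exposes a positive must() helper, and that json::schema::types defines a STRING constant alongside the OBJECT and ARRAY constants used in the patch.

#[cfg(test)]
mod build_shape_at_pointer_tests {
    use super::*;

    #[test]
    fn nests_leaf_shape_under_pointer() {
        // Nest a string-typed leaf shape under "/a/b/c".
        let ptr = doc::Pointer::from_str("/a/b/c");
        let mut leaf = doc::Shape::nothing();
        leaf.type_ = json::schema::types::STRING;

        let nested = build_shape_at_pointer(&ptr, &leaf);

        // The same pointer should locate the leaf again, with every
        // intermediate object property marked required along the way.
        // must() is assumed to be the positive counterpart of the
        // cannot() helper used in Collection::new.
        let (located, exists) = nested.locate(&ptr);
        assert!(exists.must());
        assert_eq!(located.type_, json::schema::types::STRING);
    }
}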