dekaf: Implement materialization projection support
* Connector projections: emit required constraints for all keys, and recommended constraints for all other fields
* Schema: Build a schema from the collection's projections that will match the extracted documents
* Extraction: Implement field extraction using the `extractors` crate to emit documents that match the collection's projections
jshearer committed Jan 6, 2025
1 parent 69ba295 commit 1f1205b
Showing 5 changed files with 212 additions and 54 deletions.
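Before the file-by-file diff, the constraint rule from the first commit-message bullet in one self-contained sketch: collection key fields become required constraints, every other projected field an optional ("recommended") one. The Constraint struct below is an illustrative stand-in for proto_flow's validated::Constraint, not the real type:

use std::collections::BTreeMap;

struct Constraint {
    required: bool,
    reason: &'static str,
}

// Key fields -> FieldRequired; all other projected fields -> FieldOptional.
fn constraints_for(keys: &[&str], fields: &[&str]) -> BTreeMap<String, Constraint> {
    let mut out = BTreeMap::new();
    for key in keys {
        // Collection keys are JSON pointers ("/id"); constraint keys drop the slash.
        out.insert(
            key.trim_start_matches('/').to_string(),
            Constraint { required: true, reason: "Key fields are required" },
        );
    }
    for field in fields {
        // entry().or_insert keeps a key's required constraint even when the
        // same field also appears as a plain projection.
        out.entry(field.to_string()).or_insert(Constraint {
            required: false,
            reason: "All other fields are recommended",
        });
    }
    out
}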
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions crates/dekaf/Cargo.toml
@@ -13,6 +13,8 @@ license.workspace = true
allocator = { path = "../allocator" }
avro = { path = "../avro" }
doc = { path = "../doc" }
+extractors = { path = "../extractors" }
+tuple = { path = "../tuple" }
flow-client = { path = "../flow-client" }
gazette = { path = "../gazette" }
json = { path = "../json" }
93 changes: 67 additions & 26 deletions crates/dekaf/src/connector.rs
@@ -1,4 +1,6 @@
use anyhow::{bail, Context};
+use futures::future::try_join_all;
+use models::RawValue;
use proto_flow::{flow::materialization_spec, materialize};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
@@ -116,34 +118,73 @@ pub async fn unary_materialize(
parsed_outer_config.variant
))?;

-    // Largely copied from crates/validation/src/noop.rs
-    let validated_bindings = std::mem::take(&mut validate.bindings)
-        .into_iter()
-        .enumerate()
-        .map(|(i, b)| {
-            let resource_path = vec![format!("binding-{}", i)];
-            let constraints = b
-                .collection
-                .expect("collection must exist")
-                .projections
-                .into_iter()
-                .map(|proj| {
-                    (
-                        proj.field,
-                        validated::Constraint {
-                            r#type: validated::constraint::Type::FieldOptional as i32,
-                            reason: "Dekaf allows everything for now".to_string(),
-                        },
-                    )
-                })
-                .collect::<BTreeMap<_, _>>();
-            validated::Binding {
-                constraints,
-                resource_path,
-                delta_updates: false,
-            }
-        })
-        .collect::<Vec<_>>();
+    let validated_bindings = try_join_all(
+        std::mem::take(&mut validate.bindings)
+            .into_iter()
+            .map(|binding| {
+                let resource_config_str = binding.resource_config_json.clone();
+                let endpoint_config = parsed_outer_config.clone();
+                async move {
+                    let resource_config = serde_json::from_value::<DekafResourceConfig>(
+                        unseal::decrypt_sops(&RawValue::from_str(&resource_config_str)?)
+                            .await
+                            .context(format!(
+                                "decrypting dekaf resource config for variant {}",
+                                endpoint_config.variant
+                            ))?
+                            .to_value(),
+                    )
+                    .context(format!(
+                        "validating dekaf resource config for variant {}",
+                        endpoint_config.variant
+                    ))?;
+
+                    let collection = binding.collection.expect("collection must exist");
+
+                    let mut constraints = BTreeMap::new();
+
+                    // Keys are always required
+                    for key_field in collection.key.iter() {
+                        constraints.insert(
+                            // Constraint keys should not have a leading slash, while collection keys do
+                            key_field[1..].to_string(),
+                            validated::Constraint {
+                                r#type: validated::constraint::Type::FieldRequired as i32,
+                                reason: "Key fields are required for Dekaf materialization"
+                                    .to_string(),
+                            },
+                        );
+                    }
+
+                    // Process other fields from projections
+                    for proj in collection.projections {
+                        // Skip if it's already a key field
+                        if collection.key.contains(&proj.field) {
+                            continue;
+                        }
+
+                        // Should this be recommending locations instead of fields?
+                        constraints.insert(
+                            proj.field,
+                            validated::Constraint {
+                                r#type: validated::constraint::Type::FieldOptional as i32,
+                                reason: "All fields are recommended for Dekaf materialization"
+                                    .to_string(),
+                            },
+                        );
+                    }
+
+                    Ok::<proto_flow::materialize::response::validated::Binding, anyhow::Error>(
+                        validated::Binding {
+                            constraints,
+                            resource_path: vec![resource_config.topic_name],
+                            delta_updates: false,
+                        },
+                    )
+                }
+            }),
+    )
+    .await?;

return Ok(materialize::Response {
validated: Some(materialize::response::Validated {
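A design note on the connector change above: each binding's SOPS-encrypted resource config is decrypted and validated inside its own future, and the futures are joined concurrently with try_join_all, failing fast on the first error. A stripped-down sketch of that fan-out (the async body is a stand-in for the decrypt_sops + serde_json steps):

use futures::future::try_join_all;

async fn validate_all(bindings: Vec<String>) -> anyhow::Result<Vec<String>> {
    // One future per binding, all polled concurrently; any Err aborts the join.
    try_join_all(bindings.into_iter().map(|resource_config| async move {
        // Stand-in for decrypt_sops(...) + serde_json::from_value::<DekafResourceConfig>(...)
        anyhow::Ok(format!("validated: {resource_config}"))
    }))
    .await
}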
89 changes: 62 additions & 27 deletions crates/dekaf/src/read.rs
@@ -2,7 +2,8 @@ use super::{Collection, Partition};
use crate::connector::DeletionMode;
use anyhow::{bail, Context};
use bytes::{Buf, BufMut, BytesMut};
-use doc::{AsNode, HeapNode};
+use doc::{AsNode, BumpVec, HeapNode};
+use extractors;
use futures::StreamExt;
use gazette::journal::{ReadJsonLine, ReadJsonLines};
use gazette::{broker, journal, uuid};
@@ -26,6 +27,7 @@ pub struct Read {
uuid_ptr: doc::Pointer, // Location of document UUID.
value_schema: avro::Schema, // Avro schema when encoding values.
value_schema_id: u32, // Registry ID of the value's schema.
+    projections: Vec<proto_flow::flow::Projection>, // Projections to apply

// Keep these details around so we can create a new ReadRequest if we need to skip forward
journal_name: String,
@@ -97,6 +99,7 @@ impl Read {
uuid_ptr: collection.uuid_ptr.clone(),
value_schema: collection.value_schema.clone(),
value_schema_id,
+            projections: collection.projections.clone(),

journal_name: partition.spec.name.clone(),
rewrite_offsets_from,
@@ -266,33 +269,65 @@ impl Read {
};

// Encode the value.
-        let value =
-            if is_control || (is_deletion && matches!(self.deletes, DeletionMode::Kafka)) {
-                None
-            } else {
-                tmp.push(0);
-                tmp.extend(self.value_schema_id.to_be_bytes());
-
-                if matches!(self.deletes, DeletionMode::CDC) {
-                    let mut heap_node = HeapNode::from_node(root.get(), &alloc);
-                    let foo = DELETION_INDICATOR_PTR
-                        .create_heap_node(&mut heap_node, &alloc)
-                        .context("Unable to add deletion meta indicator")?;
-
-                    *foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 });
-
-                    () = avro::encode(&mut tmp, &self.value_schema, &heap_node)?;
-
-                    alloc.reset();
-                } else {
-                    () = avro::encode(&mut tmp, &self.value_schema, root.get())?;
-                }
-
-                record_bytes += tmp.len();
-                buf.extend_from_slice(&tmp);
-                tmp.clear();
-                Some(buf.split().freeze())
-            };
+        let value = if is_control
+            || (is_deletion && matches!(self.deletes, DeletionMode::Kafka))
+        {
+            None
+        } else {
+            tmp.push(0);
+            tmp.extend(self.value_schema_id.to_be_bytes());
+
+            // Create extractors for key fields and projections
+            let mut projections = self.projections.clone();
+            // Add key fields as projections
+            projections.extend(self.key_ptr.iter().map(|ptr| proto_flow::flow::Projection {
+                ptr: ptr.to_string(),
+                field: ptr.to_string(), // Using ptr as field name for key fields
+                inference: Some(proto_flow::flow::Inference::default()),
+                explicit: true,
+                ..Default::default()
+            }));
+
+            let policy = doc::SerPolicy::noop();
+            let extractors = extractors::for_fields(
+                &projections.iter().map(|p| &p.ptr).collect::<Vec<_>>(),
+                &projections,
+                &policy,
+            )
+            .context("creating field extractors")?;
+
+            let mut value_doc = HeapNode::Object(BumpVec::new());
+
+            // Extract all fields by querying each extractor
+            for (projection, extractor) in projections.iter().zip(extractors.iter()) {
+                let ptr = doc::Pointer::from_str(&projection.ptr);
+                let extracted = match extractor.query(root.get()) {
+                    Ok(value) => HeapNode::from_node(value, &alloc),
+                    Err(default) => HeapNode::from_node(&default.into_owned(), &alloc),
+                };
+
+                ptr.create_heap_node(&mut value_doc, &alloc)
+                    .map(|node| *node = extracted)
+                    .with_context(|| format!("Failed to project field {}", projection.ptr))?;
+            }
+
+            if matches!(self.deletes, DeletionMode::CDC) {
+                let foo = DELETION_INDICATOR_PTR
+                    .create_heap_node(&mut value_doc, &alloc)
+                    .context("Unable to add deletion meta indicator")?;
+
+                *foo = HeapNode::PosInt(if is_deletion { 1 } else { 0 });
+            }
+
+            () = avro::encode(&mut tmp, &self.value_schema, &value_doc)?;
+
+            record_bytes += tmp.len();
+            buf.extend_from_slice(&tmp);
+            tmp.clear();
+            Some(buf.split().freeze())
+        };

self.offset = next_offset;

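The net effect of the loop above: rather than Avro-encoding the raw document, Dekaf now assembles a fresh value_doc holding only the projected locations (keys included). Here is the same logic as a self-contained illustration using serde_json — the create_at helper is a hypothetical stand-in for doc::Pointer::create_heap_node, not the doc crate's API:

use serde_json::{json, Map, Value};

// Stand-in for doc::Pointer::create_heap_node: build the object hierarchy
// along `ptr` inside `doc`, returning the (possibly fresh) slot at its tail.
fn create_at<'v>(doc: &'v mut Value, ptr: &str) -> &'v mut Value {
    let mut node = doc;
    for token in ptr.split('/').skip(1) {
        node = node
            .as_object_mut()
            .expect("parents are objects in this sketch")
            .entry(token.to_string())
            .or_insert(Value::Object(Map::new()));
    }
    node
}

fn main() {
    let source = json!({"id": 5, "a": {"b": 1, "dropped": true}});
    let mut value_doc = json!({});
    // One pointer per projection: query the source, write into value_doc.
    for ptr in ["/id", "/a/b"] {
        let extracted = source.pointer(ptr).cloned().unwrap_or(Value::Null);
        *create_at(&mut value_doc, ptr) = extracted;
    }
    // Only the projected locations survive; "dropped" is gone.
    assert_eq!(value_doc, json!({"id": 5, "a": {"b": 1}}));
}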
80 changes: 79 additions & 1 deletion crates/dekaf/src/topology.rs
@@ -103,6 +103,7 @@ pub struct Collection {
pub spec: flow::CollectionSpec,
pub uuid_ptr: doc::Pointer,
pub value_schema: avro::Schema,
+    pub projections: Vec<proto_flow::flow::Projection>,
}

/// Partition is a collection journal which is mapped into a stable Kafka partition order.
@@ -183,6 +184,9 @@ impl Collection {
spec.key.iter().map(|p| doc::Pointer::from_str(p)).collect();
let uuid_ptr = doc::Pointer::from_str(&spec.uuid_ptr);

+        // Extract projections from the spec
+        let projections = spec.projections.clone();
+
let json_schema = if spec.read_schema_json.is_empty() {
&spec.write_schema_json
} else {
Expand All @@ -193,8 +197,27 @@ impl Collection {
let validator = doc::Validator::new(json_schema)?;
let mut shape = doc::Shape::infer(&validator.schemas()[0], validator.schema_index());

+        // Create value shape by merging all projected fields in the schema
+        let mut value_shape = doc::Shape::nothing();
+
+        // Add projected fields to value shape
+        for projection in &projections {
+            let ptr = doc::Pointer::from_str(&projection.ptr);
+            let (field_shape, exists) = shape.locate(&ptr);
+            if exists.cannot() {
+                tracing::warn!(
+                    projection = projection.ptr,
+                    "Projection field not found in schema"
+                );
+                continue;
+            }
+
+            let projected_shape = build_shape_at_pointer(&ptr, field_shape);
+            value_shape = doc::Shape::union(value_shape, projected_shape);
+        }

if matches!(auth.deletions(), DeletionMode::CDC) {
-            if let Some(meta) = shape
+            if let Some(meta) = value_shape
.object
.properties
.iter_mut()
@@ -246,6 +269,7 @@ impl Collection {
spec,
uuid_ptr,
value_schema,
projections,
}))
}

@@ -583,3 +607,57 @@ pub async fn extract_dekaf_config(
}
}
}

+/// Nests the provided shape under a JSON pointer path by creating the necessary object hierarchy.
+/// For example, given pointer "/a/b/c" and a field shape, creates an object structure:
+/// { "a": { "b": { "c": field_shape } } }
+fn build_shape_at_pointer(ptr: &doc::Pointer, shape: &doc::Shape) -> doc::Shape {
+    let mut current_shape = doc::Shape::nothing();
+    let mut current = &mut current_shape;
+
+    // For each component in the pointer path except the last one,
+    // create the object structure
+    for token in ptr.iter().take(ptr.0.len() - 1) {
+        match token {
+            doc::ptr::Token::Property(name) => {
+                let mut obj = doc::Shape::nothing();
+                obj.type_ = json::schema::types::OBJECT;
+
+                current.type_ = json::schema::types::OBJECT;
+                current.object.properties.push(doc::shape::ObjProperty {
+                    name: Box::from(name.as_str()),
+                    is_required: true,
+                    shape: obj,
+                });
+
+                // Move to the newly created object
+                current = &mut current.object.properties.last_mut().unwrap().shape;
+            }
+            doc::ptr::Token::Index(_) => {
+                // Create an array shape with the next level nested inside
+                let mut array = doc::Shape::nothing();
+                array.type_ = json::schema::types::ARRAY;
+                array.array.additional_items = Some(Box::new(doc::Shape::nothing()));
+
+                current.type_ = json::schema::types::ARRAY;
+                *current = array;
+
+                // Move to the array items shape for the next iteration
+                current = current.array.additional_items.as_mut().unwrap();
+            }
+            _ => unreachable!("NextIndex/NextProperty shouldn't appear in concrete pointers"),
+        }
+    }
+
+    // Add the actual field shape at the final position
+    if let Some(doc::ptr::Token::Property(name)) = ptr.iter().last() {
+        current.type_ = json::schema::types::OBJECT;
+        current.object.properties.push(doc::shape::ObjProperty {
+            name: Box::from(name.as_str()),
+            is_required: true,
+            shape: shape.clone(),
+        });
+    }
+
+    current_shape
+}
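To make build_shape_at_pointer's output concrete, here is the same construction phrased against JSON Schema instead of doc::Shape — a sketch under the assumption that only property tokens appear (the Rust version above also handles array indices). Nesting a leaf under "/a/b/c" yields required, single-property objects all the way down:

use serde_json::{json, Map, Value};

// Fold from the innermost pointer token outward: each component wraps the
// accumulated schema in one more object with that single property required.
fn schema_at_pointer(ptr: &str, leaf: Value) -> Value {
    ptr.split('/').skip(1).rev().fold(leaf, |inner, token| {
        let mut props = Map::new();
        props.insert(token.to_string(), inner);
        json!({ "type": "object", "properties": props, "required": [token] })
    })
}

fn main() {
    let nested = schema_at_pointer("/a/b/c", json!({"type": "integer"}));
    // {"type":"object","properties":{"a":{"type":"object","properties":{"b":...}}},"required":["a"]}
    println!("{}", serde_json::to_string_pretty(&nested).unwrap());
}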
