diff --git a/crates/dekaf/src/read.rs b/crates/dekaf/src/read.rs index 92232beb2b..01804b953c 100644 --- a/crates/dekaf/src/read.rs +++ b/crates/dekaf/src/read.rs @@ -277,8 +277,6 @@ impl Read { tmp.push(0); tmp.extend(self.value_schema_id.to_be_bytes()); - let mut value_doc = HeapNode::Object(BumpVec::new()); - // Create extractors for key fields and projections let mut projections = self.projections.clone(); // Add key fields as projections @@ -298,20 +296,8 @@ impl Read { ) .context("creating field extractors")?; - let mut value_doc = HeapNode::Object(BumpVec::new()); - - // Extract all fields by querying each extractor - for (projection, extractor) in projections.iter().zip(extractors.iter()) { - let ptr = doc::Pointer::from_str(&projection.ptr); - let extracted = match extractor.query(root.get()) { - Ok(value) => HeapNode::from_node(value, &alloc), - Err(default) => HeapNode::from_node(&default.into_owned(), &alloc), - }; - - ptr.create_heap_node(&mut value_doc, &alloc) - .map(|node| *node = extracted) - .with_context(|| format!("Failed to project field {}", projection.ptr))?; - } + let mut value_doc = + doc::Extractor::extract_all_into_node(&extractors, root.get(), &alloc); if matches!(self.deletes, DeletionMode::CDC) { let foo = DELETION_INDICATOR_PTR diff --git a/crates/doc/src/extractor.rs b/crates/doc/src/extractor.rs index e9e0131035..3037b1a320 100644 --- a/crates/doc/src/extractor.rs +++ b/crates/doc/src/extractor.rs @@ -1,4 +1,4 @@ -use crate::{compare::compare, AsNode, Node, OwnedNode, Pointer, SerPolicy}; +use crate::{compare::compare, AsNode, BumpVec, HeapNode, Node, OwnedNode, Pointer, SerPolicy}; use bytes::BufMut; use std::{ borrow::Cow, @@ -220,6 +220,29 @@ impl Extractor { self.extract_indicate_truncation(doc, w, indicator) } + /// Applies extractors to a source document, creating a new document with the extracted fields. + pub fn extract_all_into_node<'alloc, N: AsNode>( + extractors: &[Self], + doc: &N, + alloc: &'alloc bumpalo::Bump, + ) -> HeapNode<'alloc> { + let mut output = HeapNode::Object(BumpVec::new()); + + for extractor in extractors.iter() { + let extracted = match extractor.query(doc) { + Ok(value) => HeapNode::from_node(value, alloc), + Err(default) => HeapNode::from_node(&default.into_owned(), alloc), + }; + + extractor + .ptr + .create_heap_node(&mut output, alloc) + .map(|node| *node = extracted); + } + + output + } + /// Compare the deep ordering of `lhs` and `rhs` with respect to a composite key. pub fn compare_key(key: &[Self], lhs: &L, rhs: &R) -> std::cmp::Ordering { use std::cmp::Ordering;