Skip to content

Commit

Permalink
dekaf: Refactor extraction logic into `doc::Extractor::extract_all_into_node()`
Browse files Browse the repository at this point in the history
  • Loading branch information
jshearer committed Jan 6, 2025
1 parent 1f1205b commit 2528553
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 17 deletions.
18 changes: 2 additions & 16 deletions crates/dekaf/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,6 @@ impl Read {
tmp.push(0);
tmp.extend(self.value_schema_id.to_be_bytes());

let mut value_doc = HeapNode::Object(BumpVec::new());

// Create extractors for key fields and projections
let mut projections = self.projections.clone();
// Add key fields as projections
Expand All @@ -298,20 +296,8 @@ impl Read {
)
.context("creating field extractors")?;

let mut value_doc = HeapNode::Object(BumpVec::new());

// Extract all fields by querying each extractor
for (projection, extractor) in projections.iter().zip(extractors.iter()) {
let ptr = doc::Pointer::from_str(&projection.ptr);
let extracted = match extractor.query(root.get()) {
Ok(value) => HeapNode::from_node(value, &alloc),
Err(default) => HeapNode::from_node(&default.into_owned(), &alloc),
};

ptr.create_heap_node(&mut value_doc, &alloc)
.map(|node| *node = extracted)
.with_context(|| format!("Failed to project field {}", projection.ptr))?;
}
let mut value_doc =
doc::Extractor::extract_all_into_node(&extractors, root.get(), &alloc);

if matches!(self.deletes, DeletionMode::CDC) {
let foo = DELETION_INDICATOR_PTR
Expand Down
25 changes: 24 additions & 1 deletion crates/doc/src/extractor.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::{compare::compare, AsNode, Node, OwnedNode, Pointer, SerPolicy};
use crate::{compare::compare, AsNode, BumpVec, HeapNode, Node, OwnedNode, Pointer, SerPolicy};
use bytes::BufMut;
use std::{
borrow::Cow,
Expand Down Expand Up @@ -220,6 +220,29 @@ impl Extractor {
self.extract_indicate_truncation(doc, w, indicator)
}

/// Applies `extractors` to a source document, creating a new document with the extracted fields.
///
/// The output starts as an empty object. Each extractor, in slice order, is
/// queried against `doc`; the value it yields (or the fallback carried in the
/// `Err` arm of `query`) is copied into `alloc` and written at the location
/// addressed by that extractor's `ptr` within the output. Because extractors
/// are applied in order, a later extractor that addresses the same location
/// overwrites the value written by an earlier one.
///
/// Placement is best-effort: if `create_heap_node` yields no node for a
/// pointer, the extracted value is dropped and the remaining extractors are
/// still applied. Callers that need to detect such failures must check the
/// returned document themselves.
pub fn extract_all_into_node<'alloc, N: AsNode>(
    extractors: &[Self],
    doc: &N,
    alloc: &'alloc bumpalo::Bump,
) -> HeapNode<'alloc> {
    let mut output = HeapNode::Object(BumpVec::new());

    for extractor in extractors {
        // `query` hands back either a node borrowed from `doc` or a fallback
        // value; both are deep-copied into the bump allocator so the output
        // does not borrow from `doc`.
        let extracted = match extractor.query(doc) {
            Ok(value) => HeapNode::from_node(value, alloc),
            Err(default) => HeapNode::from_node(&default.into_owned(), alloc),
        };

        // Explicit `if let` instead of `Option::map` with a unit closure:
        // the assignment is a side effect, and the skipped `None` case is a
        // deliberate best-effort choice rather than an overlooked result.
        // NOTE(review): assumes `create_heap_node` returns `Option<&mut HeapNode>` — confirm.
        if let Some(node) = extractor.ptr.create_heap_node(&mut output, alloc) {
            *node = extracted;
        }
    }

    output
}

/// Compare the deep ordering of `lhs` and `rhs` with respect to a composite key.
pub fn compare_key<L: AsNode, R: AsNode>(key: &[Self], lhs: &L, rhs: &R) -> std::cmp::Ordering {
use std::cmp::Ordering;
Expand Down

0 comments on commit 2528553

Please sign in to comment.