Skip to content

Commit

Permalink
Support writing non-updating queries to debezium sinks. (#612)
Browse files Browse the repository at this point in the history
* Support writing non-updating queries to debezium sinks.

* bump arrow-json hash.
  • Loading branch information
jacksonrnewhouse authored and mwylde committed May 1, 2024
1 parent fb72b3f commit f1e785d
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 13 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 33 additions & 10 deletions crates/arroyo-df/src/physical.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use arrow::{
array::{AsArray, BooleanBuilder, TimestampNanosecondBuilder, UInt32Builder},
buffer::NullBuffer,
buffer::{BooleanBuffer, NullBuffer},
compute::{concat, kernels::zip, not, take},
};
use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream};
Expand Down Expand Up @@ -975,10 +975,15 @@ impl ExecutionPlan for ToDebeziumExec {
partition: usize,
context: Arc<TaskContext>,
) -> DFResult<SendableRecordBatchStream> {
let is_retract_index: usize = self.input.schema().index_of(IS_RETRACT_FIELD)?;
let is_retract_index = self.input.schema().index_of(IS_RETRACT_FIELD).ok();
let timestamp_index = self.input.schema().index_of(TIMESTAMP_FIELD)?;
let struct_projection = (0..self.input.schema().fields().len())
.filter(|index| *index != is_retract_index && *index != timestamp_index)
.filter(|index| {
is_retract_index
.map(|is_retract_index| *index != is_retract_index)
.unwrap_or(true)
&& *index != timestamp_index
})
.collect();
Ok(Box::pin(ToDebeziumStream {
input: self.input.execute(partition, context)?,
Expand All @@ -997,19 +1002,38 @@ impl ExecutionPlan for ToDebeziumExec {
struct ToDebeziumStream {
input: SendableRecordBatchStream,
schema: SchemaRef,
is_retract_index: usize,
is_retract_index: Option<usize>,
timestamp_index: usize,
struct_projection: Vec<usize>,
}

impl ToDebeziumStream {
fn as_debezium_batch(&mut self, batch: &RecordBatch) -> DFResult<RecordBatch> {
let value_struct = batch.project(&self.struct_projection)?;
let is_retract = batch
.column(self.is_retract_index)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap();
let timestamp_column = batch.column(self.timestamp_index).clone();
match self.is_retract_index {
Some(retract_index) => self.create_debezium(
value_struct,
timestamp_column,
batch
.column(retract_index)
.as_any()
.downcast_ref::<BooleanArray>()
.unwrap(),
),
None => {
let is_retract_array =
BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None);
self.create_debezium(value_struct, timestamp_column, &is_retract_array)
}
}
}
fn create_debezium(
&mut self,
value_struct: RecordBatch,
timestamp_column: Arc<dyn Array>,
is_retract: &BooleanArray,
) -> DFResult<RecordBatch> {
let after_nullability = Some(NullBuffer::new(not(is_retract)?.values().clone()));
let after_array = StructArray::try_new(
value_struct.schema().fields().clone(),
Expand All @@ -1025,7 +1049,6 @@ impl ToDebeziumStream {
let append_datum = StringArray::new_scalar("c");
let retract_datum = StringArray::new_scalar("d");
let op_array = zip::zip(is_retract, &retract_datum, &append_datum)?;
let timestamp_column = batch.column(self.timestamp_index).clone();

let columns = vec![
Arc::new(before_array),
Expand Down
4 changes: 2 additions & 2 deletions crates/arroyo-formats/src/ser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,8 @@ mod tests {
.unwrap();

let mut iter = serializer.serialize(&batch);
assert_eq!(iter.next().unwrap(), br#"{"value":1612274910045.332}"#);
assert_eq!(iter.next().unwrap(), br#"{"value":1612274910045}"#);
assert_eq!(iter.next().unwrap(), br#"{"value":null}"#);
assert_eq!(iter.next().unwrap(), br#"{"value":1712274910045.332}"#);
assert_eq!(iter.next().unwrap(), br#"{"value":1712274910045}"#);
}
}
100 changes: 100 additions & 0 deletions crates/arroyo-sql-testing/golden_outputs/debezium_coercion.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
{"after":{"counter":0},"before":null,"op":"c"}
{"after":{"counter":1},"before":null,"op":"c"}
{"after":{"counter":2},"before":null,"op":"c"}
{"after":{"counter":3},"before":null,"op":"c"}
{"after":{"counter":4},"before":null,"op":"c"}
{"after":{"counter":5},"before":null,"op":"c"}
{"after":{"counter":6},"before":null,"op":"c"}
{"after":{"counter":7},"before":null,"op":"c"}
{"after":{"counter":8},"before":null,"op":"c"}
{"after":{"counter":9},"before":null,"op":"c"}
{"after":{"counter":10},"before":null,"op":"c"}
{"after":{"counter":11},"before":null,"op":"c"}
{"after":{"counter":12},"before":null,"op":"c"}
{"after":{"counter":13},"before":null,"op":"c"}
{"after":{"counter":14},"before":null,"op":"c"}
{"after":{"counter":15},"before":null,"op":"c"}
{"after":{"counter":16},"before":null,"op":"c"}
{"after":{"counter":17},"before":null,"op":"c"}
{"after":{"counter":18},"before":null,"op":"c"}
{"after":{"counter":19},"before":null,"op":"c"}
{"after":{"counter":20},"before":null,"op":"c"}
{"after":{"counter":21},"before":null,"op":"c"}
{"after":{"counter":22},"before":null,"op":"c"}
{"after":{"counter":23},"before":null,"op":"c"}
{"after":{"counter":24},"before":null,"op":"c"}
{"after":{"counter":25},"before":null,"op":"c"}
{"after":{"counter":26},"before":null,"op":"c"}
{"after":{"counter":27},"before":null,"op":"c"}
{"after":{"counter":28},"before":null,"op":"c"}
{"after":{"counter":29},"before":null,"op":"c"}
{"after":{"counter":30},"before":null,"op":"c"}
{"after":{"counter":31},"before":null,"op":"c"}
{"after":{"counter":32},"before":null,"op":"c"}
{"after":{"counter":33},"before":null,"op":"c"}
{"after":{"counter":34},"before":null,"op":"c"}
{"after":{"counter":35},"before":null,"op":"c"}
{"after":{"counter":36},"before":null,"op":"c"}
{"after":{"counter":37},"before":null,"op":"c"}
{"after":{"counter":38},"before":null,"op":"c"}
{"after":{"counter":39},"before":null,"op":"c"}
{"after":{"counter":40},"before":null,"op":"c"}
{"after":{"counter":41},"before":null,"op":"c"}
{"after":{"counter":42},"before":null,"op":"c"}
{"after":{"counter":43},"before":null,"op":"c"}
{"after":{"counter":44},"before":null,"op":"c"}
{"after":{"counter":45},"before":null,"op":"c"}
{"after":{"counter":46},"before":null,"op":"c"}
{"after":{"counter":47},"before":null,"op":"c"}
{"after":{"counter":48},"before":null,"op":"c"}
{"after":{"counter":49},"before":null,"op":"c"}
{"after":{"counter":50},"before":null,"op":"c"}
{"after":{"counter":51},"before":null,"op":"c"}
{"after":{"counter":52},"before":null,"op":"c"}
{"after":{"counter":53},"before":null,"op":"c"}
{"after":{"counter":54},"before":null,"op":"c"}
{"after":{"counter":55},"before":null,"op":"c"}
{"after":{"counter":56},"before":null,"op":"c"}
{"after":{"counter":57},"before":null,"op":"c"}
{"after":{"counter":58},"before":null,"op":"c"}
{"after":{"counter":59},"before":null,"op":"c"}
{"after":{"counter":60},"before":null,"op":"c"}
{"after":{"counter":61},"before":null,"op":"c"}
{"after":{"counter":62},"before":null,"op":"c"}
{"after":{"counter":63},"before":null,"op":"c"}
{"after":{"counter":64},"before":null,"op":"c"}
{"after":{"counter":65},"before":null,"op":"c"}
{"after":{"counter":66},"before":null,"op":"c"}
{"after":{"counter":67},"before":null,"op":"c"}
{"after":{"counter":68},"before":null,"op":"c"}
{"after":{"counter":69},"before":null,"op":"c"}
{"after":{"counter":70},"before":null,"op":"c"}
{"after":{"counter":71},"before":null,"op":"c"}
{"after":{"counter":72},"before":null,"op":"c"}
{"after":{"counter":73},"before":null,"op":"c"}
{"after":{"counter":74},"before":null,"op":"c"}
{"after":{"counter":75},"before":null,"op":"c"}
{"after":{"counter":76},"before":null,"op":"c"}
{"after":{"counter":77},"before":null,"op":"c"}
{"after":{"counter":78},"before":null,"op":"c"}
{"after":{"counter":79},"before":null,"op":"c"}
{"after":{"counter":80},"before":null,"op":"c"}
{"after":{"counter":81},"before":null,"op":"c"}
{"after":{"counter":82},"before":null,"op":"c"}
{"after":{"counter":83},"before":null,"op":"c"}
{"after":{"counter":84},"before":null,"op":"c"}
{"after":{"counter":85},"before":null,"op":"c"}
{"after":{"counter":86},"before":null,"op":"c"}
{"after":{"counter":87},"before":null,"op":"c"}
{"after":{"counter":88},"before":null,"op":"c"}
{"after":{"counter":89},"before":null,"op":"c"}
{"after":{"counter":90},"before":null,"op":"c"}
{"after":{"counter":91},"before":null,"op":"c"}
{"after":{"counter":92},"before":null,"op":"c"}
{"after":{"counter":93},"before":null,"op":"c"}
{"after":{"counter":94},"before":null,"op":"c"}
{"after":{"counter":95},"before":null,"op":"c"}
{"after":{"counter":96},"before":null,"op":"c"}
{"after":{"counter":97},"before":null,"op":"c"}
{"after":{"counter":98},"before":null,"op":"c"}
{"after":{"counter":99},"before":null,"op":"c"}
24 changes: 24 additions & 0 deletions crates/arroyo-sql-testing/src/test/queries/debezium_coercion.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
CREATE TABLE impulse_source
(
timestamp TIMESTAMP,
counter bigint unsigned not null,
subtask_index bigint unsigned not null
) WITH (
connector = 'single_file',
path = '$input_dir/impulse.json',
format = 'json',
type = 'source'
);
-- Sink written in debezium_json format. Its only column, `counter`, is
-- declared as a signed bigint while the source column is bigint unsigned.
-- NOTE(review): presumably this type difference is the "coercion" the test
-- is named for -- confirm.
CREATE TABLE output
(
counter bigint
) WITH (
connector = 'single_file',
path = '$output_path',
format = 'debezium_json',
type = 'sink'
);

-- Plain projection with no aggregation or join, i.e. a non-updating query
-- feeding a debezium sink. The matching golden output expects every row as
-- {"after": {...}, "before": null, "op": "c"}.
INSERT INTO output
SELECT counter
FROM impulse_source;

0 comments on commit f1e785d

Please sign in to comment.