diff --git a/Cargo.lock b/Cargo.lock index 8a6375823..f5e9ff421 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -345,7 +345,7 @@ dependencies = [ [[package]] name = "arrow-json" version = "50.0.0" -source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=50.0.0/json#8b568ec317aa1f2ab7d1e33ea6fef1fc5697c5fc" +source = "git+https://github.com/ArroyoSystems/arrow-rs?branch=50.0.0/json#61448d628a2cfb7b80bb66b2a47c061bf21a7b1d" dependencies = [ "arrow-array", "arrow-buffer", diff --git a/crates/arroyo-df/src/physical.rs b/crates/arroyo-df/src/physical.rs index 4e79d4bc5..912beb909 100644 --- a/crates/arroyo-df/src/physical.rs +++ b/crates/arroyo-df/src/physical.rs @@ -1,6 +1,6 @@ use arrow::{ array::{AsArray, BooleanBuilder, TimestampNanosecondBuilder, UInt32Builder}, - buffer::NullBuffer, + buffer::{BooleanBuffer, NullBuffer}, compute::{concat, kernels::zip, not, take}, }; use datafusion_execution::{RecordBatchStream, SendableRecordBatchStream}; @@ -975,10 +975,15 @@ impl ExecutionPlan for ToDebeziumExec { partition: usize, context: Arc, ) -> DFResult { - let is_retract_index: usize = self.input.schema().index_of(IS_RETRACT_FIELD)?; + let is_retract_index = self.input.schema().index_of(IS_RETRACT_FIELD).ok(); let timestamp_index = self.input.schema().index_of(TIMESTAMP_FIELD)?; let struct_projection = (0..self.input.schema().fields().len()) - .filter(|index| *index != is_retract_index && *index != timestamp_index) + .filter(|index| { + is_retract_index + .map(|is_retract_index| *index != is_retract_index) + .unwrap_or(true) + && *index != timestamp_index + }) .collect(); Ok(Box::pin(ToDebeziumStream { input: self.input.execute(partition, context)?, @@ -997,7 +1002,7 @@ impl ExecutionPlan for ToDebeziumExec { struct ToDebeziumStream { input: SendableRecordBatchStream, schema: SchemaRef, - is_retract_index: usize, + is_retract_index: Option, timestamp_index: usize, struct_projection: Vec, } @@ -1005,11 +1010,30 @@ struct ToDebeziumStream { impl ToDebeziumStream { fn as_debezium_batch(&mut self, batch: &RecordBatch) -> DFResult { let value_struct = batch.project(&self.struct_projection)?; - let is_retract = batch - .column(self.is_retract_index) - .as_any() - .downcast_ref::() - .unwrap(); + let timestamp_column = batch.column(self.timestamp_index).clone(); + match self.is_retract_index { + Some(retract_index) => self.create_debezium( + value_struct, + timestamp_column, + batch + .column(retract_index) + .as_any() + .downcast_ref::() + .unwrap(), + ), + None => { + let is_retract_array = + BooleanArray::new(BooleanBuffer::new_unset(batch.num_rows()), None); + self.create_debezium(value_struct, timestamp_column, &is_retract_array) + } + } + } + fn create_debezium( + &mut self, + value_struct: RecordBatch, + timestamp_column: Arc, + is_retract: &BooleanArray, + ) -> DFResult { let after_nullability = Some(NullBuffer::new(not(is_retract)?.values().clone())); let after_array = StructArray::try_new( value_struct.schema().fields().clone(), @@ -1025,7 +1049,6 @@ impl ToDebeziumStream { let append_datum = StringArray::new_scalar("c"); let retract_datum = StringArray::new_scalar("d"); let op_array = zip::zip(is_retract, &retract_datum, &append_datum)?; - let timestamp_column = batch.column(self.timestamp_index).clone(); let columns = vec![ Arc::new(before_array), diff --git a/crates/arroyo-formats/src/ser.rs b/crates/arroyo-formats/src/ser.rs index 8ec1cb74a..2de9f4156 100644 --- a/crates/arroyo-formats/src/ser.rs +++ b/crates/arroyo-formats/src/ser.rs @@ -348,8 +348,8 @@ mod tests { .unwrap(); let mut iter = serializer.serialize(&batch); - assert_eq!(iter.next().unwrap(), br#"{"value":1612274910045.332}"#); + assert_eq!(iter.next().unwrap(), br#"{"value":1612274910045}"#); assert_eq!(iter.next().unwrap(), br#"{"value":null}"#); - assert_eq!(iter.next().unwrap(), br#"{"value":1712274910045.332}"#); + assert_eq!(iter.next().unwrap(), br#"{"value":1712274910045}"#); } } diff --git a/crates/arroyo-sql-testing/golden_outputs/debezium_coercion.json b/crates/arroyo-sql-testing/golden_outputs/debezium_coercion.json new file mode 100644 index 000000000..378b7a567 --- /dev/null +++ b/crates/arroyo-sql-testing/golden_outputs/debezium_coercion.json @@ -0,0 +1,100 @@ +{"after":{"counter":0},"before":null,"op":"c"} +{"after":{"counter":1},"before":null,"op":"c"} +{"after":{"counter":2},"before":null,"op":"c"} +{"after":{"counter":3},"before":null,"op":"c"} +{"after":{"counter":4},"before":null,"op":"c"} +{"after":{"counter":5},"before":null,"op":"c"} +{"after":{"counter":6},"before":null,"op":"c"} +{"after":{"counter":7},"before":null,"op":"c"} +{"after":{"counter":8},"before":null,"op":"c"} +{"after":{"counter":9},"before":null,"op":"c"} +{"after":{"counter":10},"before":null,"op":"c"} +{"after":{"counter":11},"before":null,"op":"c"} +{"after":{"counter":12},"before":null,"op":"c"} +{"after":{"counter":13},"before":null,"op":"c"} +{"after":{"counter":14},"before":null,"op":"c"} +{"after":{"counter":15},"before":null,"op":"c"} +{"after":{"counter":16},"before":null,"op":"c"} +{"after":{"counter":17},"before":null,"op":"c"} +{"after":{"counter":18},"before":null,"op":"c"} +{"after":{"counter":19},"before":null,"op":"c"} +{"after":{"counter":20},"before":null,"op":"c"} +{"after":{"counter":21},"before":null,"op":"c"} +{"after":{"counter":22},"before":null,"op":"c"} +{"after":{"counter":23},"before":null,"op":"c"} +{"after":{"counter":24},"before":null,"op":"c"} +{"after":{"counter":25},"before":null,"op":"c"} +{"after":{"counter":26},"before":null,"op":"c"} +{"after":{"counter":27},"before":null,"op":"c"} +{"after":{"counter":28},"before":null,"op":"c"} +{"after":{"counter":29},"before":null,"op":"c"} +{"after":{"counter":30},"before":null,"op":"c"} +{"after":{"counter":31},"before":null,"op":"c"} +{"after":{"counter":32},"before":null,"op":"c"} +{"after":{"counter":33},"before":null,"op":"c"} +{"after":{"counter":34},"before":null,"op":"c"} +{"after":{"counter":35},"before":null,"op":"c"} +{"after":{"counter":36},"before":null,"op":"c"} +{"after":{"counter":37},"before":null,"op":"c"} +{"after":{"counter":38},"before":null,"op":"c"} +{"after":{"counter":39},"before":null,"op":"c"} +{"after":{"counter":40},"before":null,"op":"c"} +{"after":{"counter":41},"before":null,"op":"c"} +{"after":{"counter":42},"before":null,"op":"c"} +{"after":{"counter":43},"before":null,"op":"c"} +{"after":{"counter":44},"before":null,"op":"c"} +{"after":{"counter":45},"before":null,"op":"c"} +{"after":{"counter":46},"before":null,"op":"c"} +{"after":{"counter":47},"before":null,"op":"c"} +{"after":{"counter":48},"before":null,"op":"c"} +{"after":{"counter":49},"before":null,"op":"c"} +{"after":{"counter":50},"before":null,"op":"c"} +{"after":{"counter":51},"before":null,"op":"c"} +{"after":{"counter":52},"before":null,"op":"c"} +{"after":{"counter":53},"before":null,"op":"c"} +{"after":{"counter":54},"before":null,"op":"c"} +{"after":{"counter":55},"before":null,"op":"c"} +{"after":{"counter":56},"before":null,"op":"c"} +{"after":{"counter":57},"before":null,"op":"c"} +{"after":{"counter":58},"before":null,"op":"c"} +{"after":{"counter":59},"before":null,"op":"c"} +{"after":{"counter":60},"before":null,"op":"c"} +{"after":{"counter":61},"before":null,"op":"c"} +{"after":{"counter":62},"before":null,"op":"c"} +{"after":{"counter":63},"before":null,"op":"c"} +{"after":{"counter":64},"before":null,"op":"c"} +{"after":{"counter":65},"before":null,"op":"c"} +{"after":{"counter":66},"before":null,"op":"c"} +{"after":{"counter":67},"before":null,"op":"c"} +{"after":{"counter":68},"before":null,"op":"c"} +{"after":{"counter":69},"before":null,"op":"c"} +{"after":{"counter":70},"before":null,"op":"c"} +{"after":{"counter":71},"before":null,"op":"c"} +{"after":{"counter":72},"before":null,"op":"c"} +{"after":{"counter":73},"before":null,"op":"c"} +{"after":{"counter":74},"before":null,"op":"c"} +{"after":{"counter":75},"before":null,"op":"c"} +{"after":{"counter":76},"before":null,"op":"c"} +{"after":{"counter":77},"before":null,"op":"c"} +{"after":{"counter":78},"before":null,"op":"c"} +{"after":{"counter":79},"before":null,"op":"c"} +{"after":{"counter":80},"before":null,"op":"c"} +{"after":{"counter":81},"before":null,"op":"c"} +{"after":{"counter":82},"before":null,"op":"c"} +{"after":{"counter":83},"before":null,"op":"c"} +{"after":{"counter":84},"before":null,"op":"c"} +{"after":{"counter":85},"before":null,"op":"c"} +{"after":{"counter":86},"before":null,"op":"c"} +{"after":{"counter":87},"before":null,"op":"c"} +{"after":{"counter":88},"before":null,"op":"c"} +{"after":{"counter":89},"before":null,"op":"c"} +{"after":{"counter":90},"before":null,"op":"c"} +{"after":{"counter":91},"before":null,"op":"c"} +{"after":{"counter":92},"before":null,"op":"c"} +{"after":{"counter":93},"before":null,"op":"c"} +{"after":{"counter":94},"before":null,"op":"c"} +{"after":{"counter":95},"before":null,"op":"c"} +{"after":{"counter":96},"before":null,"op":"c"} +{"after":{"counter":97},"before":null,"op":"c"} +{"after":{"counter":98},"before":null,"op":"c"} +{"after":{"counter":99},"before":null,"op":"c"} diff --git a/crates/arroyo-sql-testing/src/test/queries/debezium_coercion.sql b/crates/arroyo-sql-testing/src/test/queries/debezium_coercion.sql new file mode 100644 index 000000000..84ef47626 --- /dev/null +++ b/crates/arroyo-sql-testing/src/test/queries/debezium_coercion.sql @@ -0,0 +1,24 @@ +CREATE TABLE impulse_source +( + timestamp TIMESTAMP, + counter bigint unsigned not null, + subtask_index bigint unsigned not null +) WITH ( + connector = 'single_file', + path = '$input_dir/impulse.json', + format = 'json', + type = 'source' + ); +CREATE TABLE output +( + counter bigint +) WITH ( + connector = 'single_file', + path = '$output_path', + format = 'debezium_json', + type = 'sink' +); + +INSERT INTO output +SELECT counter +FROM impulse_source; \ No newline at end of file