diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index 69626fe..9036460 100644 --- a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -84,8 +84,13 @@ impl BatchDataPipeline { while let Some(batch) = batch_timeout_stream.next().await { info!("got {} table copy events in a batch", batch.len()); + //TODO: Avoid a vec copy + let mut rows = Vec::with_capacity(batch.len()); + for row in batch { + rows.push(row.map_err(CommonSourceError::TableCopyStream)?); + } self.sink - .write_table_rows(batch, table_schema.table_id) + .write_table_rows(rows, table_schema.table_id) .await .map_err(PipelineError::Sink)?; } diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs index f598254..4fc373c 100644 --- a/pg_replicate/src/pipeline/sinks/bigquery.rs +++ b/pg_replicate/src/pipeline/sinks/bigquery.rs @@ -9,7 +9,7 @@ use tracing::info; use crate::{ clients::bigquery::BigQueryClient, conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell}, - pipeline::{sources::postgres::TableCopyStreamError, PipelineResumptionState}, + pipeline::PipelineResumptionState, table::{ColumnSchema, TableId, TableName, TableSchema}, }; @@ -162,24 +162,19 @@ impl BatchSink for BigQueryBatchSink { async fn write_table_rows( &mut self, - mut table_rows: Vec>, + mut table_rows: Vec, table_id: TableId, ) -> Result<(), Self::Error> { let table_schema = self.get_table_schema(table_id)?; let table_name = Self::table_name_in_bq(&table_schema.table_name); let table_descriptor = table_schema.into(); - let new_rows = table_rows - .drain(..) - .filter_map(|row| row.ok()) - .map(|mut row| { - row.values.push(Cell::String("UPSERT".to_string())); - row - }) - .collect::>(); + for table_row in &mut table_rows { + table_row.values.push(Cell::String("UPSERT".to_string())); + } self.client - .stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows) + .stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows) .await?; Ok(()) diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs index 8ef876e..bd7b03c 100644 --- a/pg_replicate/src/pipeline/sinks/mod.rs +++ b/pg_replicate/src/pipeline/sinks/mod.rs @@ -9,7 +9,7 @@ use crate::{ table::{TableId, TableSchema}, }; -use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState}; +use super::PipelineResumptionState; #[cfg(feature = "bigquery")] pub mod bigquery; @@ -53,7 +53,7 @@ pub trait BatchSink { ) -> Result<(), Self::Error>; async fn write_table_rows( &mut self, - rows: Vec>, + rows: Vec, table_id: TableId, ) -> Result<(), Self::Error>; async fn write_cdc_events(&mut self, events: Vec) -> Result;