diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs
index 9036460..f3c2636 100644
--- a/pg_replicate/src/pipeline/batching/data_pipeline.rs
+++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs
@@ -3,7 +3,7 @@ use std::{collections::HashSet, time::Instant};
 use futures::StreamExt;
 use tokio::pin;
 use tokio_postgres::types::PgLsn;
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
 
 use crate::{
     conversions::cdc_event::{CdcEvent, CdcEventConversionError},
@@ -84,13 +84,8 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
         while let Some(batch) = batch_timeout_stream.next().await {
             info!("got {} table copy events in a batch", batch.len());
 
-            //TODO: Avoid a vec copy
-            let mut rows = Vec::with_capacity(batch.len());
-            for row in batch {
-                rows.push(row.map_err(CommonSourceError::TableCopyStream)?);
-            }
             self.sink
-                .write_table_rows(rows, table_schema.table_id)
+                .write_table_rows(batch, table_schema.table_id)
                 .await
                 .map_err(PipelineError::Sink)?;
         }
diff --git a/pg_replicate/src/pipeline/sinks/bigquery.rs b/pg_replicate/src/pipeline/sinks/bigquery.rs
index 4fc373c..8c65385 100644
--- a/pg_replicate/src/pipeline/sinks/bigquery.rs
+++ b/pg_replicate/src/pipeline/sinks/bigquery.rs
@@ -9,7 +9,7 @@ use tracing::info;
 use crate::{
     clients::bigquery::BigQueryClient,
     conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell},
-    pipeline::PipelineResumptionState,
+    pipeline::{PipelineResumptionState, sources::postgres::TableCopyStreamError},
     table::{ColumnSchema, TableId, TableName, TableSchema},
 };
 
@@ -162,19 +162,24 @@ impl BatchSink for BigQueryBatchSink {
 
     async fn write_table_rows(
         &mut self,
-        mut table_rows: Vec<TableRow>,
+        mut table_rows: Vec<Result<TableRow, TableCopyStreamError>>,
         table_id: TableId,
     ) -> Result<(), Self::Error> {
         let table_schema = self.get_table_schema(table_id)?;
         let table_name = Self::table_name_in_bq(&table_schema.table_name);
         let table_descriptor = table_schema.into();
 
-        for table_row in &mut table_rows {
-            table_row.values.push(Cell::String("UPSERT".to_string()));
-        }
+        let new_rows = table_rows
+            .drain(..)
+            .filter_map(|row| row.ok())
+            .map(|mut row| {
+                row.values.push(Cell::String("UPSERT".to_string()));
+                row
+            })
+            .collect::<Vec<TableRow>>();
 
         self.client
-            .stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows)
+            .stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows)
             .await?;
 
         Ok(())
diff --git a/pg_replicate/src/pipeline/sinks/mod.rs b/pg_replicate/src/pipeline/sinks/mod.rs
index bd7b03c..8ef876e 100644
--- a/pg_replicate/src/pipeline/sinks/mod.rs
+++ b/pg_replicate/src/pipeline/sinks/mod.rs
@@ -9,7 +9,7 @@ use crate::{
     table::{TableId, TableSchema},
 };
 
-use super::PipelineResumptionState;
+use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState};
 
 #[cfg(feature = "bigquery")]
 pub mod bigquery;
@@ -53,7 +53,7 @@
     ) -> Result<(), Self::Error>;
     async fn write_table_rows(
         &mut self,
-        rows: Vec<TableRow>,
+        rows: Vec<Result<TableRow, TableCopyStreamError>>,
         table_id: TableId,
     ) -> Result<(), Self::Error>;
     async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error>;
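Note on the contract change: `write_table_rows` now receives each row as a `Result`, so a failed copy no longer aborts the whole pipeline at the `map_err(...)?` site; each sink decides per row what to do. The BigQuery sink above skips failures via `filter_map(|row| row.ok())`, and the newly imported `warn` suggests logging them rather than dropping them silently. Below is a minimal sketch of that logging variant; only the names `TableRow` and `TableCopyStreamError` come from the diff, and the stand-in definitions here are assumptions to make the sketch compile on its own.

```rust
use tracing::warn;

// Stand-ins for pg_replicate's real types, just for this sketch.
#[derive(Debug)]
struct TableRow {
    values: Vec<String>,
}

#[derive(Debug)]
enum TableCopyStreamError {
    ConversionError(String),
}

/// Keep the rows that copied cleanly, logging each failure before
/// discarding it instead of filtering it out silently.
fn usable_rows(
    batch: Vec<Result<TableRow, TableCopyStreamError>>,
) -> Vec<TableRow> {
    batch
        .into_iter()
        .filter_map(|row| match row {
            Ok(row) => Some(row),
            Err(e) => {
                warn!("skipping table copy row: {e:?}");
                None
            }
        })
        .collect()
}

fn main() {
    let batch = vec![
        Ok(TableRow { values: vec!["42".to_string()] }),
        Err(TableCopyStreamError::ConversionError("bad cell".to_string())),
    ];
    assert_eq!(usable_rows(batch).len(), 1);
}
```

The trade-off is the usual one for batch loaders: propagating the first error fails fast but aborts the entire table copy, while filtering in the sink keeps the copy going at the cost of per-row data loss that should at least be logged.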