allow write_table_rows to handle errored rows itself
passcod committed Nov 30, 2024
1 parent 09f8be8 commit a4bcd32
Showing 3 changed files with 15 additions and 15 deletions.
9 changes: 2 additions & 7 deletions pg_replicate/src/pipeline/batching/data_pipeline.rs
@@ -3,7 +3,7 @@ use std::{collections::HashSet, time::Instant};
 use futures::StreamExt;
 use tokio::pin;
 use tokio_postgres::types::PgLsn;
-use tracing::{debug, info};
+use tracing::{debug, info, warn};
 
 use crate::{
     conversions::cdc_event::{CdcEvent, CdcEventConversionError},
@@ -84,13 +84,8 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
 
         while let Some(batch) = batch_timeout_stream.next().await {
             info!("got {} table copy events in a batch", batch.len());
-            //TODO: Avoid a vec copy
-            let mut rows = Vec::with_capacity(batch.len());
-            for row in batch {
-                rows.push(row.map_err(CommonSourceError::TableCopyStream)?);
-            }
             self.sink
-                .write_table_rows(rows, table_schema.table_id)
+                .write_table_rows(batch, table_schema.table_id)
                 .await
                 .map_err(PipelineError::Sink)?;
         }
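With the hunk above, the pipeline no longer unwraps every copied row and aborts the whole table copy on the first `TableCopyStream` error; the batch of `Result`s goes to the sink unchanged, and each sink decides what to do with errored rows. A minimal sketch of what a sink can now do with such a batch; `split_batch` is a hypothetical helper with stand-in generic types, not part of this commit or of pg_replicate:

// Hypothetical helper, not from this commit: partition a forwarded batch
// into the decoded rows and the per-row errors a sink may now handle itself.
fn split_batch<R, E>(batch: Vec<Result<R, E>>) -> (Vec<R>, Vec<E>) {
    let mut rows = Vec::with_capacity(batch.len());
    let mut errors = Vec::new();
    for item in batch {
        match item {
            Ok(row) => rows.push(row),
            Err(err) => errors.push(err),
        }
    }
    (rows, errors)
}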
17 changes: 11 additions & 6 deletions pg_replicate/src/pipeline/sinks/bigquery.rs
@@ -9,7 +9,7 @@ use tracing::info;
 use crate::{
     clients::bigquery::BigQueryClient,
     conversions::{cdc_event::CdcEvent, table_row::TableRow, Cell},
-    pipeline::PipelineResumptionState,
+    pipeline::{PipelineResumptionState, sources::postgres::TableCopyStreamError},
     table::{ColumnSchema, TableId, TableName, TableSchema},
 };
 
@@ -162,19 +162,24 @@ impl BatchSink for BigQueryBatchSink {
 
     async fn write_table_rows(
         &mut self,
-        mut table_rows: Vec<TableRow>,
+        mut table_rows: Vec<Result<TableRow, TableCopyStreamError>>,
         table_id: TableId,
     ) -> Result<(), Self::Error> {
         let table_schema = self.get_table_schema(table_id)?;
         let table_name = Self::table_name_in_bq(&table_schema.table_name);
         let table_descriptor = table_schema.into();
 
-        for table_row in &mut table_rows {
-            table_row.values.push(Cell::String("UPSERT".to_string()));
-        }
+        let new_rows = table_rows
+            .drain(..)
+            .filter_map(|row| row.ok())
+            .map(|mut row| {
+                row.values.push(Cell::String("UPSERT".to_string()));
+                row
+            })
+            .collect::<Vec<TableRow>>();
 
         self.client
-            .stream_rows(&self.dataset_id, table_name, &table_descriptor, &table_rows)
+            .stream_rows(&self.dataset_id, table_name, &table_descriptor, &new_rows)
             .await?;
 
         Ok(())
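The BigQuery sink's policy above is to drop errored rows silently: `filter_map(|row| row.ok())` discards every `Err` before the `UPSERT` marker is appended. A hedged variation, not what this commit ships, that slots in place of the `new_rows` expression and logs each skipped row first; it assumes `TableCopyStreamError` implements `Debug` and that `tracing::warn` is imported in this file too (the commit only adds it in data_pipeline.rs):

// Sketch of an alternative to the commit's `filter_map(|row| row.ok())`:
// skip errored rows, but warn about each one instead of dropping silently.
let new_rows: Vec<TableRow> = table_rows
    .drain(..)
    .filter_map(|row| match row {
        Ok(mut row) => {
            row.values.push(Cell::String("UPSERT".to_string()));
            Some(row)
        }
        Err(err) => {
            warn!("skipping errored table copy row: {:?}", err);
            None
        }
    })
    .collect();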
4 changes: 2 additions & 2 deletions pg_replicate/src/pipeline/sinks/mod.rs
@@ -9,7 +9,7 @@ use crate::{
     table::{TableId, TableSchema},
 };
 
-use super::PipelineResumptionState;
+use super::{sources::postgres::TableCopyStreamError, PipelineResumptionState};
 
 #[cfg(feature = "bigquery")]
 pub mod bigquery;
@@ -53,7 +53,7 @@ pub trait BatchSink {
     ) -> Result<(), Self::Error>;
     async fn write_table_rows(
         &mut self,
-        rows: Vec<TableRow>,
+        rows: Vec<Result<TableRow, TableCopyStreamError>>,
         table_id: TableId,
     ) -> Result<(), Self::Error>;
     async fn write_cdc_events(&mut self, events: Vec<CdcEvent>) -> Result<PgLsn, Self::Error>;
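With this trait change, per-row error policy becomes a sink concern rather than a pipeline one. A self-contained sketch, using stand-in types rather than the real async trait, of a sink that restores the old fail-fast behavior by propagating the first errored row:

// Stand-in types for illustration; the real method is async and uses
// `TableRow` and `TableCopyStreamError`.
#[derive(Debug)]
struct CopyError(String);

struct StrictSink;

impl StrictSink {
    // Fail on the first errored row instead of skipping it, which is what
    // the pipeline itself did before this commit.
    fn write_table_rows(&mut self, rows: Vec<Result<String, CopyError>>) -> Result<(), CopyError> {
        let rows: Vec<String> = rows.into_iter().collect::<Result<_, _>>()?;
        println!("writing {} rows", rows.len());
        Ok(())
    }
}

Called with `vec![Ok("a".into()), Err(CopyError("bad tuple".into()))]`, this returns the error instead of writing a partial batch.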
