Skip to content

Commit

Permalink
ignore cdc events with missing schema error
Browse files Browse the repository at this point in the history
  • Loading branch information
imor committed Nov 30, 2024
1 parent 856e101 commit d07822b
Showing 1 changed file with 8 additions and 2 deletions.
10 changes: 8 additions & 2 deletions pg_replicate/src/pipeline/batching/data_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,11 @@ use tokio_postgres::types::PgLsn;
use tracing::{debug, info};

use crate::{
conversions::cdc_event::CdcEvent,
conversions::cdc_event::{CdcEvent, CdcEventConversionError},
pipeline::{
batching::stream::BatchTimeoutStream,
sinks::BatchSink,
sources::{Source, SourceError},
sources::{postgres::CdcStreamError, Source, SourceError},
PipelineAction, PipelineError,
},
table::TableId,
Expand Down Expand Up @@ -111,6 +111,12 @@ impl<Src: Source, Snk: BatchSink> BatchDataPipeline<Src, Snk> {
let mut send_status_update = false;
let mut events = Vec::with_capacity(batch.len());
for event in batch {
if let Err(CdcStreamError::CdcEventConversion(
CdcEventConversionError::MissingSchema(_),
)) = event
{
continue;
}
let event = event.map_err(SourceError::CdcStream)?;
if let CdcEvent::KeepAliveRequested { reply } = event {
send_status_update = reply;
Expand Down

0 comments on commit d07822b

Please sign in to comment.