From d07822ba0e219c642008faba4330fb08d59925d2 Mon Sep 17 00:00:00 2001 From: Raminder Singh Date: Sat, 30 Nov 2024 13:38:27 +0530 Subject: [PATCH] ignore cdc events with missing schema error --- pg_replicate/src/pipeline/batching/data_pipeline.rs | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/pg_replicate/src/pipeline/batching/data_pipeline.rs b/pg_replicate/src/pipeline/batching/data_pipeline.rs index 6f9df24..8dd7ea4 100644 --- a/pg_replicate/src/pipeline/batching/data_pipeline.rs +++ b/pg_replicate/src/pipeline/batching/data_pipeline.rs @@ -6,11 +6,11 @@ use tokio_postgres::types::PgLsn; use tracing::{debug, info}; use crate::{ - conversions::cdc_event::CdcEvent, + conversions::cdc_event::{CdcEvent, CdcEventConversionError}, pipeline::{ batching::stream::BatchTimeoutStream, sinks::BatchSink, - sources::{Source, SourceError}, + sources::{postgres::CdcStreamError, Source, SourceError}, PipelineAction, PipelineError, }, table::TableId, @@ -111,6 +111,12 @@ impl BatchDataPipeline { let mut send_status_update = false; let mut events = Vec::with_capacity(batch.len()); for event in batch { + if let Err(CdcStreamError::CdcEventConversion( + CdcEventConversionError::MissingSchema(_), + )) = event + { + continue; + } let event = event.map_err(SourceError::CdcStream)?; if let CdcEvent::KeepAliveRequested { reply } = event { send_status_update = reply;