diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index a355cfa00..ec6d205de 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -415,9 +415,10 @@ func PullCdcRecords[Items model.Items]( waitingForCommit = true } } else { - logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", - p.flowJobName), - ) + logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", p.flowJobName)) + if err := p.ReplPing(ctx); err != nil { + logger.Error("cdc failed to ReplPing", slog.Any("error", err)) + } } nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout) } @@ -595,6 +596,7 @@ func PullCdcRecords[Items model.Items]( // otherwise push to records so destination can ack once all previous messages processed if cdcRecordsStorage.IsEmpty() { if int64(clientXLogPos) > req.ConsumedOffset.Load() { + logger.Info("pg_logical_emit_message", slog.Int64("after", int64(clientXLogPos)), slog.Int64("before", req.ConsumedOffset.Load())) metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool) if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil { return err diff --git a/flow/peerdbenv/dynamicconf.go b/flow/peerdbenv/dynamicconf.go index 1e2f22590..dd0b833b2 100644 --- a/flow/peerdbenv/dynamicconf.go +++ b/flow/peerdbenv/dynamicconf.go @@ -68,17 +68,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{ { Name: "PEERDB_ENABLE_WAL_HEARTBEAT", Description: "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity", - DefaultValue: "false", + DefaultValue: "true", ValueType: protos.DynconfValueType_BOOL, ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE, TargetForSetting: protos.DynconfTarget_ALL, }, { - Name: "PEERDB_WAL_HEARTBEAT_QUERY", - DefaultValue: `BEGIN; -DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4); -CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4); -DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); END;`, + Name: "PEERDB_WAL_HEARTBEAT_QUERY", + DefaultValue: "SELECT pg_logical_emit_message(false,'peerdb_heartbeat','')", ValueType: protos.DynconfValueType_STRING, Description: "SQL to run during each WAL heartbeat", ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,