Skip to content

Commit

Permalink
Send standby status once a minute
Browse files Browse the repository at this point in the history
also enable wal heartbeats by default. users predating PG14 should change the query or disable wal heartbeats

Heartbeats were being sent but for some reason RDS never sets off PKM
  • Loading branch information
serprex committed Nov 4, 2024
1 parent fc59d94 commit 610a4fd
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 9 deletions.
8 changes: 5 additions & 3 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -415,9 +415,10 @@ func PullCdcRecords[Items model.Items](
waitingForCommit = true
}
} else {
logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait",
p.flowJobName),
)
logger.Info(fmt.Sprintf("[%s] standby deadline reached, no records accumulated, continuing to wait", p.flowJobName))
if err := p.ReplPing(ctx); err != nil {
logger.Error("cdc failed to ReplPing", slog.Any("error", err))
}
}
nextStandbyMessageDeadline = time.Now().Add(standbyMessageTimeout)
}
Expand Down Expand Up @@ -595,6 +596,7 @@ func PullCdcRecords[Items model.Items](
// otherwise push to records so destination can ack once all previous messages processed
if cdcRecordsStorage.IsEmpty() {
if int64(clientXLogPos) > req.ConsumedOffset.Load() {
logger.Info("pg_logical_emit_message", slog.Int64("after", int64(clientXLogPos)), slog.Int64("before", req.ConsumedOffset.Load()))
metadata := connmetadata.NewPostgresMetadataFromCatalog(logger, p.catalogPool)
if err := metadata.SetLastOffset(ctx, req.FlowJobName, int64(clientXLogPos)); err != nil {
return err
Expand Down
9 changes: 3 additions & 6 deletions flow/peerdbenv/dynamicconf.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,17 +68,14 @@ var DynamicSettings = [...]*protos.DynamicSetting{
{
Name: "PEERDB_ENABLE_WAL_HEARTBEAT",
Description: "Enables WAL heartbeat to prevent replication slot lag from increasing during times of no activity",
DefaultValue: "false",
DefaultValue: "true",
ValueType: protos.DynconfValueType_BOOL,
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
TargetForSetting: protos.DynconfTarget_ALL,
},
{
Name: "PEERDB_WAL_HEARTBEAT_QUERY",
DefaultValue: `BEGIN;
DROP AGGREGATE IF EXISTS PEERDB_EPHEMERAL_HEARTBEAT(float4);
CREATE AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4) (SFUNC = float4pl, STYPE = float4);
DROP AGGREGATE PEERDB_EPHEMERAL_HEARTBEAT(float4); END;`,
Name: "PEERDB_WAL_HEARTBEAT_QUERY",
DefaultValue: "SELECT pg_logical_emit_message(false,'peerdb_heartbeat','')",
ValueType: protos.DynconfValueType_STRING,
Description: "SQL to run during each WAL heartbeat",
ApplyMode: protos.DynconfApplyMode_APPLY_MODE_IMMEDIATE,
Expand Down

0 comments on commit 610a4fd

Please sign in to comment.