Skip to content

Commit

Permalink
Fixed postgresql>=10 secondary server lag always 0, SuperQ proposed a…
Browse files Browse the repository at this point in the history
… more clean code solution :), pg_replication_test modified to test pgReplicationQueryBeforeVersion10 or pgReplicationQueryAfterVersion10 depending of the postgresql version

Signed-off-by: kr0m <[email protected]>
  • Loading branch information
kr0m authored and kr0m committed Nov 29, 2023
1 parent f5b613a commit ad16a32
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 12 deletions.
25 changes: 19 additions & 6 deletions collector/pg_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ package collector

import (
"context"

"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
)

Expand Down Expand Up @@ -52,23 +52,36 @@ var (
[]string{}, nil,
)

pgReplicationQuery = `SELECT
pgReplicationQueryBeforeVersion10 = `SELECT
CASE
WHEN NOT pg_is_in_recovery() THEN 0
WHEN pg_last_wal_receive_lsn () = pg_last_wal_replay_lsn () THEN 0
WHEN pg_last_wal_receive_lsn () = pg_last_wal_replay_lsn () THEN 0
ELSE GREATEST (0, EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())))
END AS lag,
CASE
WHEN pg_is_in_recovery() THEN 1
ELSE 0
END as is_replica`

pgReplicationQueryAfterVersion10 = `SELECT
CASE
WHEN NOT pg_is_in_recovery() THEN 0
ELSE GREATEST (0, EXTRACT(EPOCH FROM (now() - pg_last_xact_replay_timestamp())))
END AS lag,
CASE
WHEN pg_is_in_recovery() THEN 1
ELSE 0
END as is_replica`
)

func (c *PGReplicationCollector) Update(ctx context.Context, instance *instance, ch chan<- prometheus.Metric) error {
db := instance.getDB()
row := db.QueryRowContext(ctx,
pgReplicationQuery,
)
query := pgReplicationQueryBeforeVersion10
if instance.version.GE(semver.MustParse("10.0.0")) {
query = pgReplicationQueryAfterVersion10
}

row := db.QueryRowContext(ctx, query)

var lag float64
var isReplica int64
Expand Down
52 changes: 46 additions & 6 deletions collector/pg_replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ package collector

import (
"context"
"testing"

"github.com/DATA-DOG/go-sqlmock"
"github.com/blang/semver/v4"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/smartystreets/goconvey/convey"
"testing"
)

func TestPgReplicationCollector(t *testing.T) {
func TestPgReplicationCollectorBeforeVersion10(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
Expand All @@ -32,9 +32,49 @@ func TestPgReplicationCollector(t *testing.T) {
inst := &instance{db: db}

columns := []string{"lag", "is_replica"}
rows := sqlmock.NewRows(columns).
AddRow(1000, 1)
mock.ExpectQuery(sanitizeQuery(pgReplicationQuery)).WillReturnRows(rows)
rows := sqlmock.NewRows(columns).AddRow(1000, 1)
mock.ExpectQuery(sanitizeQuery(pgReplicationQueryBeforeVersion10)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGReplicationCollector{}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGReplicationCollector.Update: %s", err)
}
}()

expected := []MetricResult{
{labels: labelMap{}, value: 1000, metricType: dto.MetricType_GAUGE},
{labels: labelMap{}, value: 1, metricType: dto.MetricType_GAUGE},
}

convey.Convey("Metrics comparison", t, func() {
for _, expect := range expected {
m := readMetric(<-ch)
convey.So(expect, convey.ShouldResemble, m)
}
})
if err := mock.ExpectationsWereMet(); err != nil {
t.Errorf("there were unfulfilled exceptions: %s", err)
}
}

func TestPgReplicationCollectorAfterVersion10(t *testing.T) {
db, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("Error opening a stub db connection: %s", err)
}
defer db.Close()

//inst := &instance{db: db}
// Force test with a defined DB instance version, so ExpectQuery(pgReplicationQueryAfterVersion10) will match with PGReplicationCollector.Update query variable value
inst := &instance{db: db, version: semver.MustParse("10.0.0")}

columns := []string{"lag", "is_replica"}
rows := sqlmock.NewRows(columns).AddRow(1000, 1)
mock.ExpectQuery(sanitizeQuery(pgReplicationQueryAfterVersion10)).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
Expand Down

0 comments on commit ad16a32

Please sign in to comment.