Skip to content

Commit

Permalink
Add DB subscription (#114)
Browse files Browse the repository at this point in the history
Adds a struct that can subscribe to changes in a database query,
starting from some cursor onwards. Features:
- Poll based on an interval or on an input notifier channel
- Serve updates via an output channel
- Stops polling when context is closed
- Non-racy unit tests (hopefully)

Also adds a flexible DB query against the gateway_envelopes table that
can query based on a range of params. Given the size of the query, I
should add a unit test for it eventually.
  • Loading branch information
richardhuaaa authored Aug 26, 2024
1 parent 4dc7b3d commit 9771fb2
Show file tree
Hide file tree
Showing 11 changed files with 703 additions and 279 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,3 +108,5 @@ We use [sqlc](https://docs.sqlc.dev/en/latest/index.html) to generate the code f
```sh
dev/generate
```

If needed, there is a sqlc [playground](https://play.sqlc.dev/p/f6eebe941750560934cefa943c77f63497debc828c487e8d1771fb6d83773246) for experimenting with how the query syntax translates into Go code.
2 changes: 1 addition & 1 deletion dev/test
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ set -e

ulimit -n 2048

go test ./... "$@"
go test -timeout 3s `go list ./... | grep -v -e 'pkg/abis' -e 'pkg/config' -e 'pkg/proto' -e 'pkg/mock' -e 'pkg/testing'` "$@"

if [ -n "${RACE:-}" ]; then
echo
Expand Down
17 changes: 16 additions & 1 deletion pkg/db/queries.sql
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,22 @@ WHERE

-- name: InsertGatewayEnvelope :execrows
SELECT
insert_gateway_envelope(@originator_id, @sequence_id, @topic, @originator_envelope);
insert_gateway_envelope(@originator_id, @originator_sequence_id, @topic, @originator_envelope);

-- name: SelectGatewayEnvelopes :many
SELECT
*
FROM
gateway_envelopes
WHERE (sqlc.narg('topic')::BYTEA IS NULL
OR topic = @topic)
AND (sqlc.narg('originator_node_id')::INT IS NULL
OR originator_node_id = @originator_node_id)
AND (sqlc.narg('originator_sequence_id')::BIGINT IS NULL
OR originator_sequence_id > @originator_sequence_id)
AND (sqlc.narg('gateway_sequence_id')::BIGINT IS NULL
OR id > @gateway_sequence_id)
LIMIT sqlc.narg('row_limit')::INT;

-- name: InsertStagedOriginatorEnvelope :one
SELECT
Expand Down
10 changes: 5 additions & 5 deletions pkg/db/queries/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 65 additions & 5 deletions pkg/db/queries/queries.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

101 changes: 101 additions & 0 deletions pkg/db/subscription.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package db

import (
"context"
"fmt"
"time"

"go.uber.org/zap"
)

type PollableDBQuery[ValueType any] func(ctx context.Context, lastSeenID int64, numRows int32) (results []ValueType, lastID int64, err error)

// Poll whenever notified, or at an interval if not notified
type PollingOptions struct {
Interval time.Duration
Notifier <-chan bool
NumRows int32
}

type DBSubscription[ValueType any] struct {
ctx context.Context
log *zap.Logger
lastSeenID int64
options PollingOptions
query PollableDBQuery[ValueType]
updates chan<- []ValueType
}

func NewDBSubscription[ValueType any](
ctx context.Context,
log *zap.Logger,
query PollableDBQuery[ValueType],
lastSeenID int64,
options PollingOptions,
) *DBSubscription[ValueType] {
return &DBSubscription[ValueType]{
ctx: ctx,
log: log,
lastSeenID: lastSeenID,
options: options,
query: query,
updates: nil,
}
}

func (s *DBSubscription[ValueType]) Start() (<-chan []ValueType, error) {
if s.updates != nil {
return nil, fmt.Errorf("Already started")
}
if s.options.NumRows <= 0 || s.log == nil {
return nil, fmt.Errorf("Required params not provided")
}
updates := make(chan []ValueType)
s.updates = updates

go func() {
s.poll()

timer := time.NewTimer(s.options.Interval)
for {
timer.Reset(s.options.Interval)
select {
case <-s.ctx.Done():
s.log.Info("Context done; stopping subscription")
close(s.updates)
return
case <-s.options.Notifier:
s.poll()
case <-timer.C:
s.poll()
}
}
}()

return updates, nil
}

func (s *DBSubscription[ValueType]) poll() {
// Repeatedly query page by page until no more results
for {
results, lastID, err := s.query(s.ctx, s.lastSeenID, s.options.NumRows)
if err != nil {
s.log.Error(
"Error querying for DB subscription",
zap.Error(err),
zap.Int64("lastSeenID", s.lastSeenID),
zap.Int32("numRows", s.options.NumRows),
)
// Did not update lastSeenID; will retry on next poll
break
}
if len(results) == 0 {
break
}
s.lastSeenID = lastID
s.updates <- results
if int32(len(results)) < s.options.NumRows {
break
}
}
}
Loading

0 comments on commit 9771fb2

Please sign in to comment.