Skip to content

Commit

Permalink
Allow custom serializer / deserializer functions (#21)
Browse files Browse the repository at this point in the history
  • Loading branch information
andywer authored May 15, 2019
1 parent 77bebce commit c9a731b
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 6 deletions.
27 changes: 22 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,18 @@ export interface Options {
* Timeout in ms after which to stop retrying and just fail. Defaults to 3000 ms.
*/
retryTimeout?: number

/**
* Custom function to control how the payload data is stringified on `.notify()`.
* Use together with the `serialize` option. Defaults to `JSON.parse`.
*/
parse?: (serialized: string) => any

/**
* Custom function to control how the payload data is stringified on `.notify()`.
* Use together with the `parse` option. Defaults to `JSON.stringify`.
*/
serialize?: (data: any) => string
}

function connect (connectionConfig: pg.ClientConfig | undefined, options: Options) {
Expand Down Expand Up @@ -113,13 +125,13 @@ function connect (connectionConfig: pg.ClientConfig | undefined, options: Option
}
}

function forwardDBNotificationEvents (dbClient: pg.Client, emitter: TypedEventEmitter<PgListenEvents>) {
function forwardDBNotificationEvents (dbClient: pg.Client, emitter: TypedEventEmitter<PgListenEvents>, parse: (stringifiedData: string) => any) {
const onNotification = (notification: PgNotification) => {
notificationLogger(`Received PostgreSQL notification on "${notification.channel}":`, notification.payload)

let payload
try {
payload = notification.payload ? JSON.parse(notification.payload) : notification.payload
payload = notification.payload !== undefined ? parse(notification.payload) : undefined
} catch (error) {
error.message = `Error parsing PostgreSQL notification payload: ${error.message}`
return emitter.emit("error", error)
Expand Down Expand Up @@ -173,7 +185,11 @@ export interface Subscriber {
}

function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options: Options = {}): Subscriber {
const { paranoidChecking = 30000 } = options
const {
paranoidChecking = 30000,
parse = JSON.parse,
serialize = JSON.stringify
} = options

const emitter = new EventEmitter() as TypedEventEmitter<PgListenEvents>
emitter.setMaxListeners(0) // unlimited listeners
Expand All @@ -197,7 +213,7 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:

const initialize = (client: pg.Client) => {
// Wire the DB client events to our exposed emitter's events
cancelEventForwarding = forwardDBNotificationEvents(client, emitter)
cancelEventForwarding = forwardDBNotificationEvents(client, emitter, parse)

dbClient.on("error", (error: any) => {
if (!reinitializingRightNow) {
Expand Down Expand Up @@ -282,7 +298,8 @@ function createPostgresSubscriber (connectionConfig?: pg.ClientConfig, options:
},
notify (channelName: string, payload: any) {
notificationLogger(`Sending PostgreSQL notification to "${channelName}":`, payload)
return dbClient.query(`NOTIFY ${format.ident(channelName)}, ${format.literal(JSON.stringify(payload))}`)
const serialized = serialize(payload)
return dbClient.query(`NOTIFY ${format.ident(channelName)}, ${format.literal(serialized)}`)
},
unlisten (channelName: string) {
if (subscribedChannels.indexOf(channelName) === -1) {
Expand Down
36 changes: 35 additions & 1 deletion test/integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,41 @@ test("can listen and notify", async t => {
}
})

test("getting notified after connection is terminated", async t => {
test("can use custom `parse` function", async t => {
const notifications: PgParsedNotification[] = []
const receivedPayloads: any[] = []

const connectionString = "postgres://postgres:postgres@localhost:5432/postgres"

const hub = createPostgresSubscriber(
{ connectionString },
{ parse: (base64: string) => Buffer.from(base64, "base64").toString("utf8") }
)
await hub.connect()

let client = new pg.Client({ connectionString })
await client.connect()

try {
await hub.listenTo("test")
await hub.events.on("notification", (notification: PgParsedNotification) => notifications.push(notification))

await client.query(`NOTIFY test, '${Buffer.from("I am a payload.", "utf8").toString("base64")}'`)
await delay(200)

t.deepEqual(notifications, [
{
channel: "test",
payload: "I am a payload.",
processId: notifications[0].processId
}
])
} finally {
await hub.close()
}
})

test.serial("getting notified after connection is terminated", async t => {
const notifications: PgParsedNotification[] = []
const receivedPayloads: any[] = []
let reconnects = 0
Expand Down

0 comments on commit c9a731b

Please sign in to comment.