kysely-pg-observables
makes reactive programming with Kysely and PostgreSQL easy and safe.
It wraps PostgreSQL's wal2json
plugins, and let's you interact with the results using the type-information
you have already defined for your Kysely database.
Because it depends on wal2json
, it only works with installations of PostgreSQL that support it, which fortunately
includes RDS, Google Cloud SQL, Supabase, default Patroni images, Crunchy images, and many others.
It does not work with Neon (or Vercel Postgres) yet.
There are two main things you can do with kysely-pg-observables
:
- Create a type-safe RxJS
Subject
from a Kysely database - Create full observable's from Kysely queries
import { getKyselyChanges } from "kysely-pg-observables";
const pool = new Pool({
connectionString: process.env.DATABASE_URL
});
const dialect = new PostgresDialect({
pool: testPool,
});
type MyDatabaseType = {
widgets: {
id: number;
name: string;
color: string;
active: boolean;
};
tableWithOtherPrimaryKey: {
other_id: number;
name: string;
color: string;
};
};
const db = new Kysely<MyDatabaseType>({
dialect,
});
const changes = await getKyselyChanges(
testPool,
db,
["widgets"] // the list of tables you actually care about changes for
{
otherPrimaryKey: ["otherId"], // the name of the primary key for tableWithOtherPrimaryKey
}
);
// subject is an RxJs subject that you can subscribe to, operate on, create more observables from, etc.
const subject = changes.subject
const subscription = subject.subscribe({
next: (change) => {
// Change is typed as:
// {
// table: string;
// event: 'insert' | 'update' | 'delete';
// row: Database[table] (if event is insert or update)
// identity: // a partial Database[table] containing only the primary keys of the deleted row
// }
}
})
// When ready, cancel the wal slot and subscription altogether
subscription.unsubscribe()
// There is also changes.teardown() to fully tear everything down
getKyselyChanges
takes 4 parameters:
pool
: apg.Pool
instance that is used to connect to the databasedb
: aKysely
instance that is used to get the type information for the databasetables
: an array of table names that you want to get changes forprimaryKeyOverrides
: an object that maps tables to their primary keys
In order to make the events generated by wal2json
type-safe, we need to limit the tables that we get changes for
to only the tables that we know type information for. If we generated events for all tables, we would have to
filter for them inside of the application, but we don't have a list of tables (just a type of tables).
For deleted records, the only information available is the value of the primary key(s) of the deleted record.
By default, we'll assume that any id
column on a table is a primary key. If that is not true for any table,
just include its name and an array of columns which are the primary keys of the table, and we'll use that
for delete event's identities instead.
You can create full observables which re-run the query whenever the underlying data changes.
import { observeQuery } from "kysely-pg-observables";
// setup otherwise similar as before
const { subject, teardown } = await getKyselyChanges(
testPool,
db,
["widgets"],
undefined,
{ slotId }
);
const observable = await observeQuery(
observeQuery(
subject,
() => db.selectFrom('widgets').selectAll().execute(),
{
widgets: {
insert(row) {
return true; // Assuming we want to re-run the query for every insert
},
update(row, lastResult) {
return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result
},
delete(identity, lastResult) {
return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result
},
},
}
)
);
observable.subscribe(result => {
// This callback will fire every time the underlying data changes
})
This can be used for a query of any complexity. Do whatever you want inside of your async function! Just make sure that you take into account the way the underlying data can change and how that will influence your query results.
Although trpc does not use RxJS under the hood, the resulting observable is compatible with the type expected by trpc subscriptions.
This means you cand do:
allWidgets: procedure.subscription(() => {
return observeQuery(
subject,
() => db.selectFrom('widgets').selectAll().execute(),
{
widgets: {
insert(row) {
return true; // Assuming we want to re-run the query for every insert
},
update(row, lastResult) {
return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result
},
delete(identity, lastResult) {
return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result
},
},
}
)
}
})
Because the observable's are compatible, properly unsubscribing when the client disconnects is handled for you.
If you want to share an observable between multiple clients, you can pass a resulting observable to the
multicast
function and then create multiple observables from the same source.
If you share an observable between multiple clients, and do not multi-cast it, each event from the observable will only be fired once, and both clients will not get each event. I do not know who will get which one.
- I'd like to support json-patch, with client-side utilities as well.