kysely-pg-observables

kysely-pg-observables makes reactive programming with Kysely and PostgreSQL easy and safe.

It wraps PostgreSQL's wal2json plugin and lets you interact with the results using the type information you have already defined for your Kysely database.

Because it depends on wal2json, it only works with installations of PostgreSQL that support it, which fortunately includes RDS, Google Cloud SQL, Supabase, default Patroni images, Crunchy images, and many others. It does not work with Neon (or Vercel Postgres) yet.

There are two main things you can do with kysely-pg-observables:

  1. Create a type-safe RxJS Subject from a Kysely database
  2. Create full observables from Kysely queries

1. Create a type-safe RxJS Subject from a Kysely database

import { Pool } from "pg";
import { Kysely, PostgresDialect } from "kysely";
import { getKyselyChanges } from "kysely-pg-observables";

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

const dialect = new PostgresDialect({
  pool,
});

type MyDatabaseType = {
  widgets: {
    id: number;
    name: string;
    color: string;
    active: boolean;
  };
  tableWithOtherPrimaryKey: {
    other_id: number;
    name: string;
    color: string;
  };
};

const db = new Kysely<MyDatabaseType>({
  dialect,
});

const changes = await getKyselyChanges(
  pool,
  db,
  ["widgets", "tableWithOtherPrimaryKey"], // the list of tables you actually care about changes for
  {
    tableWithOtherPrimaryKey: ["other_id"], // the primary key column(s) of tableWithOtherPrimaryKey
  }
);

// subject is an RxJS Subject that you can subscribe to, operate on, create more observables from, etc.
const subject = changes.subject;

const subscription = subject.subscribe({
  next: (change) => {
    // Change is typed as:
    // {
    //   table: string;
    //   event: 'insert' | 'update' | 'delete';
    //   row: Database[table]; // present if event is insert or update
    //   identity: Partial<Database[table]>; // present if event is delete; contains only the primary keys of the deleted row
    // }
  }
})

// When ready, cancel the wal slot and subscription altogether
subscription.unsubscribe()

// There is also changes.teardown() to fully tear everything down
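
As a rough illustration of consuming these events, the sketch below keeps an in-memory list of widgets in sync with the change stream. The `WidgetChange` type and the `applyChange` helper are hypothetical: they are modeled on the change shape described in the comment above and are not taken from the library's exported types.

```typescript
type Widget = { id: number; name: string; color: string; active: boolean };

// Hypothetical event shape modeled on the comment above; not the library's exported type.
type WidgetChange =
  | { table: "widgets"; event: "insert" | "update"; row: Widget }
  | { table: "widgets"; event: "delete"; identity: { id: number } };

// Returns a new list with the change applied; the input list is left untouched.
function applyChange(widgets: Widget[], change: WidgetChange): Widget[] {
  if (change.event === "delete") {
    // Deletes carry only the primary key, so match on that.
    const deletedId = change.identity.id;
    return widgets.filter((widget) => widget.id !== deletedId);
  }
  // Insert or update: drop any stale copy of the row, then append the fresh one.
  const row = change.row;
  return [...widgets.filter((widget) => widget.id !== row.id), row];
}
```

A subscriber could call a helper like this from its `next` callback, provided the real event type lines up with the assumed shape.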

getKyselyChanges takes 4 parameters:

  • pool: a pg.Pool instance that is used to connect to the database
  • db: a Kysely instance that is used to get the type information for the database
  • tables: an array of table names that you want to get changes for
  • primaryKeyOverrides: an optional object that maps table names to arrays of their primary key columns

Why do I need to provide tables?

To make the events generated by wal2json type-safe, we need to limit the tables we get changes for to those we have type information for. If we generated events for all tables, we would have to filter them inside the application, but at runtime we don't have a list of tables, only a TypeScript type that describes them.
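
The point about types versus runtime values can be sketched as follows. This is only an illustration: `ExampleDatabase`, `RawEvent`, and `keepWatched` are hypothetical names, and the filtering shown here is not the library's actual implementation.

```typescript
// Example database type; it exists only at compile time and is erased from the emitted JavaScript.
type ExampleDatabase = {
  widgets: { id: number; name: string };
  gadgets: { id: number; label: string };
};

// Hypothetical raw event, standing in for whatever wal2json reports.
type RawEvent = { table: string; event: "insert" | "update" | "delete" };

// The runtime list of watched tables; the compiler checks each entry against the type's keys.
const watchedTables: (keyof ExampleDatabase)[] = ["widgets"];

// Keeps only the events whose table appears in the runtime list.
function keepWatched(events: RawEvent[], tables: readonly string[]): RawEvent[] {
  return events.filter((event) => tables.indexOf(event.table) !== -1);
}
```

Without the explicit array there would be nothing to iterate over at runtime, which is why the table names have to be passed in as values.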

What is this primary key override business?

For deleted records, the only information available is the value of the primary key(s) of the deleted record. By default, we assume that any id column on a table is its primary key. If that is not true for a table, include the table's name with an array of the columns that make up its primary key, and we'll use those for the identities of delete events instead.
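
The default-plus-override rule described above could look roughly like this. It is a minimal sketch under stated assumptions: `identityOf` and `PrimaryKeyOverrides` are hypothetical names used for illustration, and the library's internals may differ.

```typescript
// Hypothetical override map: table name -> primary key column names.
type PrimaryKeyOverrides = { [table: string]: string[] | undefined };

// Picks the primary key columns out of a row, falling back to "id" when no override is given.
function identityOf(
  table: string,
  row: Record<string, unknown>,
  overrides: PrimaryKeyOverrides = {}
): Record<string, unknown> {
  const keys = overrides[table] ?? ["id"];
  const identity: Record<string, unknown> = {};
  for (const key of keys) {
    identity[key] = row[key];
  }
  return identity;
}
```

For example, `identityOf("tableWithOtherPrimaryKey", row, { tableWithOtherPrimaryKey: ["other_id"] })` would keep only the `other_id` column, while a call without overrides keeps only `id`.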

2. Create full observables from Kysely queries

You can create full observables which re-run the query whenever the underlying data changes.

import { getKyselyChanges, observeQuery } from "kysely-pg-observables";

// setup is otherwise the same as before
const { subject, teardown } = await getKyselyChanges(
  pool,
  db,
  ["widgets"],
  undefined, // no primary key overrides
  { slotId }
);

const observable = await observeQuery(
  subject,
  () => db.selectFrom('widgets').selectAll().execute(),
  {
    widgets: {
      insert(row) {
        return true; // Assuming we want to re-run the query for every insert
      },
      update(row, lastResult) {
        return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result
      },
      delete(identity, lastResult) {
        return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result
      },
    },
  }
);

observable.subscribe(result => {
  // This callback will fire every time the underlying data changes
})

This can be used for a query of any complexity. Do whatever you want inside of your async function! Just make sure that you take into account the way the underlying data can change and how that will influence your query results.
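
To make the role of the per-table handlers concrete, here is a small standalone sketch of the decision they encode: given a change and the last query result, should the query be re-run? The names `WidgetEvent`, `RerunHandlers`, and `shouldRerun` are hypothetical and only mirror the handler shapes shown above; this is not how the library is necessarily implemented.

```typescript
type WidgetRow = { id: number; name: string; color: string; active: boolean };

// Hypothetical event shape; the library's real types may differ.
type WidgetEvent =
  | { event: "insert"; row: WidgetRow }
  | { event: "update"; row: WidgetRow }
  | { event: "delete"; identity: { id: number } };

// Mirrors the insert/update/delete handlers passed to observeQuery in the example above.
type RerunHandlers = {
  insert(row: WidgetRow, lastResult: WidgetRow[]): boolean;
  update(row: WidgetRow, lastResult: WidgetRow[]): boolean;
  delete(identity: { id: number }, lastResult: WidgetRow[]): boolean;
};

// Dispatches a change to the matching handler and returns its verdict.
function shouldRerun(change: WidgetEvent, lastResult: WidgetRow[], handlers: RerunHandlers): boolean {
  switch (change.event) {
    case "insert":
      return handlers.insert(change.row, lastResult);
    case "update":
      return handlers.update(change.row, lastResult);
    case "delete":
      return handlers.delete(change.identity, lastResult);
  }
}

// Same policy as the example above: always re-run on insert,
// re-run on update or delete only if the row was in the last result.
const widgetHandlers: RerunHandlers = {
  insert: () => true,
  update: (row, lastResult) => lastResult.some((widget) => widget.id === row.id),
  delete: (identity, lastResult) => lastResult.some((widget) => widget.id === identity.id),
};
```

Testing handlers in isolation like this is a cheap way to check that a query is neither re-run too often nor left stale.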

Usage With trpc

Although trpc does not use RxJS under the hood, the resulting observable is compatible with the type expected by trpc subscriptions.

This means you can do:

  // Inside your trpc router definition:
  allWidgets: procedure.subscription(() => {
    return observeQuery(
      subject,
      () => db.selectFrom('widgets').selectAll().execute(),
      {
        widgets: {
          insert(row) {
            return true; // Assuming we want to re-run the query for every insert
          },
          update(row, lastResult) {
            return lastResult.some((widget) => widget.id === row.id); // Re-run only if the updated widget was in the last result
          },
          delete(identity, lastResult) {
            return lastResult.some((widget) => widget.id === identity.id); // Re-run only if the deleted widget was in the last result
          },
        },
      }
    );
  }),

Because the observables are compatible, properly unsubscribing when the client disconnects is handled for you.

Sharing an observable between multiple trpc subscription clients

If you want to share an observable between multiple clients, you can pass a resulting observable to the multicast function and then create multiple observables from the same source.

If you share an observable between multiple clients without multicasting it, each event from the observable is fired only once, so not every client receives every event. Which client receives a given event is not defined.

Roadmap

  • I'd like to support json-patch, with client-side utilities as well.