diff --git a/packages/runtime/src/tag.ts b/packages/runtime/src/tag.ts index 7f242bc8..26619574 100644 --- a/packages/runtime/src/tag.ts +++ b/packages/runtime/src/tag.ts @@ -2,8 +2,14 @@ import { SQLQueryIR, parseTSQuery, TSQueryAST } from '@pgtyped/parser'; import { processSQLQueryIR } from './preprocessor-sql.js'; import { processTSQueryAST } from './preprocessor-ts.js'; +export interface ICursor { + read(rowCount: number): Promise; + close(): Promise; +} + export interface IDatabaseConnection { query: (query: string, bindings: any[]) => Promise<{ rows: any[] }>; + stream?: (query: string, bindings: any[]) => ICursor; } /** Check for column modifier suffixes (exclamation and question marks). */ @@ -32,6 +38,11 @@ export class TaggedQuery { dbConnection: IDatabaseConnection, ) => Promise>; + public stream: ( + params: TTypePair['params'], + dbConnection: IDatabaseConnection, + ) => ICursor>; + private readonly query: TSQueryAST; constructor(query: TSQueryAST) { @@ -44,6 +55,24 @@ export class TaggedQuery { const result = await connection.query(processedQuery, bindings); return mapQueryResultRows(result.rows); }; + this.stream = (params, connection) => { + const { query: processedQuery, bindings } = processTSQueryAST( + this.query, + params as any, + ); + if (connection.stream == null) + throw new Error("Connection doesn't support streaming."); + const cursor = connection.stream(processedQuery, bindings); + return { + async read(rowCount: number) { + const rows = await cursor.read(rowCount); + return mapQueryResultRows(rows); + }, + async close() { + await cursor.close(); + }, + }; + }; } } @@ -66,6 +95,11 @@ export class PreparedQuery { dbConnection: IDatabaseConnection, ) => Promise>; + public stream: ( + params: TParamType, + dbConnection: IDatabaseConnection, + ) => ICursor>; + private readonly queryIR: SQLQueryIR; constructor(queryIR: SQLQueryIR) { @@ -78,6 +112,24 @@ export class PreparedQuery { const result = await connection.query(processedQuery, bindings); return mapQueryResultRows(result.rows); }; + this.stream = (params, connection) => { + const { query: processedQuery, bindings } = processSQLQueryIR( + this.queryIR, + params as any, + ); + if (connection.stream == null) + throw new Error("Connection doesn't support streaming."); + const cursor = connection.stream(processedQuery, bindings); + return { + async read(rowCount: number) { + const rows = await cursor.read(rowCount); + return mapQueryResultRows(rows); + }, + async close() { + await cursor.close(); + }, + }; + }; } }